Skip to content
GitLab
Explore
Sign in
Register
Primary navigation
Search or go to…
Project
T
tango
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Iterations
Wiki
Requirements
Jira issues
Open Jira
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Locked files
Build
Pipelines
Jobs
Pipeline schedules
Test cases
Artifacts
Deploy
Releases
Package Registry
Container Registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Code review analytics
Issue analytics
Insights
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
LOFAR2.0
tango
Commits
1dbd76bc
Commit
1dbd76bc
authored
3 years ago
by
Taya Snijder
Browse files
Options
Downloads
Plain Diff
merged origin
parents
fa8254be
7b6de01d
No related branches found
No related tags found
1 merge request
!24
Resolve #2021 "04 16 branched from master ini file device"
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
devices/APSCTL.py
+0
-190
0 additions, 190 deletions
devices/APSCTL.py
devices/examples/HW_device_template.py
+5
-0
5 additions, 0 deletions
devices/examples/HW_device_template.py
with
5 additions
and
190 deletions
devices/APSCTL.py
deleted
100644 → 0
+
0
−
190
View file @
fa8254be
# -*- coding: utf-8 -*-
#
# This file is part of the SDP project
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
"""
SDP Device Server for LOFAR2.0
"""
# PyTango imports
from
tango.server
import
run
from
tango.server
import
device_property
from
tango
import
AttrWriteType
# Additional import
from
clients.opcua_connection
import
OPCUAConnection
from
util.attribute_wrapper
import
attribute_wrapper
from
util.hardware_device
import
hardware_device
from
util.lofar_logging
import
device_logging_to_python
,
log_exceptions
import
numpy
__all__
=
[
"
APSCTL
"
,
"
main
"
]
@device_logging_to_python
({
"
device
"
:
"
APSCTL
"
})
class
APSCTL
(
hardware_device
):
"""
**Properties:**
- Device Property
OPC_Server_Name
- Type:
'
DevString
'
OPC_Server_Port
- Type:
'
DevULong
'
OPC_Time_Out
- Type:
'
DevDouble
'
"""
# -----------------
# Device Properties
# -----------------
OPC_Server_Name
=
device_property
(
dtype
=
'
DevString
'
,
mandatory
=
True
)
OPC_Server_Port
=
device_property
(
dtype
=
'
DevULong
'
,
mandatory
=
True
)
OPC_Time_Out
=
device_property
(
dtype
=
'
DevDouble
'
,
mandatory
=
True
)
# ----------
# Attributes
# ----------
N_unb
=
2
N_fpga
=
4
N_ddr
=
2
N_qsfp
=
6
# Central CP per Uniboard
UNB2_Power_ON_OFF_RW
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_Power_ON_OFF_RW
"
],
datatype
=
numpy
.
bool_
,
dims
=
(
N_unb
,),
access
=
AttrWriteType
.
READ_WRITE
)
UNB2_Front_Panel_LED_RW
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_Front_Panel_LED_RW
"
],
datatype
=
numpy
.
uint8
,
dims
=
(
N_unb
,),
access
=
AttrWriteType
.
READ_WRITE
)
UNB2_Mask_RW
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_Mask_RW
"
],
datatype
=
numpy
.
bool_
,
dims
=
(
N_unb
,),
access
=
AttrWriteType
.
READ_WRITE
)
# Central MP per Uniboard
UNB2_I2C_bus_OK_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_I2C_bus_OK_R
"
],
datatype
=
numpy
.
bool_
,
dims
=
(
N_unb
,))
UNB2_Front_Panel_LED_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_Front_Panel_LED_R
"
],
datatype
=
numpy
.
uint8
,
dims
=
(
N_unb
,))
UNB2_EEPROM_Serial_Number_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_EEPROM_Serial_Number_R
"
],
datatype
=
numpy
.
str
,
dims
=
(
N_unb
,))
UNB2_EEPROM_Unique_ID_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_EEPROM_Unique_ID_R
"
],
datatype
=
numpy
.
uint32
,
dims
=
(
N_unb
,))
UNB2_DC_DC_48V_12V_VIN_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_DC_DC_48V_12V_VIN_R
"
],
datatype
=
numpy
.
double
,
dims
=
(
N_unb
,))
UNB2_DC_DC_48V_12V_VOUT_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_DC_DC_48V_12V_VOUT_R
"
],
datatype
=
numpy
.
double
,
dims
=
(
N_unb
,))
UNB2_DC_DC_48V_12V_IOUT_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_DC_DC_48V_12V_IOUT_R
"
],
datatype
=
numpy
.
double
,
dims
=
(
N_unb
,))
UNB2_DC_DC_48V_12V_TEMP_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_DC_DC_48V_12V_TEMP_R
"
],
datatype
=
numpy
.
double
,
dims
=
(
N_unb
,))
UNB2_POL_QSFP_N01_VOUT_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_POL_QSFP_N01_VOUT_R
"
],
datatype
=
numpy
.
double
,
dims
=
(
N_unb
,))
UNB2_POL_QSFP_N01_IOUT_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_POL_QSFP_N01_IOUT_R
"
],
datatype
=
numpy
.
double
,
dims
=
(
N_unb
,))
UNB2_POL_QSFP_N01_TEMP_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_POL_QSFP_N01_TEMP_R
"
],
datatype
=
numpy
.
double
,
dims
=
(
N_unb
,))
UNB2_POL_QSFP_N23_VOUT_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_POL_QSFP_N23_VOUT_R
"
],
datatype
=
numpy
.
double
,
dims
=
(
N_unb
,))
UNB2_POL_QSFP_N23_IOUT_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_POL_QSFP_N23_IOUT_R
"
],
datatype
=
numpy
.
double
,
dims
=
(
N_unb
,))
UNB2_POL_QSFP_N23_TEMP_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_POL_QSFP_N23_TEMP_R
"
],
datatype
=
numpy
.
double
,
dims
=
(
N_unb
,))
UNB2_POL_SWITCH_1V2_VOUT_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_POL_SWITCH_1V2_VOUT_R
"
],
datatype
=
numpy
.
double
,
dims
=
(
N_unb
,))
UNB2_POL_SWITCH_1V2_IOUT_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_POL_SWITCH_1V2_IOUT_R
"
],
datatype
=
numpy
.
double
,
dims
=
(
N_unb
,))
UNB2_POL_SWITCH_1V2_TEMP_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_POL_SWITCH_1V2_TEMP_R
"
],
datatype
=
numpy
.
double
,
dims
=
(
N_unb
,))
UNB2_POL_SWITCH_PHY_VOUT_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_POL_SWITCH_PHY_VOUT_R
"
],
datatype
=
numpy
.
double
,
dims
=
(
N_unb
,))
UNB2_POL_SWITCH_PHY_IOUT_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_POL_SWITCH_PHY_IOUT_R
"
],
datatype
=
numpy
.
double
,
dims
=
(
N_unb
,))
UNB2_POL_SWITCH_PHY_TEMP_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_POL_SWITCH_PHY_TEMP_R
"
],
datatype
=
numpy
.
double
,
dims
=
(
N_unb
,))
UNB2_POL_CLOCK_VOUT_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_POL_CLOCK_VOUT_R
"
],
datatype
=
numpy
.
double
,
dims
=
(
N_unb
,))
UNB2_POL_CLOCK_IOUT_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_POL_CLOCK_IOUT_R
"
],
datatype
=
numpy
.
double
,
dims
=
(
N_unb
,))
UNB2_POL_CLOCK_TEMP_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_POL_CLOCK_TEMP_R
"
],
datatype
=
numpy
.
double
,
dims
=
(
N_unb
,))
# monitor points per FPGA
UNB2_FPGA_DDR4_SLOT_TEMP_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_FPGA_DDR4_SLOT_TEMP_R
"
],
datatype
=
numpy
.
double
,
dims
=
(
N_unb
,
N_fpga
))
UNB2_FPGA_DDR4_SLOT_PART_NUMBER_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_FPGA_DDR4_SLOT_PART_NUMBER_R
"
],
datatype
=
numpy
.
str
,
dims
=
(
N_unb
*
N_qsfp
,
N_fpga
))
UNB2_FPGA_QSFP_CAGE_0_TEMP_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_FPGA_QSFP_CAGE_0_TEMP_R
"
],
datatype
=
numpy
.
double
,
dims
=
(
N_unb
,
N_fpga
))
UNB2_FPGA_QSFP_CAGE_1_TEMP_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_FPGA_QSFP_CAGE_1_TEMP_R
"
],
datatype
=
numpy
.
double
,
dims
=
(
N_unb
,
N_fpga
))
UNB2_FPGA_QSFP_CAGE_2_TEMP_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_FPGA_QSFP_CAGE_2_TEMP_R
"
],
datatype
=
numpy
.
double
,
dims
=
(
N_unb
,
N_fpga
))
UNB2_FPGA_QSFP_CAGE_3_TEMP_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_FPGA_QSFP_CAGE_3_TEMP_R
"
],
datatype
=
numpy
.
double
,
dims
=
(
N_unb
,
N_fpga
))
UNB2_FPGA_QSFP_CAGE_4_TEMP_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_FPGA_QSFP_CAGE_4_TEMP_R
"
],
datatype
=
numpy
.
double
,
dims
=
(
N_unb
,
N_fpga
))
UNB2_FPGA_QSFP_CAGE_5_TEMP_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_FPGA_QSFP_CAGE_5_TEMP_R
"
],
datatype
=
numpy
.
double
,
dims
=
(
N_unb
,
N_fpga
))
UNB2_FPGA_QSFP_CAGE_0_LOS_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_FPGA_QSFP_CAGE_0_LOS_R
"
],
datatype
=
numpy
.
uint8
,
dims
=
(
N_unb
,
N_fpga
))
UNB2_FPGA_QSFP_CAGE_1_LOS_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_FPGA_QSFP_CAGE_1_LOS_R
"
],
datatype
=
numpy
.
uint8
,
dims
=
(
N_unb
,
N_fpga
))
UNB2_FPGA_QSFP_CAGE_2_LOS_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_FPGA_QSFP_CAGE_2_LOS_R
"
],
datatype
=
numpy
.
uint8
,
dims
=
(
N_unb
,
N_fpga
))
UNB2_FPGA_QSFP_CAGE_3_LOS_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_FPGA_QSFP_CAGE_3_LOS_R
"
],
datatype
=
numpy
.
uint8
,
dims
=
(
N_unb
,
N_fpga
))
UNB2_FPGA_QSFP_CAGE_4_LOS_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_FPGA_QSFP_CAGE_4_LOS_R
"
],
datatype
=
numpy
.
uint8
,
dims
=
(
N_unb
,
N_fpga
))
UNB2_FPGA_QSFP_CAGE_5_LOS_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_FPGA_QSFP_CAGE_5_LOS_R
"
],
datatype
=
numpy
.
uint8
,
dims
=
(
N_unb
,
N_fpga
))
UNB2_FPGA_POL_CORE_VOUT_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_FPGA_POL_CORE_VOUT_R
"
],
datatype
=
numpy
.
double
,
dims
=
(
N_unb
,
N_fpga
))
UNB2_FPGA_POL_CORE_IOUT_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_FPGA_POL_CORE_IOUT_R
"
],
datatype
=
numpy
.
double
,
dims
=
(
N_unb
,
N_fpga
))
UNB2_FPGA_POL_CORE_TEMP_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_FPGA_POL_CORE_TEMP_R
"
],
datatype
=
numpy
.
double
,
dims
=
(
N_unb
,
N_fpga
))
UNB2_FPGA_POL_ERAM_VOUT_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_FPGA_POL_ERAM_VOUT_R
"
],
datatype
=
numpy
.
double
,
dims
=
(
N_unb
,
N_fpga
))
UNB2_FPGA_POL_ERAM_IOUT_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_FPGA_POL_ERAM_IOUT_R
"
],
datatype
=
numpy
.
double
,
dims
=
(
N_unb
,
N_fpga
))
UNB2_FPGA_POL_ERAM_TEMP_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_FPGA_POL_ERAM_TEMP_R
"
],
datatype
=
numpy
.
double
,
dims
=
(
N_unb
,
N_fpga
))
UNB2_FPGA_POL_RXGXB_VOUT_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_FPGA_POL_RXGXB_VOUT_R
"
],
datatype
=
numpy
.
double
,
dims
=
(
N_unb
,
N_fpga
))
UNB2_FPGA_POL_RXGXB_IOUT_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_FPGA_POL_RXGXB_IOUT_R
"
],
datatype
=
numpy
.
double
,
dims
=
(
N_unb
,
N_fpga
))
UNB2_FPGA_POL_RXGXB_TEMP_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_FPGA_POL_RXGXB_TEMP_R
"
],
datatype
=
numpy
.
double
,
dims
=
(
N_unb
,
N_fpga
))
UNB2_FPGA_POL_TXGXB_VOUT_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_FPGA_POL_TXGXB_VOUT_R
"
],
datatype
=
numpy
.
double
,
dims
=
(
N_unb
,
N_fpga
))
UNB2_FPGA_POL_TXGXB_IOUT_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_FPGA_POL_TXGXB_IOUT_R
"
],
datatype
=
numpy
.
double
,
dims
=
(
N_unb
,
N_fpga
))
UNB2_FPGA_POL_TXGXB_TEMP_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:PCC
"
,
"
2:UNB2_FPGA_POL_TXGXB_TEMP_R
"
],
datatype
=
numpy
.
double
,
dims
=
(
N_unb
,
N_fpga
))
UNB2_FPGA_POL_HGXB_VOUT_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:UNB2_FPGA_POL_HGXB_VOUT_R
"
],
datatype
=
numpy
.
double
,
dims
=
(
N_unb
,
N_fpga
))
UNB2_FPGA_POL_HGXB_IOUT_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:UNB2_FPGA_POL_HGXB_IOUT_R
"
],
datatype
=
numpy
.
double
,
dims
=
(
N_unb
,
N_fpga
))
UNB2_FPGA_POL_HGXB_TEMP_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:UNB2_FPGA_POL_HGXB_TEMP_R
"
],
datatype
=
numpy
.
double
,
dims
=
(
N_unb
,
N_fpga
))
UNB2_FPGA_POL_PGM_VOUT_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:UNB2_FPGA_POL_PGM_VOUT_R
"
],
datatype
=
numpy
.
double
,
dims
=
(
N_unb
,
N_fpga
))
UNB2_FPGA_POL_PGM_IOUT_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:UNB2_FPGA_POL_PGM_IOUT_R
"
],
datatype
=
numpy
.
double
,
dims
=
(
N_unb
,
N_fpga
))
UNB2_FPGA_POL_PGM_TEMP_R
=
attribute_wrapper
(
comms_annotation
=
[
"
2:UNB2_FPGA_POL_PGM_TEMP_R
"
],
datatype
=
numpy
.
double
,
dims
=
(
N_unb
,
N_fpga
))
@log_exceptions
()
def
delete_device
(
self
):
"""
Hook to delete resources allocated in init_device.
This method allows for any memory or other resources allocated in the
init_device method to be released. This method is called by the device
destructor and by the device Init command (a Tango built-in).
"""
self
.
debug_stream
(
"
Shutting down...
"
)
self
.
Off
()
self
.
debug_stream
(
"
Shut down. Good bye.
"
)
# --------
# overloaded functions
# --------
@log_exceptions
()
def
configure_for_off
(
self
):
"""
user code here. is called when the state is set to OFF
"""
# Stop keep-alive
self
.
opcua_connection
.
stop
()
@log_exceptions
()
def
configure_for_initialise
(
self
):
"""
user code here. is called when the sate is set to INIT
"""
"""
Initialises the attributes and properties of the PCC.
"""
# set up the OPC ua client
self
.
OPCua_client
=
OPCUAConnection
(
"
opc.tcp://{}:{}/
"
.
format
(
self
.
OPC_Server_Name
,
self
.
OPC_Server_Port
),
"
http://lofar.eu
"
,
self
.
OPC_Time_Out
,
self
.
Fault
,
self
)
# map an access helper class
for
i
in
self
.
attr_list
():
try
:
i
.
set_comm_client
(
self
.
OPCua_client
)
except
:
self
.
debug_stream
(
"
error in getting APSCTL attribute: {} from client
"
.
format
(
i
))
self
.
OPCua_client
.
start
()
# --------
# Commands
# --------
# ----------
# Run server
# ----------
def
main
(
args
=
None
,
**
kwargs
):
"""
Main function of the SDP module.
"""
return
run
((
APSCTL
,),
args
=
args
,
**
kwargs
)
if
__name__
==
'
__main__
'
:
main
()
This diff is collapsed.
Click to expand it.
devices/examples/HW_device_template.py
+
5
−
0
View file @
1dbd76bc
...
...
@@ -14,8 +14,13 @@ from tango.server import run
from
tango
import
AttrWriteType
# Additional import
<<<<<<<
HEAD
:
devices
/
HW_device_template
.
py
from
src.attribute_wrapper
import
*
from
src.hardware_device
import
*
=======
from
util.attribute_wrapper
import
attribute_wrapper
from
util.hardware_device
import
hardware_device
>>>>>>>
master
:
devices
/
examples
/
HW_device_template
.
py
__all__
=
[
"
HW_dev
"
]
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment