Skip to content
Snippets Groups Projects

L2 sdp 349

Merged Pieter Donker requested to merge L2SDP-349 into master
All threads resolved!
2 files
+ 163
73
Compare changes
  • Side-by-side
  • Inline
Files
2
+ 147
73
@@ -19,11 +19,23 @@
# ##########################################################################
# Author:
# . [name]
# . Leon Hiemstra, Pieter Donker
# Purpose:
# . " "
# . script to test (control/monitor) opc-ua server
# Description:
# . " "
# Client.py is used to connect (TCP/IP) to an opc-ua server and test (read/write)
# opc-ua points (registers on a fpga).
# For LTS the server is running on DOP36 port 4840, this client can not run
# on DOP36 itself (too old system).
#
# In this version the following is working:
# . --info, read info from server, list all available points on the server.
# . --all, read all *_R point and print needed time.
# . --wg, turn off WG or setup for XST (to generate crosslets plot).
# . --setup, setup SST, BST and XST stream.
# . --stream, turn off or on SST, BST or XST stream.
#
# . run ./Client.py -h for help
# ##########################################################################
import sys
@@ -199,88 +211,134 @@ def read_subband_weights(obj):
vals = var.get_value()
@timing
def write_subband_weights(obj):
weights = [8192, 0, 0, 0] * N_SUB * N_NODES * (S_PN//4)
#weights = [8192] * N_SUB * N_NODES * S_PN
print(len(weights))
def setup_wg_xst_mode(obj):
'''
setup wg for xst stream
'''
# Write WG configuration with phases from 0 - 360 and 1 to 1/12 amplitudes
wg_ampl = [0.01 * (1 - (i / S_PN)) for i in range(S_PN)] * N_NODES
var = obj.get_child("2:FPGA_wg_amplitude_RW")
var.set_value(ua.Variant(value=list(wg_ampl), varianttype=ua.VariantType.Double))
wg_phase = [i * (360 / S_PN) for i in range(S_PN)] * N_NODES
var = obj.get_child("2:FPGA_wg_phase_RW")
var.set_value(ua.Variant(value=list(wg_phase), varianttype=ua.VariantType.Double))
wg_freq = [(102 / 1024) * 200e6 for i in range(S_PN)] * N_NODES
var = obj.get_child("2:FPGA_wg_frequency_RW")
var.set_value(ua.Variant(value=list(wg_freq), varianttype=ua.VariantType.Double))
print("wg on")
enable = [True for i in range(S_PN)] * N_NODES # enable wg
var = obj.get_child("2:FPGA_wg_enable_RW")
var.set_value(ua.Variant(value=list(enable), varianttype=ua.VariantType.Boolean))
def turn_wg_off(obj):
print("wg off")
enable = [False for i in range(S_PN)] * N_NODES # enable wg
var = obj.get_child("2:FPGA_wg_enable_RW")
var.set_value(ua.Variant(value=list(enable), varianttype=ua.VariantType.Boolean))
def setup_sst_stream(obj):
'''
setup bst stream, use git/upe_gear/peripherals/pi_satistics_stream.py for recording and plotting.
- pi_statistic_stream.py --unb2 2 --pn2 0 --cmd 4 -s=BST
'''
port = [5001] * N_NODES # use port 5002
var = obj.get_child("2:FPGA_sst_offload_hdr_udp_destination_port_RW")
var.set_value(ua.Variant(value=list(port), varianttype=ua.VariantType.UInt16))
dest_mac = ["00:1B:21:71:76:B9"] * N_NODES # use mac of dop36
var = obj.get_child("2:FPGA_sst_offload_hdr_eth_destination_mac_RW")
var.set_value(ua.Variant(value=list(dest_mac), varianttype=ua.VariantType.String))
dest_addr = ["10.99.0.254"] * N_NODES # use addr of dop36
var = obj.get_child("2:FPGA_sst_offload_hdr_ip_destination_address_RW")
var.set_value(ua.Variant(value=list(dest_addr), varianttype=ua.VariantType.String))
weights = [8192] * N_SUB * N_NODES * S_PN # set weights for subbands to default
var = obj.get_child("2:FPGA_subband_weights_RW")
#dv = ua.DataValue(ua.Variant(value=weights, varianttype=ua.VariantType.UInt32))
#dv.ServerTimestamp = None
#dv.SourceTimestamp = None
var.set_value(ua.Variant(value=list(weights), varianttype=ua.VariantType.UInt32))
def write_xst_subband_select(obj):
subsel = [7, 102, 103, 104, 105, 106, 107, 108] * N_NODES # fill all fields
var = obj.get_child("2:FPGA_xst_subband_select_RW")
var.set_value(ua.Variant(value=list(subsel), varianttype=ua.VariantType.UInt32))
enable = [True] * N_NODES # enable weighted subbands
var = obj.get_child("2:FPGA_sst_offload_weighted_subbands_RW")
var.set_value(ua.Variant(value=list(enable), varianttype=ua.VariantType.Boolean))
def write_xst_wg_setup(obj, on=False):
def setup_bst_stream(obj):
'''
setup wg for xst stream
setup bst stream, use git/upe_gear/peripherals/pi_satistics_stream.py for recording and plotting.
- pi_statistic_stream.py --unb2 2 --pn2 0 --cmd 4 -s=BST
'''
if on is True:
# Write WG configuration with phases from 0 - 360 and 1 to 1/12 amplitudes
wg_ampl = [0.01 * (1 - (i / S_PN)) for i in range(S_PN)] * N_NODES
var = obj.get_child("2:FPGA_wg_amplitude_RW")
var.set_value(ua.Variant(value=list(wg_ampl), varianttype=ua.VariantType.Double))
print(len(wg_ampl))
wg_phase = [i * (360 / S_PN) for i in range(S_PN)] * N_NODES
var = obj.get_child("2:FPGA_wg_phase_RW")
var.set_value(ua.Variant(value=list(wg_phase), varianttype=ua.VariantType.Double))
wg_freq = [(102 / 1024) * 200e6 for i in range(S_PN)] * N_NODES
var = obj.get_child("2:FPGA_wg_frequency_RW")
var.set_value(ua.Variant(value=list(wg_freq), varianttype=ua.VariantType.Double))
enable = [True for i in range(S_PN)] * N_NODES # enable wg
var = obj.get_child("2:FPGA_wg_enable_RW")
var.set_value(ua.Variant(value=list(enable), varianttype=ua.VariantType.Boolean))
else:
enable = [False for i in range(S_PN)] * N_NODES # enable wg
var = obj.get_child("2:FPGA_wg_enable_RW")
var.set_value(ua.Variant(value=list(enable), varianttype=ua.VariantType.Boolean))
port = [5002] * N_NODES # use port 5002
var = obj.get_child("2:FPGA_bst_offload_hdr_udp_destination_port_RW")
var.set_value(ua.Variant(value=list(port), varianttype=ua.VariantType.UInt16))
dest_mac = ["00:1B:21:71:76:B9"] * N_NODES # use mac of dop36
var = obj.get_child("2:FPGA_bst_offload_hdr_eth_destination_mac_RW")
var.set_value(ua.Variant(value=list(dest_mac), varianttype=ua.VariantType.String))
def write_xst_stream_setup(obj, on=False):
dest_addr = ["10.99.0.254"] * N_NODES # use addr of dop36
var = obj.get_child("2:FPGA_bst_offload_hdr_ip_destination_address_RW")
var.set_value(ua.Variant(value=list(dest_addr), varianttype=ua.VariantType.String))
def setup_xst_stream(obj):
'''
setup xst stream, use git/upe_gear/peripherals/pi_satistics_stream.py for recording and plotting.
- pi_statistic_stream.py --unb2 2 --pn2 0 --cmd 4 -s=XST
'''
if on is True:
port = [5003] * N_NODES # use port 5001
var = obj.get_child("2:FPGA_xst_offload_hdr_udp_destination_port_RW")
var.set_value(ua.Variant(value=list(port), varianttype=ua.VariantType.UInt16))
port = [5003] * N_NODES # use port 5001
var = obj.get_child("2:FPGA_xst_offload_hdr_udp_destination_port_RW")
var.set_value(ua.Variant(value=list(port), varianttype=ua.VariantType.UInt16))
dest_mac = ["00:1B:21:71:76:B9"] * N_NODES # use mac of dop36
var = obj.get_child("2:FPGA_xst_offload_hdr_eth_destination_mac_RW")
var.set_value(ua.Variant(value=list(dest_mac), varianttype=ua.VariantType.String))
dest_mac = ["00:1B:21:71:76:B9"] * N_NODES # use mac of dop36
var = obj.get_child("2:FPGA_xst_offload_hdr_eth_destination_mac_RW")
var.set_value(ua.Variant(value=list(dest_mac), varianttype=ua.VariantType.String))
dest_addr = ["10.99.0.254"] * N_NODES # use addr of dop36
var = obj.get_child("2:FPGA_xst_offload_hdr_ip_destination_address_RW")
var.set_value(ua.Variant(value=list(dest_addr), varianttype=ua.VariantType.String))
dest_addr = ["10.99.0.254"] * N_NODES # use addr of dop36
var = obj.get_child("2:FPGA_xst_offload_hdr_ip_destination_address_RW")
var.set_value(ua.Variant(value=list(dest_addr), varianttype=ua.VariantType.String))
subsel = [0, 102, 0, 0, 0, 0, 0, 0] * N_NODES # use subband 102 only
var = obj.get_child("2:FPGA_xst_subband_select_RW")
var.set_value(ua.Variant(value=list(subsel), varianttype=ua.VariantType.UInt32))
subsel = [0, 102, 0, 0, 0, 0, 0, 0] * N_NODES # use subband 102 only
var = obj.get_child("2:FPGA_xst_subband_select_RW")
var.set_value(ua.Variant(value=list(subsel), varianttype=ua.VariantType.UInt32))
interval = [1.0] * N_NODES # use fixed interval 1.0 second
var = obj.get_child("2:FPGA_xst_integration_interval_RW")
var.set_value(ua.Variant(value=list(interval), varianttype=ua.VariantType.Double))
interval = [1.0] * N_NODES # use fixed interval 1.0 second
var = obj.get_child("2:FPGA_xst_integration_interval_RW")
var.set_value(ua.Variant(value=list(interval), varianttype=ua.VariantType.Double))
processing = [True] * N_NODES # enable processing
var = obj.get_child("2:FPGA_xst_processing_enable_RW")
var.set_value(ua.Variant(value=list(processing), varianttype=ua.VariantType.Boolean))
processing = [True] * N_NODES # enable processing
var = obj.get_child("2:FPGA_xst_processing_enable_RW")
var.set_value(ua.Variant(value=list(processing), varianttype=ua.VariantType.Boolean))
enable = [True] * N_NODES # enable offload
def set_enable_stream(obj, mode):
if mode == 'OFF':
enable = [False] * N_NODES # disable offload
var = obj.get_child("2:FPGA_sst_offload_enable_RW")
var.set_value(ua.Variant(value=list(enable), varianttype=ua.VariantType.Boolean))
var = obj.get_child("2:FPGA_bst_offload_enable_RW")
var.set_value(ua.Variant(value=list(enable), varianttype=ua.VariantType.Boolean))
var = obj.get_child("2:FPGA_xst_offload_enable_RW")
var.set_value(ua.Variant(value=list(enable), varianttype=ua.VariantType.Boolean))
else:
enable = [False] * N_NODES # enable offload
return
enable = [True] * N_NODES # enable offload
if mode == 'SST':
var = obj.get_child("2:FPGA_sst_offload_enable_RW")
var.set_value(ua.Variant(value=list(enable), varianttype=ua.VariantType.Boolean))
elif mode == 'BST':
var = obj.get_child("2:FPGA_bst_offload_enable_RW")
var.set_value(ua.Variant(value=list(enable), varianttype=ua.VariantType.Boolean))
elif mode == 'XST':
var = obj.get_child("2:FPGA_xst_offload_enable_RW")
var.set_value(ua.Variant(value=list(enable), varianttype=ua.VariantType.Boolean))
else:
print('Wrong mode "{}"'.format(mode))
return
if __name__ == "__main__":
# Parse command line arguments
@@ -288,8 +346,9 @@ if __name__ == "__main__":
parser.add_argument('-n', '--nodes', dest='nodes', type=int, nargs='+', help="nodes to use")
parser.add_argument('-i', '--info', dest='info', action='store_true', help="print point infor from server")
parser.add_argument('-a', '--all', dest='all_r', action='store_true', help="recv all*_R points and show time")
parser.add_argument('--wgxst', dest='wg_xst_mode', type=bool, help="turn on/off wg for xst")
parser.add_argument('--offloadxst', dest='offload_xst', type=bool, help="turn on/off xst offload")
parser.add_argument('--wg', dest='wg_mode', type=str, choices=['OFF', 'XST'], help="turn wg off/on for xst")
parser.add_argument('--setup', dest='offload_setup', type=str, choices=['SST', 'BST', 'XST'], help="setup offload for selected mode")
parser.add_argument('--stream', dest='stream', type=str, choices=['OFF', 'SST', 'BST', 'XST'], help="turn off/on selected offload stream")
parser.add_argument('--verbosity', default='INFO', help="stdout log level can be [ERROR | WARNING | INFO | DEBUG]")
args = parser.parse_args()
@@ -299,9 +358,13 @@ if __name__ == "__main__":
# logger = logging.getLogger("KeepAlive")
# logger.setLevel(logging.DEBUG)
host = 'dop36'
port = 4840
client = Client("opc.tcp://{}:{}/".format(host, port))
# client = Client("opc.tcp://LAPTOP-N0VQ3UDT:4840/")
# client = Client("opc.tcp://192.168.137.102:4840/")
client = Client("opc.tcp://dop36:4840/")
# client = Client("opc.tcp://dop36:4840/")
# client = Client("opc.tcp://169.254.91.66:4840/")
# I hope this is secure, because I have zero clue about security
# client.set_security_string("Basic256Sha256,SignAndEncrypt,certificate-example.der,private-key-example.pem")
@@ -309,15 +372,15 @@ if __name__ == "__main__":
cnt = 1
weights = [-1 for i in range(8000)] # 187392)
# 0x7fff
mask = [True for i in range(16)]
scraps = [i+3 for i in range(2048)]
scraps = [i + 3 for i in range(2048)]
clientRunning = True
while clientRunning:
try:
client.connect()
print("I'm in")
print("Connected to {}:{}".format(host, port))
################
# this section contains some code about navigating around the address space
@@ -346,11 +409,22 @@ if __name__ == "__main__":
for i in range(len(children)):
print("\t\tchild", i, ": ", children[i].get_browse_name())
if args.wg_xst_mode:
write_xst_wg_setup(Object, on=args.wg_xst_mode)
if args.offload_xst:
write_xst_stream_setup(Object, on=args.offload_xst)
if args.wg_mode is not None:
if args.wg_mode == 'OFF':
turn_wg_off(obj)
elif args.wg_mode == 'XST':
setup_wg_xst_mode(Object)
if args.offload_setup is not None:
if args.offload_setup == 'SST':
setup_sst_stream(Object)
elif args.offload_setup == 'BST':
setup_bst_stream(Object)
elif args.offload_setup == 'XST':
setup_xst_stream(Object)
if args.stream is not None:
set_enable_stream(Object, mode=args.stream)
if args.all_r is True:
check_get_all_R_time(Object)
Loading