Select Git revision

L2SDP-349, processed review comment.
Pieter Donker authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
Client.py 19.16 KiB
#!/usr/bin/python3
# ##########################################################################
# Copyright 2020
# ASTRON (Netherlands Institute for Radio Astronomy) <http://www.astron.nl/>
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ##########################################################################
# ##########################################################################
# Author:
# . Leon Hiemstra, Pieter Donker
# Purpose:
# . script to test (control/monitor) opc-ua server
# Description:
# Client.py is used to connect (TCP/IP) to an opc-ua server and test (read/write)
# opc-ua points (registers on an FPGA).
# For LTS the server is running on DOP36 port 4840; this client cannot run
# on DOP36 itself (the system is too old).
#
# In this version the following is working:
# . --info, read info from the server, list all available points on the server.
# . --all, read all *_R points and print the time needed.
# . --wg, turn off the WG or set it up for XST (to generate a crosslets plot).
# . --setup, set up the SST, BST or XST stream.
# . --stream, turn off all streams or turn on the SST, BST or XST stream.
#
# . run ./Client.py -h for help
# ##########################################################################
import sys
import signal
import time
import logging
import traceback
import argparse
from functools import wraps
sys.path.insert(0, "..")
# Not referenced anywhere in the visible part of this file.
Temp_only = True
# Length of the per-node lists written to the server (FPGA mask, offload
# settings); presumably the number of FPGA nodes - TODO confirm.
N_NODES = 16
# Per-node multiplier for the waveform generator and subband weight lists;
# presumably the number of signal inputs per node - TODO confirm.
S_PN = 12
# Multiplier for the subband weight list in setup_sst_stream();
# presumably the number of subbands per signal input - TODO confirm.
N_SUB = 512
# timing decorator
# put @timing above the function to time
def timing(f):
    """Decorate *f* so that every call prints how long it took.

    The wrapper forwards all positional and keyword arguments to *f*, prints
    the function name, the arguments and the elapsed time in seconds, and
    returns whatever *f* returned. Nothing is printed when *f* raises.
    """
    @wraps(f)
    def wrap(*args, **kw):
        # perf_counter() is a monotonic, high-resolution clock, so the
        # measured interval is not affected by wall-clock adjustments.
        ts = time.perf_counter()
        result = f(*args, **kw)
        te = time.perf_counter()
        print('func:{} args:[{}, {}] took: {:2.4f} sec'.format(f.__name__, args, kw, te-ts))
        return result
    return wrap
class SubHandler(object):
    """
    Subscription handler: receives events from the server for a subscription.

    Both callbacks only print what they are given. As noted by the original
    author, they are called directly from the receiving thread, so do not do
    expensive, slow or network operations here; start another thread if such
    work is needed.
    """

    def datachange_notification(self, node, val, data):
        """Print the node and its new value; *data* is not used."""
        fields = ("Python: New data change event", node, val)
        print(*fields)

    def event_notification(self, event):
        """Print the received event."""
        fields = ("Python: New event", event)
        print(*fields)
try:
    from IPython import embed
except ImportError:
    import code

    def embed():
        """Fallback for IPython's embed(): open a plain interactive console.

        The console works on this module's global namespace, so every
        module-level name is available at the prompt.
        """
        # Hand globals() to the console directly. The previous version bound
        # it to a local named `vars` (shadowing the builtin) and merged
        # locals() into it, which only added a stray "vars" entry to the
        # module namespace.
        shell = code.InteractiveConsole(globals())
        shell.interact()
from opcua import Client
from opcua import ua
def datachange_notification(self, node, val, data):
    """Print a data-change event for *node* together with its new value *val*.

    Module-level function with the same signature as the SubHandler method of
    the same name; *self* and *data* are accepted but not used.
    """
    fields = ("Python: New data change event", node, val)
    print(*fields)
def event_notification(self, event):
    """Print the received *event*.

    Module-level function with the same signature as the SubHandler method of
    the same name; *self* is accepted but not used.
    """
    fields = ("Python: New event", event)
    print(*fields)
def explore(node):
    """Recursively print the browse name and value of every node below *node*.

    For each child the index and browse name are printed, followed by its
    value, or "no variables to read" when reading the value raises. The
    function then descends into that child.

    *node* must provide get_children(); each child must provide
    get_browse_name(), get_value() and get_children().
    """
    print("explore")
    for i, child in enumerate(node.get_children()):
        print("node ", i, ": ", child.get_browse_name())
        try:
            print("has a value of: ", child.get_value())
        except Exception:
            # Only ordinary errors are treated as "no value here"; a bare
            # except would also swallow KeyboardInterrupt/SystemExit and make
            # the walk impossible to interrupt.
            print("no variables to read")
        # NOTE(review): recursion is assumed to run for every child, not only
        # for children whose value could not be read - confirm against the
        # original indentation.
        explore(child)
def explore_temp(node):
    """Print the value of each direct child of *node* as a temperature.

    For every child the value is printed with the label "temperature: ", or
    "no variables to read" when reading the value raises. The child is then
    handed to explore(), which prints its subtree.
    """
    for child in node.get_children():
        try:
            print("temperature: ", child.get_value())
        except Exception:
            # Catch ordinary errors only, so Ctrl-C still interrupts the walk.
            print("no variables to read")
        # NOTE(review): the call to explore() is assumed to run for every
        # child - confirm against the original indentation.
        explore(child)
@timing
def check_get_all_R_time(obj):
    """Read every listed *_R point once and print how many were read.

    Each point is fetched with obj.get_child(name).get_value() in the order
    listed below and converted to a string. The @timing decorator prints the
    total time the reads took.
    """
    point_names = (
        "2:FPGA_bsn_monitor_input_bsn_R",
        "2:FPGA_bsn_monitor_input_nof_err_R",
        "2:FPGA_bsn_monitor_input_nof_packets_R",
        "2:FPGA_bsn_monitor_input_nof_valid_R",
        "2:FPGA_firmware_version_R",
        "2:FPGA_hardware_version_R",
        "2:FPGA_jesd204b_csr_dev_syncn_R",
        "2:FPGA_jesd204b_csr_rbd_count_R",
        "2:FPGA_jesd204b_rx_err0_R",
        "2:FPGA_jesd204b_rx_err1_R",
        "2:FPGA_mask_R",
        "2:FPGA_processing_enable_R",
        "2:FPGA_sdp_info_antenna_band_index_R",
        "2:FPGA_sdp_info_block_period_R",
        "2:FPGA_sdp_info_f_adc_R",
        "2:FPGA_sdp_info_fsub_type_R",
        "2:FPGA_sdp_info_nyquist_sampling_zone_index_R",
        "2:FPGA_sdp_info_observation_id_R",
        "2:FPGA_sdp_info_station_id_R",
        "2:FPGA_signal_input_samples_delay_R",
        "2:FPGA_sst_offload_enable_R",
        "2:FPGA_sst_offload_hdr_eth_destination_mac_R",
        "2:FPGA_sst_offload_hdr_ip_destination_address_R",
        "2:FPGA_sst_offload_hdr_udp_destination_port_R",
        "2:FPGA_sst_offload_weighted_subbands_R",
        "2:FPGA_bst_offload_enable_R",
        "2:FPGA_bst_offload_hdr_eth_destination_mac_R",
        "2:FPGA_bst_offload_hdr_ip_destination_address_R",
        "2:FPGA_bst_offload_hdr_udp_destination_port_R",
        # "2:FPGA_bst_offload_nof_beamlets_per_packet_R" is deliberately left out
        "2:FPGA_xst_subband_select_R",
        "2:FPGA_xst_integration_interval_R",
        "2:FPGA_xst_offload_enable_R",
        "2:FPGA_xst_processing_enable_R",
        "2:FPGA_xst_offload_hdr_eth_destination_mac_R",
        "2:FPGA_xst_offload_hdr_ip_destination_address_R",
        "2:FPGA_xst_offload_hdr_udp_destination_port_R",
        "2:FPGA_status_R",
        "2:FPGA_subband_weights_R",
        "2:FPGA_temp_R",
        "2:FPGA_weights_R",
        "2:FPGA_wg_amplitude_R",
        "2:FPGA_wg_enable_R",
        "2:FPGA_wg_frequency_R",
        "2:FPGA_wg_phase_R",
        "2:TR_busy_R",
        "2:TR_software_version_R",
        "2:TR_tod_R",
        "2:TR_uptime_R",
    )
    # One read per point, in list order; the values are only kept as strings.
    info = [str(obj.get_child(name).get_value()) for name in point_names]
    print(f"checked {len(info)} FPGA_*_R points")
    # print('\n'.join(info))
def write_fpga_mask(obj, nodes=None, mask=None):
    """Write a boolean list to the "2:FPGA_mask_RW" point.

    obj   -- node that provides get_child().
    nodes -- optional iterable of indices; those entries are set to True in
             an otherwise all-False list of N_NODES entries.
    mask  -- optional complete list to write; takes precedence over nodes.

    When neither nodes nor mask is given, an all-False list is written.
    """
    if mask is None:
        flags = [False] * N_NODES
        if nodes is not None:
            for index in nodes:
                flags[index] = True
    else:
        flags = mask

    point = obj.get_child("2:FPGA_mask_RW")
    point.set_value(ua.Variant(value=list(flags), varianttype=ua.VariantType.Boolean))
def get_fpga_mask(obj):
    """Return the current value of the "2:FPGA_mask_R" point below *obj*."""
    return obj.get_child("2:FPGA_mask_R").get_value()
@timing
def read_subband_weights(obj):
    """Read the "2:FPGA_subband_weights_R" point and return its value.

    The @timing decorator prints how long the read took. The value used to be
    stored in an unused local and dropped; it is now returned so callers can
    inspect it. Callers that ignore the result are unaffected.
    """
    return obj.get_child("2:FPGA_subband_weights_R").get_value()
def setup_wg_xst_mode(obj):
    '''
    Set up and enable the waveform generator (WG) for the xst stream.

    Writes amplitude, phase and frequency lists of S_PN * N_NODES entries to
    the matching FPGA_wg_*_RW points, prints "wg on" and then writes True to
    every entry of FPGA_wg_enable_RW.
    '''
    # Per node: amplitudes go from 0.01 down to 0.01 / S_PN in equal steps.
    amplitudes = [0.01 * (1 - (idx / S_PN)) for idx in range(S_PN)] * N_NODES
    # Per node: phases go from 0 upwards in steps of 360 / S_PN.
    phases = [idx * (360 / S_PN) for idx in range(S_PN)] * N_NODES
    # Same frequency for every entry: 102 / 1024 of 200e6.
    frequencies = [(102 / 1024) * 200e6] * S_PN * N_NODES

    double_settings = (
        ("2:FPGA_wg_amplitude_RW", amplitudes),
        ("2:FPGA_wg_phase_RW", phases),
        ("2:FPGA_wg_frequency_RW", frequencies),
    )
    for point_name, values in double_settings:
        point = obj.get_child(point_name)
        point.set_value(ua.Variant(value=list(values), varianttype=ua.VariantType.Double))

    print("wg on")
    enable_flags = [True] * S_PN * N_NODES  # enable wg
    point = obj.get_child("2:FPGA_wg_enable_RW")
    point.set_value(ua.Variant(value=enable_flags, varianttype=ua.VariantType.Boolean))
def turn_wg_off(obj):
    """Print "wg off" and write False to every entry of FPGA_wg_enable_RW."""
    print("wg off")
    disable_flags = [False] * S_PN * N_NODES  # disable wg
    point = obj.get_child("2:FPGA_wg_enable_RW")
    point.set_value(ua.Variant(value=disable_flags, varianttype=ua.VariantType.Boolean))
def setup_sst_stream(obj):
    '''
    Set up the sst stream: destination port, mac and ip address for the
    offload, default subband weights, and weighted subbands enabled.

    The statistics stream script in git/upe_gear/peripherals is used for
    recording and plotting (presumably with -s=SST - TODO confirm).
    '''
    settings = (
        # use port 5001
        ("2:FPGA_sst_offload_hdr_udp_destination_port_RW",
         [5001] * N_NODES, ua.VariantType.UInt16),
        # use mac of dop36
        ("2:FPGA_sst_offload_hdr_eth_destination_mac_RW",
         ["00:1B:21:71:76:B9"] * N_NODES, ua.VariantType.String),
        # use addr of dop36
        ("2:FPGA_sst_offload_hdr_ip_destination_address_RW",
         ["10.99.0.254"] * N_NODES, ua.VariantType.String),
        # set weights for subbands to default
        ("2:FPGA_subband_weights_RW",
         [8192] * N_SUB * N_NODES * S_PN, ua.VariantType.UInt32),
        # enable weighted subbands
        ("2:FPGA_sst_offload_weighted_subbands_RW",
         [True] * N_NODES, ua.VariantType.Boolean),
    )
    # Points are written one after the other, in the order listed above.
    for point_name, values, variant_type in settings:
        point = obj.get_child(point_name)
        point.set_value(ua.Variant(value=list(values), varianttype=variant_type))
def setup_bst_stream(obj):
    '''
    Set up the bst stream: destination port, mac and ip address for the
    offload.

    For recording and plotting the original author refers to
    git/upe_gear/peripherals:
    - pi_statistic_stream.py --unb2 2 --pn2 0 --cmd 4 -s=BST
    '''
    settings = (
        # use port 5002
        ("2:FPGA_bst_offload_hdr_udp_destination_port_RW",
         [5002] * N_NODES, ua.VariantType.UInt16),
        # use mac of dop36
        ("2:FPGA_bst_offload_hdr_eth_destination_mac_RW",
         ["00:1B:21:71:76:B9"] * N_NODES, ua.VariantType.String),
        # use addr of dop36
        ("2:FPGA_bst_offload_hdr_ip_destination_address_RW",
         ["10.99.0.254"] * N_NODES, ua.VariantType.String),
    )
    # Points are written one after the other, in the order listed above.
    for point_name, values, variant_type in settings:
        point = obj.get_child(point_name)
        point.set_value(ua.Variant(value=list(values), varianttype=variant_type))
def setup_xst_stream(obj):
    '''
    Set up the xst stream: destination port, mac and ip address for the
    offload, subband selection, integration interval, and processing enabled.

    For recording and plotting the original author refers to
    git/upe_gear/peripherals:
    - pi_statistic_stream.py --unb2 2 --pn2 0 --cmd 4 -s=XST
    '''
    settings = (
        # use port 5003
        ("2:FPGA_xst_offload_hdr_udp_destination_port_RW",
         [5003] * N_NODES, ua.VariantType.UInt16),
        # use mac of dop36
        ("2:FPGA_xst_offload_hdr_eth_destination_mac_RW",
         ["00:1B:21:71:76:B9"] * N_NODES, ua.VariantType.String),
        # use addr of dop36
        ("2:FPGA_xst_offload_hdr_ip_destination_address_RW",
         ["10.99.0.254"] * N_NODES, ua.VariantType.String),
        # use subband 102 only
        ("2:FPGA_xst_subband_select_RW",
         [0, 102, 0, 0, 0, 0, 0, 0] * N_NODES, ua.VariantType.UInt32),
        # use fixed interval 1.0 second
        ("2:FPGA_xst_integration_interval_RW",
         [1.0] * N_NODES, ua.VariantType.Double),
        # enable processing
        ("2:FPGA_xst_processing_enable_RW",
         [True] * N_NODES, ua.VariantType.Boolean),
    )
    # Points are written one after the other, in the order listed above.
    for point_name, values, variant_type in settings:
        point = obj.get_child(point_name)
        point.set_value(ua.Variant(value=list(values), varianttype=variant_type))
def set_enable_stream(obj, mode):
    """Switch the statistics offload streams on or off.

    mode 'OFF' writes False to the sst, bst and xst offload enable points
    (in that order). mode 'SST', 'BST' or 'XST' writes True to the enable
    point of that stream only. Any other mode prints a message and writes
    nothing.
    """
    offload_points = (
        ('SST', "2:FPGA_sst_offload_enable_RW"),
        ('BST', "2:FPGA_bst_offload_enable_RW"),
        ('XST', "2:FPGA_xst_offload_enable_RW"),
    )

    if mode == 'OFF':
        targets = [point_name for _, point_name in offload_points]
        state = False  # disable offload
    else:
        targets = [point_name for key, point_name in offload_points if mode == key]
        state = True  # enable offload

    if not targets:
        print('Wrong mode "{}"'.format(mode))
        return

    for point_name in targets:
        flags = [state] * N_NODES
        point = obj.get_child(point_name)
        point.set_value(ua.Variant(value=flags, varianttype=ua.VariantType.Boolean))
if __name__ == "__main__":
    # Script entry point: connect to the opc-ua server, run the actions
    # selected on the command line once, restore the FPGA mask and disconnect.

    # Parse command line arguments
    parser = argparse.ArgumentParser(description="opcua client command line argument parser")
    parser.add_argument('-n', '--nodes', dest='nodes', type=int, nargs='+', help="nodes to use")
    parser.add_argument('-i', '--info', dest='info', action='store_true', help="print point infor from server")
    parser.add_argument('-a', '--all', dest='all_r', action='store_true', help="recv all*_R points and show time")
    parser.add_argument('--wg', dest='wg_mode', type=str, choices=['OFF', 'XST'], help="turn wg off/on for xst")
    parser.add_argument('--setup', dest='offload_setup', type=str, choices=['SST', 'BST', 'XST'], help="setup offload for selected mode")
    parser.add_argument('--stream', dest='stream', type=str, choices=['OFF', 'SST', 'BST', 'XST'], help="turn off/on selected offload stream")
    parser.add_argument('--verbosity', default='INFO', help="stdout log level can be [ERROR | WARNING | INFO | DEBUG]")
    args = parser.parse_args()
    print(args)

    # NOTE(review): --verbosity is parsed but not applied; the log level is
    # fixed at ERROR, as before.
    logging.basicConfig(level=logging.ERROR)

    host = 'dop36'
    port = 4840
    client = Client("opc.tcp://{}:{}/".format(host, port))
    # Other endpoints used during development:
    # client = Client("opc.tcp://LAPTOP-N0VQ3UDT:4840/")
    # client = Client("opc.tcp://192.168.137.102:4840/")
    # client = Client("opc.tcp://169.254.91.66:4840/")
    # Security is not configured; the original author left this disabled:
    # client.set_security_string("Basic256Sha256,SignAndEncrypt,certificate-example.der,private-key-example.pem")

    # Only disconnect when connect() actually succeeded.
    connected = False
    try:
        client.connect()
        connected = True
        print("Connected to {}:{}".format(host, port))

        print("\n=============================")
        # Client has a few methods to get a proxy to UA nodes that should
        # always be in the address space, such as Root or Objects.
        Object = client.get_objects_node()
        fpga_mask = get_fpga_mask(Object)  # remember the active fpga mask
        try:
            write_fpga_mask(Object, nodes=args.nodes)

            if args.info:
                root = client.get_root_node()
                print("Root node is: ", root)
                print("name of root is: ", root.get_display_name())
                # get the objects nodes NodeID
                print("\tObject node: ", Object)
                print("\tname of object node is: ", Object.get_browse_name())
                # get all the children that "Object" has
                children = Object.get_children()
                print("\tchildren of Object: ", children)
                for i, child in enumerate(children):
                    print("\t\tchild", i, ": ", child.get_browse_name())

            if args.wg_mode == 'OFF':
                # Was turn_wg_off(obj): `obj` is not defined in this scope.
                turn_wg_off(Object)
            elif args.wg_mode == 'XST':
                setup_wg_xst_mode(Object)

            if args.offload_setup == 'SST':
                setup_sst_stream(Object)
            elif args.offload_setup == 'BST':
                setup_bst_stream(Object)
            elif args.offload_setup == 'XST':
                setup_xst_stream(Object)

            if args.stream is not None:
                set_enable_stream(Object, mode=args.stream)

            if args.all_r:
                check_get_all_R_time(Object)
        finally:
            # Write back the start mask, also when one of the actions failed,
            # so the server is not left with the temporary node selection.
            write_fpga_mask(Object, mask=fpga_mask)
    except KeyboardInterrupt:
        print(" user hit ctrl-c")
    except Exception as err:
        # print() does not interpolate %s arguments, so format explicitly.
        print('Caught {}'.format(type(err)))
        print(str(err))
        print('TRACEBACK:\n{}'.format(traceback.format_exc()))
        print('Aborting NOW')
        time.sleep(1)  # short pause kept from the original error path
    finally:
        if connected:
            print("\ndisconnect from server")
            client.disconnect()