Skip to content
Snippets Groups Projects
Select Git revision
  • dcd87661f0032333297e07f83142a490f00c276f
  • master default protected
  • sdptr_lift
  • v1.5.0
  • v1.4.0
  • v1.3.0
  • v1.2.0
  • v1.1.2
  • v1.1.1
  • v1.1.0
  • v1.0.0
11 results

Client.py

Blame
  • donker's avatar
    L2SDP-349, processed review comment.
    Pieter Donker authored
    dcd87661
    History
    Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    Client.py 19.16 KiB
    #!/usr/bin/python3
    # ##########################################################################
    # Copyright 2020
    # ASTRON (Netherlands Institute for Radio Astronomy) <http://www.astron.nl/>
    # P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    # http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    # ##########################################################################
    
    # ##########################################################################
    # Author:
    # . Leon Hiemstra, Pieter Donker
    # Purpose:
    # . script to test (control/monitor) opc-ua server
    # Description:
    # Client.py is used to connect (TCP/IP) to an opc-ua server and test (read/write)
    # opc-ua points (registers on a fpga).
    # For LTS the server is running on DOP36 port 4840, this client can not run
    # on DOP36 itself (too old system).
    #
    # In this version the following is working:
    # . --info,   read info from server, list all available points on the server.
    # . --all,    read all *_R point and print needed time.
    # . --wg,     turn off WG or setup for XST (to generate crosslets plot).
    # . --setup,  setup SST, BST and XST stream.
    # . --stream, turn off or on SST, BST or XST stream.
    #
    # . run ./Client.py -h for help
    # ##########################################################################
    
    import sys
    import signal
    import time
    import logging
    import traceback
    import argparse
    from functools import wraps
    sys.path.insert(0, "..")
    
    Temp_only = True
    
    N_NODES = 16
    S_PN    = 12
    N_SUB   = 512
    
    # timing decorator
    # put @timing for the function to time
    def timing(f):
        @wraps(f)
        def wrap(*args, **kw):
            ts = time.time()
            result = f(*args, **kw)
            te = time.time()
            print('func:{} args:[{}, {}] took: {:2.4f} sec'.format(f.__name__, args, kw, te-ts))
            return result
        return wrap
    
    
    class SubHandler(object):
    
        """
        Subscription Handler. To receive events from server for a subscription
        data_change and event methods are called directly from receiving thread.
        Do not do expensive, slow or network operation there. Create another
        thread if you need to do such a thing
        """
    
        def datachange_notification(self, node, val, data):
            print("Python: New data change event", node, val)
    
        def event_notification(self, event):
            print("Python: New event", event)
    
    
    
    try:
        from IPython import embed
    except ImportError:
        import code
    
        def embed():
            vars = globals()
            vars.update(locals())
            shell = code.InteractiveConsole(vars)
            shell.interact()
    
    from opcua import Client
    from opcua import ua
    
    
    def datachange_notification(self, node, val, data):
        print("Python: New data change event", node, val)
    
    
    def event_notification(self, event):
        print("Python: New event", event)
    
    
    def explore(node):
        print("explore")
        children = node.get_children()
        for i in range(len(children)):
    
            print("node ", i, ": ", children[i].get_browse_name())
            try:
                print("has a value of: ", children[i].get_value())
            except:
                print("no variables to read")
    
            explore(children[i])
    
    
    def explore_temp(node):
        children = node.get_children()
        for i in range(len(children)):
            try:
                print("temperature: ", children[i].get_value())
            except:
                print("no variables to read")
            explore(children[i])
    
    
    @timing
    def check_get_all_R_time(obj):
        info = []
        info.append(str(obj.get_child("2:FPGA_bsn_monitor_input_bsn_R").get_value()))
        info.append(str(obj.get_child("2:FPGA_bsn_monitor_input_nof_err_R").get_value()))
        info.append(str(obj.get_child("2:FPGA_bsn_monitor_input_nof_packets_R").get_value()))
        info.append(str(obj.get_child("2:FPGA_bsn_monitor_input_nof_valid_R").get_value()))
        info.append(str(obj.get_child("2:FPGA_firmware_version_R").get_value()))
        info.append(str(obj.get_child("2:FPGA_hardware_version_R").get_value()))
        info.append(str(obj.get_child("2:FPGA_jesd204b_csr_dev_syncn_R").get_value()))
        info.append(str(obj.get_child("2:FPGA_jesd204b_csr_rbd_count_R").get_value()))
        info.append(str(obj.get_child("2:FPGA_jesd204b_rx_err0_R").get_value()))
        info.append(str(obj.get_child("2:FPGA_jesd204b_rx_err1_R").get_value()))
        info.append(str(obj.get_child("2:FPGA_mask_R").get_value()))
        info.append(str(obj.get_child("2:FPGA_processing_enable_R").get_value()))
        info.append(str(obj.get_child("2:FPGA_sdp_info_antenna_band_index_R").get_value()))
        info.append(str(obj.get_child("2:FPGA_sdp_info_block_period_R").get_value()))
        info.append(str(obj.get_child("2:FPGA_sdp_info_f_adc_R").get_value()))
        info.append(str(obj.get_child("2:FPGA_sdp_info_fsub_type_R").get_value()))
        info.append(str(obj.get_child("2:FPGA_sdp_info_nyquist_sampling_zone_index_R").get_value()))
        info.append(str(obj.get_child("2:FPGA_sdp_info_observation_id_R").get_value()))
        info.append(str(obj.get_child("2:FPGA_sdp_info_station_id_R").get_value()))
        info.append(str(obj.get_child("2:FPGA_signal_input_samples_delay_R").get_value()))
        info.append(str(obj.get_child("2:FPGA_sst_offload_enable_R").get_value()))
        info.append(str(obj.get_child("2:FPGA_sst_offload_hdr_eth_destination_mac_R").get_value()))
        info.append(str(obj.get_child("2:FPGA_sst_offload_hdr_ip_destination_address_R").get_value()))
        info.append(str(obj.get_child("2:FPGA_sst_offload_hdr_udp_destination_port_R").get_value()))
        info.append(str(obj.get_child("2:FPGA_sst_offload_weighted_subbands_R").get_value()))
        info.append(str(obj.get_child("2:FPGA_bst_offload_enable_R").get_value()))
        info.append(str(obj.get_child("2:FPGA_bst_offload_hdr_eth_destination_mac_R").get_value()))
        info.append(str(obj.get_child("2:FPGA_bst_offload_hdr_ip_destination_address_R").get_value()))
        info.append(str(obj.get_child("2:FPGA_bst_offload_hdr_udp_destination_port_R").get_value()))
        #info.append(str(obj.get_child("2:FPGA_bst_offload_nof_beamlets_per_packet_R").get_value()))
        info.append(str(obj.get_child("2:FPGA_xst_subband_select_R").get_value()))
        info.append(str(obj.get_child("2:FPGA_xst_integration_interval_R").get_value()))
        info.append(str(obj.get_child("2:FPGA_xst_offload_enable_R").get_value()))
        info.append(str(obj.get_child("2:FPGA_xst_processing_enable_R").get_value()))
        info.append(str(obj.get_child("2:FPGA_xst_offload_hdr_eth_destination_mac_R").get_value()))
        info.append(str(obj.get_child("2:FPGA_xst_offload_hdr_ip_destination_address_R").get_value()))
        info.append(str(obj.get_child("2:FPGA_xst_offload_hdr_udp_destination_port_R").get_value()))
        info.append(str(obj.get_child("2:FPGA_status_R").get_value()))
        info.append(str(obj.get_child("2:FPGA_subband_weights_R").get_value()))
        info.append(str(obj.get_child("2:FPGA_temp_R").get_value()))
        info.append(str(obj.get_child("2:FPGA_weights_R").get_value()))
        info.append(str(obj.get_child("2:FPGA_wg_amplitude_R").get_value()))
        info.append(str(obj.get_child("2:FPGA_wg_enable_R").get_value()))
        info.append(str(obj.get_child("2:FPGA_wg_frequency_R").get_value()))
        info.append(str(obj.get_child("2:FPGA_wg_phase_R").get_value()))
        info.append(str(obj.get_child("2:TR_busy_R").get_value()))
        info.append(str(obj.get_child("2:TR_software_version_R").get_value()))
        info.append(str(obj.get_child("2:TR_tod_R").get_value()))
        info.append(str(obj.get_child("2:TR_uptime_R").get_value()))
        print(f"checked {len(info)} FPGA_*_R points")
        # print('\n'.join(info))
    
    
    def write_fpga_mask(obj, nodes=None, mask=None):
        enable_mask = [False] * N_NODES
        if mask is not None:
            enable_mask = mask
        elif nodes is not None:
            for node in list(nodes):
                enable_mask[node] = True
        #print(enable_mask)
        var = obj.get_child("2:FPGA_mask_RW")
        var.set_value(ua.Variant(value=list(enable_mask), varianttype=ua.VariantType.Boolean))
    
    
    def get_fpga_mask(obj):
        var = obj.get_child("2:FPGA_mask_R")
        enable_mask = var.get_value()
        #print(enable_mask)
        return enable_mask
    
    
    @timing
    def read_subband_weights(obj):
        var = obj.get_child("2:FPGA_subband_weights_R")
        vals = var.get_value()
    
    
    def setup_wg_xst_mode(obj):
        '''
        setup wg for xst stream
        '''
        # Write WG configuration with phases from 0 - 360 and 1 to 1/12 amplitudes
        wg_ampl = [0.01 * (1 - (i / S_PN)) for i in range(S_PN)] * N_NODES
        var = obj.get_child("2:FPGA_wg_amplitude_RW")
        var.set_value(ua.Variant(value=list(wg_ampl), varianttype=ua.VariantType.Double))
    
        wg_phase = [i * (360 / S_PN) for i in range(S_PN)] * N_NODES
        var = obj.get_child("2:FPGA_wg_phase_RW")
        var.set_value(ua.Variant(value=list(wg_phase), varianttype=ua.VariantType.Double))
    
        wg_freq = [(102 / 1024) * 200e6 for i in range(S_PN)] * N_NODES
        var = obj.get_child("2:FPGA_wg_frequency_RW")
        var.set_value(ua.Variant(value=list(wg_freq), varianttype=ua.VariantType.Double))
    
        print("wg on")
        enable = [True for i in range(S_PN)] * N_NODES  # enable wg
        var = obj.get_child("2:FPGA_wg_enable_RW")
        var.set_value(ua.Variant(value=list(enable), varianttype=ua.VariantType.Boolean))
    
    
    def turn_wg_off(obj):
        print("wg off")
        enable = [False for i in range(S_PN)] * N_NODES  # enable wg
        var = obj.get_child("2:FPGA_wg_enable_RW")
        var.set_value(ua.Variant(value=list(enable), varianttype=ua.VariantType.Boolean))
    
    
    def setup_sst_stream(obj):
        '''
        setup bst stream, use git/upe_gear/peripherals/pi_satistics_stream.py for recording and plotting.
        - pi_statistic_stream.py --unb2 2 --pn2 0 --cmd 4 -s=BST
        '''
        port = [5001] * N_NODES  # use port 5002
        var = obj.get_child("2:FPGA_sst_offload_hdr_udp_destination_port_RW")
        var.set_value(ua.Variant(value=list(port), varianttype=ua.VariantType.UInt16))
    
        dest_mac = ["00:1B:21:71:76:B9"] * N_NODES  # use mac of dop36
        var = obj.get_child("2:FPGA_sst_offload_hdr_eth_destination_mac_RW")
        var.set_value(ua.Variant(value=list(dest_mac), varianttype=ua.VariantType.String))
    
        dest_addr = ["10.99.0.254"] * N_NODES  # use addr of dop36
        var = obj.get_child("2:FPGA_sst_offload_hdr_ip_destination_address_RW")
        var.set_value(ua.Variant(value=list(dest_addr), varianttype=ua.VariantType.String))
    
        weights = [8192] * N_SUB * N_NODES * S_PN  # set weights for subbands to default
        var = obj.get_child("2:FPGA_subband_weights_RW")
        var.set_value(ua.Variant(value=list(weights), varianttype=ua.VariantType.UInt32))
    
        enable = [True] * N_NODES  # enable weighted subbands
        var = obj.get_child("2:FPGA_sst_offload_weighted_subbands_RW")
        var.set_value(ua.Variant(value=list(enable), varianttype=ua.VariantType.Boolean))
    
    
    def setup_bst_stream(obj):
        '''
        setup bst stream, use git/upe_gear/peripherals/pi_satistics_stream.py for recording and plotting.
        - pi_statistic_stream.py --unb2 2 --pn2 0 --cmd 4 -s=BST
        '''
        port = [5002] * N_NODES  # use port 5002
        var = obj.get_child("2:FPGA_bst_offload_hdr_udp_destination_port_RW")
        var.set_value(ua.Variant(value=list(port), varianttype=ua.VariantType.UInt16))
    
        dest_mac = ["00:1B:21:71:76:B9"] * N_NODES  # use mac of dop36
        var = obj.get_child("2:FPGA_bst_offload_hdr_eth_destination_mac_RW")
        var.set_value(ua.Variant(value=list(dest_mac), varianttype=ua.VariantType.String))
    
        dest_addr = ["10.99.0.254"] * N_NODES  # use addr of dop36
        var = obj.get_child("2:FPGA_bst_offload_hdr_ip_destination_address_RW")
        var.set_value(ua.Variant(value=list(dest_addr), varianttype=ua.VariantType.String))
    
    
    def setup_xst_stream(obj):
        '''
        setup xst stream, use git/upe_gear/peripherals/pi_satistics_stream.py for recording and plotting.
        - pi_statistic_stream.py --unb2 2 --pn2 0 --cmd 4 -s=XST
        '''
        port = [5003] * N_NODES  # use port 5001
        var = obj.get_child("2:FPGA_xst_offload_hdr_udp_destination_port_RW")
        var.set_value(ua.Variant(value=list(port), varianttype=ua.VariantType.UInt16))
    
        dest_mac = ["00:1B:21:71:76:B9"] * N_NODES  # use mac of dop36
        var = obj.get_child("2:FPGA_xst_offload_hdr_eth_destination_mac_RW")
        var.set_value(ua.Variant(value=list(dest_mac), varianttype=ua.VariantType.String))
    
        dest_addr = ["10.99.0.254"] * N_NODES  # use addr of dop36
        var = obj.get_child("2:FPGA_xst_offload_hdr_ip_destination_address_RW")
        var.set_value(ua.Variant(value=list(dest_addr), varianttype=ua.VariantType.String))
    
        subsel = [0, 102, 0, 0, 0, 0, 0, 0] * N_NODES  # use subband 102 only
        var = obj.get_child("2:FPGA_xst_subband_select_RW")
        var.set_value(ua.Variant(value=list(subsel), varianttype=ua.VariantType.UInt32))
    
        interval = [1.0] * N_NODES  # use fixed interval 1.0 second
        var = obj.get_child("2:FPGA_xst_integration_interval_RW")
        var.set_value(ua.Variant(value=list(interval), varianttype=ua.VariantType.Double))
    
        processing = [True] * N_NODES  # enable processing
        var = obj.get_child("2:FPGA_xst_processing_enable_RW")
        var.set_value(ua.Variant(value=list(processing), varianttype=ua.VariantType.Boolean))
    
    
    def set_enable_stream(obj, mode):
        if mode == 'OFF':
            enable = [False] * N_NODES  # disable offload
            var = obj.get_child("2:FPGA_sst_offload_enable_RW")
            var.set_value(ua.Variant(value=list(enable), varianttype=ua.VariantType.Boolean))
            var = obj.get_child("2:FPGA_bst_offload_enable_RW")
            var.set_value(ua.Variant(value=list(enable), varianttype=ua.VariantType.Boolean))
            var = obj.get_child("2:FPGA_xst_offload_enable_RW")
            var.set_value(ua.Variant(value=list(enable), varianttype=ua.VariantType.Boolean))
            return
    
        enable = [True] * N_NODES  # enable offload
        if mode == 'SST':
            var = obj.get_child("2:FPGA_sst_offload_enable_RW")
            var.set_value(ua.Variant(value=list(enable), varianttype=ua.VariantType.Boolean))
        elif mode == 'BST':
            var = obj.get_child("2:FPGA_bst_offload_enable_RW")
            var.set_value(ua.Variant(value=list(enable), varianttype=ua.VariantType.Boolean))
        elif mode == 'XST':
            var = obj.get_child("2:FPGA_xst_offload_enable_RW")
            var.set_value(ua.Variant(value=list(enable), varianttype=ua.VariantType.Boolean))
        else:
            print('Wrong mode "{}"'.format(mode))
        return
    
    if __name__ == "__main__":
        # Parse command line arguments
        parser = argparse.ArgumentParser(description="opcua client command line argument parser")
        parser.add_argument('-n', '--nodes', dest='nodes', type=int, nargs='+', help="nodes to use")
        parser.add_argument('-i', '--info', dest='info', action='store_true', help="print point infor from server")
        parser.add_argument('-a', '--all', dest='all_r', action='store_true', help="recv all*_R points and show time")
        parser.add_argument('--wg', dest='wg_mode', type=str, choices=['OFF', 'XST'], help="turn wg off/on for xst")
        parser.add_argument('--setup', dest='offload_setup', type=str, choices=['SST', 'BST', 'XST'], help="setup offload for selected mode")
        parser.add_argument('--stream', dest='stream', type=str, choices=['OFF', 'SST', 'BST', 'XST'], help="turn off/on selected offload stream")
        parser.add_argument('--verbosity', default='INFO', help="stdout log level can be [ERROR | WARNING | INFO | DEBUG]")
        args = parser.parse_args()
    
        print(args)
    
        logging.basicConfig(level=logging.ERROR)
        # logger = logging.getLogger("KeepAlive")
        # logger.setLevel(logging.DEBUG)
    
        host = 'dop36'
        port = 4840
        client = Client("opc.tcp://{}:{}/".format(host, port))
    
        # client = Client("opc.tcp://LAPTOP-N0VQ3UDT:4840/")
        # client = Client("opc.tcp://192.168.137.102:4840/")
        # client = Client("opc.tcp://dop36:4840/")
        # client = Client("opc.tcp://169.254.91.66:4840/")
        # I hope this is secure, because I have zero clue about security
        # client.set_security_string("Basic256Sha256,SignAndEncrypt,certificate-example.der,private-key-example.pem")
    
        cnt = 1
        weights = [-1 for i in range(8000)]  # 187392)
        # 0x7fff
    
        mask = [True for i in range(16)]
        scraps = [i + 3 for i in range(2048)]
    
        clientRunning = True
        while clientRunning:
            try:
                client.connect()
                print("Connected to {}:{}".format(host, port))
    
                ################
                # this section  contains some code about navigating around the address space
                ####################
                # Client has a few methods to get proxy to UA nodes that should always be in address space such as Root or Objects
    
                while clientRunning:
                    print("\n=============================")
    
                    Object = client.get_objects_node()
                    fpga_mask = get_fpga_mask(Object)  # get active fpga_mask
                    write_fpga_mask(Object, nodes=args.nodes)
    
                    if args.info:
                        root = client.get_root_node()
                        print("Root node is: ", root)
                        print("name of root is: ", root.get_display_name())
    
                        # get the objects nodes NodeID
                        print("\tObject node: ", Object)
                        print("\tname of object node is: ", Object.get_browse_name())
    
                        # get all the children that "Object" has
                        children = Object.get_children()
                        print("\tchildren of Object: ", children)
                        for i in range(len(children)):
                            print("\t\tchild", i, ": ", children[i].get_browse_name())
    
                    if args.wg_mode is not None:
                        if args.wg_mode == 'OFF':
                            turn_wg_off(obj)
                        elif args.wg_mode == 'XST':
                            setup_wg_xst_mode(Object)
    
                    if args.offload_setup is not None:
                        if args.offload_setup == 'SST':
                            setup_sst_stream(Object)
                        elif args.offload_setup == 'BST':
                            setup_bst_stream(Object)
                        elif args.offload_setup == 'XST':
                            setup_xst_stream(Object)
    
                    if args.stream is not None:
                        set_enable_stream(Object, mode=args.stream)
    
                    if args.all_r is True:
                        check_get_all_R_time(Object)
    
    
                    #write_xst_subband_select(Object)
    
                    #write_subband_weights(Object)
    
                    #read_subband_weights(Object)
    
    
                    write_fpga_mask(Object, mask=fpga_mask)  # write back start mask
    
                    clientRunning = False
                    break;
                    time.sleep(1)
    
    
                # sub.unsubscribe(handle)
                # sub.delete()
            except KeyboardInterrupt:
                print(" user hit ctrl-c")
                clientRunning = False
            except:
                print('Caught %s', str(sys.exc_info()[0]))
                print(str(sys.exc_info()[1]))
                print('TRACEBACK:\n%s', traceback.format_exc())
                print('Aborting NOW')
    
                clientRunning = False
                time.sleep(1)
            finally:
                print("\ndisconnect from server")
                client.disconnect()