Skip to content
Snippets Groups Projects
Select Git revision
  • 3ad2dfcf981564ca3e700e7046e3c4e8841eddf4
  • master default protected
  • gec-95-add-filter-skymodel
  • gec-95-migrate-filter-skymodel-utils
  • add-coverage-badge
  • releases/1.6
  • releases/1.5
  • rap-457-implement-ci-v2
  • test_heroku
  • action-python-publish
  • v1.7.1
  • v1.7.0
  • v1.6.2
  • v1.6.1
  • v1.6.post1
  • v1.6
  • v1.5.1
  • v1.5.0
  • v1.4.11
  • v1.4.10
  • v1.4.9
  • v1.4.8
  • v1.4.7
  • v1.4.6
  • v1.4.5
  • v1.4.4
  • v1.4.3
  • v1.4.2
  • v1.4.1
  • v1.4.0
30 results

_kdtree.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    wg.py 6.98 KiB
    #! /usr/bin/env python3
    
    # ##########################################################################
    # Copyright 2020
    # ASTRON (Netherlands Institute for Radio Astronomy) <http://www.astron.nl/>
    # P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    # http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    # ##########################################################################
    
    # ##########################################################################
    # Author:
    # . Pieter Donker
    # Purpose:
    # . set Waveform generator
    # Description:
    # . run ./wg.py -h for help
    # ##########################################################################
    
    
    import sys
    sys.path.insert(0, "..")
    
    import logging
    import traceback
    import argparse
    import time
    import pprint
    import math
    
    from base import *
    
    
    class WaveformGenerator(object):
        """Set and read back the waveform generator (WG) control points.

        All access goes through ``client``, which must provide
        ``read(name)`` and ``write(name, values)``.  Every setter writes one
        flat list holding ``S_PN * n_nodes`` values: ``S_PN`` values per node,
        repeated for each of the ``n_nodes`` nodes reported by the server.

        The setters accept a ``mode``:

        * ``'xst'`` (case-insensitive): a per-node pattern, see each setter.
        * anything else, or ``None``: treated as ``'sinus'``; the same value
          is written to every entry.
        """

        def __init__(self, client):
            """Store the client and read the number of nodes from the server."""
            self.client = client
            # Number of values written per node.
            # NOTE(review): presumably the number of signal inputs per node -- confirm.
            self.S_PN = 12
            self.n_nodes = self.client.read('sdp_config_nof_fpgas')  # get number of nodes from server
            # Last values read back by the get_* methods.
            self._ampl = None
            self._phase = None
            self._freq = None

        @staticmethod
        def _is_xst(mode):
            """Return True when ``mode`` selects the 'xst' pattern (None means 'sinus')."""
            return mode is not None and mode.lower() == 'xst'

        def set_ampl(self, value, mode=None):
            """Write the WG amplitude for all entries.

            In 'xst' mode entry ``i`` of each node gets
            ``value * (1 - i / S_PN)``; otherwise every entry gets ``value``.
            """
            if self._is_xst(mode):
                per_node = [value * (1 - (i / self.S_PN)) for i in range(self.S_PN)]
            else:
                # must be for sinus
                per_node = [value] * self.S_PN
            self.client.write('wg_amplitude', per_node * self.n_nodes)

        def get_ampl(self):
            """Read the WG amplitudes from the server and return them."""
            self._ampl = self.client.read('wg_amplitude')
            return self._ampl

        def set_phase(self, value, mode=None):
            """Write the WG phase for all entries; ``value`` is in degrees.

            In 'xst' mode ``value`` is not used: entry ``i`` of each node gets
            ``i * 2*pi / S_PN`` radians.  Otherwise every entry gets ``value``
            converted to radians.
            """
            if self._is_xst(mode):
                per_node = [i * ((2 * math.pi) / self.S_PN) for i in range(self.S_PN)]
            else:
                # must be for sinus; convert degrees to radians
                per_node = [value / (360 / (2 * math.pi))] * self.S_PN
            self.client.write('wg_phase', per_node * self.n_nodes)

        def get_phase(self):
            """Read the WG phases from the server and return them in degrees."""
            self._phase = self.client.read('wg_phase')
            return [i * (360 / (2 * math.pi)) for i in self._phase]

        def set_freq(self, value, mode=None):
            """Write the WG frequency for all entries.

            In 'xst' mode every entry gets ``(102 / 1024) * value``; otherwise
            every entry gets ``value``.
            """
            if self._is_xst(mode):
                # The original call lacked the comma after the control point name,
                # which made Python try to call the string and raise TypeError.
                freq = (102 / 1024) * value
            else:
                # must be for sinus
                freq = value
            self.client.write('wg_frequency', [freq] * self.S_PN * self.n_nodes)

        def get_freq(self):
            """Read the WG frequencies from the server and return them."""
            self._freq = self.client.read('wg_frequency')
            return self._freq

        def set_sinus(self, ampl, phase, freq):
            """Write amplitude, phase (degrees) and frequency in 'sinus' mode."""
            self.set_ampl(ampl, 'sinus')
            self.set_phase(phase, 'sinus')
            self.set_freq(freq, 'sinus')

        def set_xst_mode(self, ampl, phase, freq):
            """Write amplitude, phase and frequency in 'xst' mode.

            Writes a WG configuration with phases spread over 0 - 360 degrees
            and amplitudes from 1 down to 1/12 of ``ampl``.  ``phase`` is passed
            on but is not used by set_phase() in this mode.
            """
            self.set_ampl(ampl, 'xst')
            self.set_phase(phase, 'xst')
            self.set_freq(freq, 'xst')

        def enable(self):
            """Turn the WG on for all entries; return True if the write reported success."""
            return bool(self.client.write('wg_enable', [True] * self.S_PN * self.n_nodes))

        def disable(self):
            """Turn the WG off for all entries; return True if the write reported success."""
            return bool(self.client.write('wg_enable', [False] * self.S_PN * self.n_nodes))
    
    def main():
        """Connect to the server, perform the requested WG actions once, disconnect.

        Uses the module-level ``args``, ``node_list``, ``logger`` and
        ``clientRunning`` prepared by the ``__main__`` section.  Actions run in
        a fixed order: optional disable, setters, getters, optional enable.
        The client is disconnected even when one of the actions raises.
        """
        client = OpcuaClient(args.host, args.port)
        client.connect()
        try:
            client.set_mask(node_list)
            logger.info("fpga mask={}".format(client.get_mask()))  # read back nodes set.

            wg = WaveformGenerator(client)

            # The original `while clientRunning: ... break` ran its body at most
            # once, so a plain `if` expresses the same single pass.
            if clientRunning:
                if args.toggle or args.disable:
                    print('turn off wg signal')
                    wg.disable()

                if args.setfreq is not None:
                    wg.set_freq(args.setfreq, args.mode)
                if args.setampl is not None:
                    wg.set_ampl(args.setampl, args.mode)
                if args.setphase is not None:
                    wg.set_phase(args.setphase, args.mode)

                if args.getfreq:
                    pprint.pprint(wg.get_freq(), width=100, compact=True)
                if args.getampl:
                    pprint.pprint(wg.get_ampl(), width=100, compact=True)
                if args.getphase:
                    pprint.pprint(wg.get_phase(), width=100, compact=True)

                if args.toggle or args.enable:
                    print('turn on wg signal')
                    wg.enable()
                    print('wait until wg active again')
                    time.sleep(2.0)  # wait until active
        finally:
            # Always release the connection, also when a read/write failed.
            client.disconnect()
    
    
    if __name__ == "__main__":
        # Parse command line arguments
        parser = argparse.ArgumentParser(description="opcua client command line argument parser")
        parser.add_argument('--host', dest='host', type=str, default='dop36', help="host to connect to")
        parser.add_argument('--port', dest='port', type=int, default=4840, help="port to use")
        parser.add_argument('-n', '--nodes', dest='nodes', type=str, help="nodes to use")
        parser.add_argument('-v', action='count', default=0, help="verbosity 'WARNING', 'INFO', 'DEBUG' -v, -vv, -vvv")
        #
        parser.add_argument('--mode',     type=str, choices=['xst', 'sinus'], help="select wg mode")
        parser.add_argument('--setfreq',  type=float, help="set wg freq")
        parser.add_argument('--setampl',  type=float, help="set wg ampl")
        parser.add_argument('--setphase', type=float, help="set wg phase in degrees")
        parser.add_argument('--getfreq',  action='store_true', help="get wg freq")
        parser.add_argument('--getampl',  action='store_true', help="get wg ampl")
        parser.add_argument('--getphase', action='store_true', help="get wg phase in degrees")
        parser.add_argument('--enable',   action='store_true', help="turn on wg signal")
        parser.add_argument('--disable',  action='store_true', help="turn off wg signal")
        parser.add_argument('--toggle',   action='store_true', help="turn the wg signal off and on again")

        # These names are module-level on purpose: main() reads them as globals.
        args = parser.parse_args()

        node_list = arg_str_to_list(args.nodes) if args.nodes else None

        # Map the -v count onto a logging level.  The index is clamped so that
        # more than three -v flags select DEBUG instead of raising IndexError,
        # and getattr() replaces the former eval() of a formatted string.
        LOGLEVEL = ['ERROR', 'WARNING', 'INFO', 'DEBUG']
        log_level = getattr(logging, LOGLEVEL[min(args.v, len(LOGLEVEL) - 1)])

        logging.basicConfig(level=log_level)
        logger = logging.getLogger('main')

        logger.info("parsed arguments: {}".format(args))

        clientRunning = True
        exit_code = 0
        try:
            main()
        except KeyboardInterrupt:
            print(" user hit ctrl-c")
            clientRunning = False
            exit_code = 1
        except Exception as err:
            # The '%s' placeholders were previously passed to print() as separate
            # arguments and therefore printed literally; format them properly.
            print('Caught %s' % str(type(err)))
            print(str(err))
            print('TRACEBACK:\n%s' % traceback.format_exc())
            print('Aborting NOW')

            clientRunning = False
            time.sleep(1)
            exit_code = 1

        # Exit status 0 on success and 1 on failure or interrupt, instead of
        # reporting failure unconditionally.
        sys.exit(exit_code)