Select Git revision
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
wg.py 6.98 KiB
#! /usr/bin/env python3
# ##########################################################################
# Copyright 2020
# ASTRON (Netherlands Institute for Radio Astronomy) <http://www.astron.nl/>
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ##########################################################################
# ##########################################################################
# Author:
# . Pieter Donker
# Purpose:
# . set Waveform generator
# Description:
# . run ./wg.py -h for help
# ##########################################################################
import sys
sys.path.insert(0, "..")
import logging
import traceback
import argparse
import time
import pprint
import math
from base import *
class WaveformGenerator(object):
def __init__(self, client):
self.client = client
self.S_PN = 12
self.n_nodes = self.client.read('sdp_config_nof_fpgas') # get number of nodes from server
self._ampl = None
self._phase = None
self._freq = None
def set_ampl(self, value, mode=None):
ampl = value
mode = 'sinus' if mode is None else mode.lower()
if mode == 'xst':
self.client.write('wg_amplitude', ([ampl * (1 - (i / self.S_PN)) for i in range(self.S_PN)] * self.n_nodes))
else:
# must be for sinus
self.client.write('wg_amplitude', ([ampl] * self.S_PN * self.n_nodes))
return
def get_ampl(self):
self._ampl = self.client.read('wg_amplitude')
return self._ampl
def set_phase(self, value, mode=None):
phase = value / (360 / (2 * math.pi))
mode = 'sinus' if mode is None else mode.lower()
if mode == 'xst':
self.client.write('wg_phase', ([i * ((2 * math.pi) / self.S_PN) for i in range(self.S_PN)] * self.n_nodes))
else:
# must be for sinus
self.client.write('wg_phase', ([phase] * self.S_PN * self.n_nodes))
return
def get_phase(self):
self._phase = self.client.read('wg_phase')
return [i * (360 / (2 * math.pi)) for i in self._phase]
def set_freq(self, value, mode=None):
freq = value
mode = 'sinus' if mode is None else mode.lower()
if mode == 'xst':
self.client.write('wg_frequency' ([(102 / 1024) * freq for i in range(self.S_PN)] * self.n_nodes))
else:
# must be for sinus
self.client.write('wg_frequency', ([freq] * self.S_PN * self.n_nodes))
return
def get_freq(self):
self._freq = self.client.read('wg_frequency')
return self._freq
def set_sinus(self, ampl, phase, freq):
self.set_ampl(ampl, 'sinus')
self.set_phase(phase, 'sinus')
self.set_freq(freq, 'sinus')
def set_xst_mode(self, ampl, phase, freq):
# Write WG configuration with phases from 0 - 360 and 1 to 1/12 amplitudes
self.set_ampl(ampl, 'xst')
self.set_phase(phase, 'xst')
self.set_freq(freq, 'xst')
def enable(self):
if self.client.write('wg_enable', ([True] * self.S_PN * self.n_nodes)):
return True
return False
def disable(self):
if self.client.write('wg_enable', ([False] * self.S_PN * self.n_nodes)):
return True
return False
def main():
client = OpcuaClient(args.host, args.port)
client.connect()
client.set_mask(node_list)
logger.info("fpga mask={}".format(client.get_mask())) # read back nodes set.
wg = WaveformGenerator(client)
while clientRunning:
if args.toggle or args.disable:
print('turn off wg signal')
wg.disable()
if args.setfreq is not None:
wg.set_freq(args.setfreq, args.mode)
if args.setampl is not None:
wg.set_ampl(args.setampl, args.mode)
if args.setphase is not None:
wg.set_phase(args.setphase, args.mode)
if args.getfreq:
pprint.pprint(wg.get_freq(), width=100, compact=True)
if args.getampl:
pprint.pprint(wg.get_ampl(), width=100, compact=True)
if args.getphase:
pprint.pprint(wg.get_phase(), width=100, compact=True)
if args.toggle or args.enable:
print('turn on wg signal')
wg.enable()
print('wait until wg active again')
time.sleep(2.0) # wain until active
break
client.disconnect()
if __name__ == "__main__":
# Parse command line arguments
parser = argparse.ArgumentParser(description="opcua client command line argument parser")
parser.add_argument('--host', dest='host', type=str, default='dop36', help="host to connect to")
parser.add_argument('--port', dest='port', type=int, default=4840, help="port to use")
parser.add_argument('-n', '--nodes', dest='nodes', type=str, help="nodes to use")
parser.add_argument('-v', action='count', default=0, help="verbosity 'WARNING', 'INFO', 'DEBUG' -v, -vv, -vvv")
#
parser.add_argument('--mode', type=str, choices=['xst', 'sinus'], help="select wg mode")
parser.add_argument('--setfreq', type=float, help="set wg freq")
parser.add_argument('--setampl', type=float, help="set wg ampl")
parser.add_argument('--setphase', type=float, help="set wg phase in degrees")
parser.add_argument('--getfreq', action='store_true', help="get wg freq")
parser.add_argument('--getampl', action='store_true', help="get wg ampl")
parser.add_argument('--getphase', action='store_true', help="get wg phase in degrees")
parser.add_argument('--enable', action='store_true', help="turn on wg signal")
parser.add_argument('--disable', action='store_true', help="turn off wg signal")
parser.add_argument('--toggle', action='store_true', help="turn the wg signal off and on again")
args = parser.parse_args()
node_list = arg_str_to_list(args.nodes) if args.nodes else None
LOGLEVEL = ['ERROR', 'WARNING', 'INFO', 'DEBUG']
log_level = eval("logging.{}".format(LOGLEVEL[args.v]))
logging.basicConfig(level=log_level)
logger = logging.getLogger('main')
logger.info("parsed arguments: {}".format(args))
clientRunning = True
try:
main()
except KeyboardInterrupt:
print(" user hit ctrl-c")
clientRunning = False
except:
print('Caught %s', str(sys.exc_info()[0]))
print(str(sys.exc_info()[1]))
print('TRACEBACK:\n%s', traceback.format_exc())
print('Aborting NOW')
clientRunning = False
time.sleep(1)
sys.exit(1)