Skip to content
Snippets Groups Projects
Commit 439267ae authored by Paulus Kruger's avatar Paulus Kruger
Browse files

move scripts, DAB boolean

parent 8c8308d5
No related branches found
No related tags found
1 merge request!18Test setup
......@@ -584,7 +584,7 @@ variables:
read_parallel: all
- name: RCU_DTH_PWR
description: RCU Dither source power
description: RCU Dither source power (dBm). Range -25 to -4.
driver: I2C_RCU
devreg: [DTH1.PA_CONFIG,DTH2.PA_CONFIG,DTH3.PA_CONFIG]
width: 48
......@@ -613,13 +613,14 @@ variables:
#RCU2H points
- name: RCU_DAB_select
description: Band select ...
description: DAB filter enable 1=enabled, 2=disabled
driver: I2C_RCU
devreg: [IO4.GPIO1,IO4.GPIO1,IO4.GPIO1]
bitoffset: [0,2,4]
width: 2
rw: rw
dtype: uint8
dtype: boolean
convert_unit: int12_to_bool
dim: 96
dim2: [3,32]
mask: ANT_mask
......
......@@ -11,3 +11,9 @@ def bool_invert(T):
def temp_check(T):
if (T<-100) or (T>200): return float('nan')
return T;
def int12_to_bool_inv(data):
return 1 if data else 2
def int12_to_bool(data):
return data==1
\ No newline at end of file
......@@ -46,3 +46,8 @@ def si4012_6bytes_to_pwr_inv(data):
pwr+=127 #fBetaSteps #default
return pwr
def bool_to_int12(data):
return 1 if data else 2
def bool_to_int12_inv(data):
return data==1
\ No newline at end of file
......@@ -93,7 +93,7 @@ def var2byte(v,data):
data=[int(d/scale) for d in data]
if (dtype=="boolean"):
convert=v.get("convert_unit")
if convert: data=[eval("convert_unit."+convert)(d) for d in data]
if convert: data=[eval("convert_unit."+convert+"_inv")(d) for d in data]
data2=bytearray(data*1);
elif (dtype in ['uint8','uint16','uint32','uint64','double']):
if width<=1:
......
#from opcua import ua, Server
from asyncua.sync import ua, Server
from time import sleep
class SubHandler(object):
"""
Subscription Handler. To receive events from server for a subscription
"""
def datachange_notification(self, node, val, data):
print("Datachange callback",node,val,data)
# def datachange_notification(self, node, val, data):
# logging.warning(str(("Python: New data change event", node, val, data.monitored_item.Value.StatusCode)))
# logging.warning(str(("Python: Data came from client: ", (data.monitored_item.Value.StatusCode == ua.StatusCode(ua.StatusCodes.Good)))))
# if data came from client, mark the status to get a new notification if the client rewrites the same value
if data.monitored_item.Value.StatusCode == ua.StatusCode(ua.StatusCodes.Good):
print(str(("Python: New client data change event", node, val, data.monitored_item.Value.StatusCode)))
#node.set_value(ua.DataValue(val, ua.StatusCode(ua.StatusCodes.GoodCompletesAsynchronously)))
#data.monitored_item.Value.StatusCode=ua.StatusCodes.GoodCompletesAsynchronously
if True:
server = Server()
server.set_endpoint("opc.tcp://0.0.0.0:1234/")
idx = server.register_namespace("http://lofar.eu")
print("idx=",idx)
PCCobj = server.nodes.objects #server.get_objects_node()
handler = SubHandler()
sub = server.create_subscription(50, handler)
myvar2 = PCCobj.add_variable(idx, "test1", 1.5)
myvar2.set_writable()
handle = sub.subscribe_data_change(myvar2)
server.start()
sleep(0.3)
from asyncua.sync import Client
#from opcua import Client
#with Client(url='opc.tcp://localhost:1234/') as client:
# while True:
# Do something with client
# node = client.get_node('i=1')
# value = node.read_value()
# print(value)
client = Client("opc.tcp://localhost:1234/")
try:
client.connect()
#client.load_type_definitions() # load definition of server specific structures/extension objects
# Client has a few methods to get proxy to UA nodes that should always be in address space such as Root or Objects
#root = client.get_root_node()
#print("Root node is: ", root)
#objects = client.get_objects_node()
#print("Objects node is: ", objects)
# Node objects have methods to read and write node attributes as well as browse or populate address space
#print("Children of root are: ", root.get_children())
# get a specific node knowing its node id
#var = client.get_node(ua.NodeId(1002, 2))
var = client.get_node("ns=2;i=1")
# handler = SubHandler()
# sub = client.create_subscription(500, handler)
# handle = sub.subscribe_data_change(var)
#var = client.get_node("ns=2;g=1be5ba38-d004-46bd-aa3a-b5b87940c698")
#print(var)
#var.get_data_value() # get value of node as a DataValue object
#print(var.get_value()) # get value of node as a python builtin
#var.set_value(ua.Variant([23], ua.VariantType.Int64)) #set node value using explicit data type
var.set_value(2.5) # set node value using implicit data type
sleep(0.1)
#print(var.get_value()) # get value of node as a python builtin
var.set_value(2.5) # set node value using implicit data type
sleep(0.1)
#print(var.get_value()) # get value of node as a python builtin
sleep(0.3)
finally:
client.disconnect()
server.stop()
......@@ -31,7 +31,7 @@ class yamlconfig():
def __init__(self,yamlfile='RCU'):
pkg = importlib_resources.files("pypcc")
pkg_data_file = pkg / "config" / (yamlfile+'.yaml')
self.conf=yaml.load(pkg_data_file.open())
self.conf=yaml.load(pkg_data_file.open(), Loader=yaml.FullLoader)
self.expand_variables()
# print([[v['name'],v.get('devreg')] for v in var1])
# print(len(self.conf['variables']),N)
......
File moved
File moved
import yamlconfig as yc
import pypcc.yamlconfig as yc
import sys
if len(sys.argv)<2:
print("Usage: SCTable yamlfile")
exit()
RW={'ro':'_R','rw':'_R/_RW','variable':'_RW'}
DT={'uint8':'numpy.int64 ',
'uint32':'numpy.int64 ',
......
import yamlconfig as yc
import pypcc.yamlconfig as yc
import sys
if len(sys.argv)<2:
print("Usage: VarTable yamlfile")
exit()
RW={'ro':'R','rw':'RW','variable':'RW'}
Masked=True;
Y=yc.yamlconfig(sys.argv[1])
......
name='RECVTR_LB_TEST' #YAML config file with all register values etc
import logging
import argparse
from opcuaserv import opcuaserv
from opcuaserv import i2client
from opcuaserv import yamlreader
#import argparse
from pypcc.opcuaserv import opcuaserv
from pypcc.opcuaserv import i2client
from pypcc.opcuaserv import yamlreader
#from opcuaserv import pypcc2
from i2cserv import i2cthread
import threading
from pypcc.i2cserv import i2cthread
#import threading
import time
import sys
import signal
from yamlconfig import Find;
import yamlconfig as yc
from pypcc.yamlconfig import Find;
import pypcc.yamlconfig as yc
from datetime import datetime
from opcuaserv.yamlreader import byte2var,var2byte
from pypcc.opcuaserv.yamlreader import byte2var,var2byte
class i2cdirect():
def __init__(self,name):
......
File moved
File moved
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment