Select Git revision
tech_complex_mult.vhd
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
opcuaserv.py 5.62 KiB
import sys
sys.path.insert(0, "..")
import time
from opcua import ua, Server
from datetime import datetime;
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("-s", "--simulator", help="Do not connect to I2c, but simulate behaviour.", action="store_true")
parser.add_argument("--no-lib-hack", help="Do not require a hacked opcua library. Breaks behaviour.", action="store_true")
parser.add_argument("-p", "--port", help="Port number to listen on [%(default)s].", type=int, default=4842)
args = parser.parse_args()
if args.simulator:
import pypcc_test as pypcc
else:
import pypcc
P1=pypcc.pypcc("LTS_pypcc.yaml")
if True:
# setup our server
server = Server()
server.set_endpoint("opc.tcp://0.0.0.0:{}/PCC/".format(args.port))
idx = server.register_namespace("http://lofar.eu")
# uri = "http://examples.freeopcua.github.io"
# idx = server.register_namespace(uri)
objects = server.get_objects_node()
# populating our address space
PCCobj = objects.add_object(idx, "PCC")
# myvar = myobj.add_variable(idx, "MyVariable", 6.7)
# myvar.set_writable() # Set MyVariable to be writable by clients
# starting!
#Vars={}
#Vars[1]='123'
#print(Vars)
#exit()
def ValCallback(nodeid,force=False):
vname,myvar,oldvalue=Vars_R[nodeid.Identifier]
# print("Value callback",vname)
# vname=vname[vname.find('_')+1:]
# print("RCU variable:",vname,oldvalue)
# X=RCU1.Getvar2(vname)
# if not(running):
if False:
myvar.Value.Value=(oldvalue[0] if len(oldvalue)==1 else oldvalue)
myvar.SourceTimestamp = datetime.utcnow()
return myvar
timenow=datetime.utcnow()
timediff=(timenow-myvar.SourceTimestamp).total_seconds()
# print(timediff)
if not(force) and timediff<90: return myvar;
res=P1.GetVarValue(vname,oldvalue)
print("Read callback",vname,": Result:",res,oldvalue)
if res:
myvar.Value.Value=(oldvalue[0] if len(oldvalue)==1 else oldvalue)
myvar.SourceTimestamp = datetime.utcnow()
# Vars_R[nodeid.Identifier][3]=oldvalue
return myvar
class SubHandler(object):
"""
Subscription Handler. To receive events from server for a subscription
"""
def datachange_notification(self, node, val, data):
# print("Python: New data change event", node, val,data)
if not(running): return
vname,myvar=Vars_W[node.nodeid.Identifier]
val=(val if isinstance(val, list) else [val] )
print("Write callback",vname,val)
P1.SetVarValue(vname,val)
#readback
# if True:
# print(Vars_R,Vars_R.values())
for vname2,myvar2,oldvalue in Vars_R.values():
if vname2==vname:
if args.simulator:
res=True
print("Simulating fallthrough _RW->_R for",vname,": Result:",res,oldvalue)
else:
res=P1.GetVarValue(vname,val)
print("Read callback",vname,": Result:",res,oldvalue)
if res:
myvar2.Value.Value=(val[0] if len(val)==1 else val)
myvar2.SourceTimestamp = datetime.utcnow()
def event_notification(self, event):
print("Python: New event", event)
def CallMethod(ObjectID,name):
print("Callmethod",name)
P1.CallMethod(name,None)
#P1.RunMethod("RCU_on",None)
#P1.SetVarValue("Band1",3)
Vars_R={}
Vars_W={}
def AddVar(name,dtype=0,RW=0,cnt=1):
if dtype==1:
varvalue2=([0.0] if cnt<=1 else cnt*[0.0])
else:
varvalue2=([0] if cnt<=1 else cnt*[0])
if RW in [1,3]:
vname=name+"_R";
if dtype==1:
myvar = (PCCobj.add_variable(idx, vname, 0.0) if cnt<=1 else PCCobj.add_variable(idx, vname, cnt*[0.0]))
else:
myvar = (PCCobj.add_variable(idx, vname, 0) if cnt<=1 else PCCobj.add_variable(idx, vname, cnt*[0]))
print("Variable added: ",vname)
Vars_R[myvar.nodeid.Identifier]=[name,myvar.get_data_value(),varvalue2]
ValCallback(myvar.nodeid,force=True)
# print(myvar.get_value())
if not args.no_lib_hack:
server.set_attribute_callback(myvar.nodeid, ValCallback)
# varvalue=myvar.get_data_value().Value
# Vars_R[myvar.nodeid.Identifier][2]=varvalue
# print(varvalue2,varvalue)
if RW in [2,3]:
vname=name+"_RW";
print("Variable added: ",vname)#,'=',varvalue2)
myvar2 = PCCobj.add_variable(idx, vname, (varvalue2[0] if len(varvalue2)==1 else varvalue2))
myvar2.set_writable()
Vars_W[myvar2.nodeid.Identifier]=[name,myvar2.get_data_value()]
handle = sub.subscribe_data_change(myvar2)
def AddMethod(name):
vname=name;
myvar = PCCobj.add_method(idx, vname, lambda ObjectId : CallMethod(ObjectId,name), [],[] )
print("AddMethod:",vname)
print("Start server");
server.start()
handler = SubHandler()
sub = server.create_subscription(500, handler)
running=False;
print("Add variables:")
P1.GetVarNames("",AddVar);
print("Add modes:")
P1.GetMethodNames("",AddMethod);
time.sleep(1)
running=True;
print("Server started")
try:
while True:
time.sleep(1)
finally:
print("Stop server");
server.stop()
#RCU.changemode(0)
#RCU.Setvar2('HBA_PwrX0',16*(1,))
#RCU.Setvar2('HBA_LED0',16*(1,))
#RCU.Setvar2('HBA_PwrX0',16*(0,))
#RCU.Setvar2('HBA_LED0',16*(0,))
#RCU.Setvar2('Amp_Gain0',[11])
#RCU.Setvar2('Amp_Gain1',[10])
#RCU.Setvar2('Power_Ant0',[1])
#RCU.Setvar2('Power_Ant1',[1])
#RCU.Setvar2('Power_Ant0',[0])
#RCU.Setvar2('Power_Ant1',[0])
#print(RCU.Getvar2('Amp_Gain0'))
#print(RCU.Getvar2('ID'))
#print(RCU.Getvar2('ADC_lock1'))
#print(RCU.Getvar2('HBA_DelayX1',element=1))