Skip to content
Snippets Groups Projects
Commit c6582715 authored by Jan David Mol's avatar Jan David Mol
Browse files

Added a fake OPC-UA server for testing purposes

parent 82250c99
No related branches found
No related tags found
No related merge requests found
"""
Fake OPC-UA server that won't access I2C but will expose the interface as derived from the YaML files.
"""
import sys
sys.path.insert(0, "..")
import time
from opcua import ua, Server
from datetime import datetime;
import pypcc;
P1=pypcc.pypcc("LTS_pypcc.yaml")
if True:
# setup our server
server = Server()
server.set_endpoint("opc.tcp://0.0.0.0:4848/PCC/")
idx = server.register_namespace("http://lofar.eu")
# uri = "http://examples.freeopcua.github.io"
# idx = server.register_namespace(uri)
objects = server.get_objects_node()
# populating our address space
PCCobj = objects.add_object(idx, "PCC")
# myvar = myobj.add_variable(idx, "MyVariable", 6.7)
# myvar.set_writable() # Set MyVariable to be writable by clients
# starting!
#Vars={}
#Vars[1]='123'
#print(Vars)
#exit()
def ValCallback(nodeid):
vname,myvar=Vars_R[nodeid.Identifier]
print("Value callback",vname)
# vname=vname[vname.find('_')+1:]
# print("RCU variable:",vname)
# X=RCU1.Getvar2(vname)
a=[0]
res=P1.GetVarValue(vname,a)
print("Result from RCU:",res,a)
if res:
myvar.Value.Value=a[0]
myvar.SourceTimestamp = datetime.utcnow()
return myvar
class SubHandler(object):
"""
Subscription Handler. To receive events from server for a subscription
"""
def datachange_notification(self, node, val, data):
# print("Python: New data change event", node, val,data)
vname,myvar=Vars_W[node.nodeid.Identifier]
print("Value callback",vname,val)
if not(running): return
P1.SetVarValue(vname,[val])
def event_notification(self, event):
print("Python: New event", event)
def CallMethod(ObjectID,name):
print("Callmethod",name)
P1.CallMethod(name,None)
#P1.RunMethod("RCU_on",None)
#P1.SetVarValue("Band1",3)
Vars_R={}
Vars_W={}
def AddVar(name,dtype=0,RW=0):
if RW in [1,3]:
vname=name+"_R";
if dtype==1:
myvar = PCCobj.add_variable(idx, vname, 0.0)
else:
myvar = PCCobj.add_variable(idx, vname, 0)
print("Variable added: ",vname)
varvalue=myvar.get_data_value().Value
else:
if dtype==1:
varvalue=0.0
else:
varvalue=0
if RW in [2,3]:
vname=name+"_RW";
print("Variable added: ",vname,'=',varvalue)
myvar2 = PCCobj.add_variable(idx, vname, varvalue)
myvar2.set_writable()
def AddMethod(name):
vname=name;
myvar = PCCobj.add_method(idx, vname, lambda ObjectId : CallMethod(ObjectId,name), [],[] )
print("AddMethod:",vname)
print("Start server");
server.start()
handler = SubHandler()
sub = server.create_subscription(500, handler)
running=False;
print("Add variables:")
P1.GetVarNames("",AddVar);
print("Add modes:")
P1.GetMethodNames("",AddMethod);
try:
while True:
time.sleep(1)
finally:
print("Stop server");
server.stop()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment