Skip to content
Snippets Groups Projects
Commit 20f26a2a authored by Paulus Kruger's avatar Paulus Kruger
Browse files

Merge branch 'master' of git.astron.nl:lofar2.0/pypcc

parents b5b3c5c8 c6582715
No related branches found
No related tags found
No related merge requests found
"""
Fake OPC-UA server that won't access I2C but will expose the interface as derived from the YaML files.
"""
import sys
sys.path.insert(0, "..")
import time
from opcua import ua, Server
from datetime import datetime;
import pypcc;
P1=pypcc.pypcc("LTS_pypcc.yaml")
if True:
# setup our server
server = Server()
server.set_endpoint("opc.tcp://0.0.0.0:4848/PCC/")
idx = server.register_namespace("http://lofar.eu")
# uri = "http://examples.freeopcua.github.io"
# idx = server.register_namespace(uri)
objects = server.get_objects_node()
# populating our address space
PCCobj = objects.add_object(idx, "PCC")
# myvar = myobj.add_variable(idx, "MyVariable", 6.7)
# myvar.set_writable() # Set MyVariable to be writable by clients
# starting!
#Vars={}
#Vars[1]='123'
#print(Vars)
#exit()
def ValCallback(nodeid):
vname,myvar=Vars_R[nodeid.Identifier]
print("Value callback",vname)
# vname=vname[vname.find('_')+1:]
# print("RCU variable:",vname)
# X=RCU1.Getvar2(vname)
a=[0]
res=P1.GetVarValue(vname,a)
print("Result from RCU:",res,a)
if res:
myvar.Value.Value=a[0]
myvar.SourceTimestamp = datetime.utcnow()
return myvar
class SubHandler(object):
"""
Subscription Handler. To receive events from server for a subscription
"""
def datachange_notification(self, node, val, data):
# print("Python: New data change event", node, val,data)
vname,myvar=Vars_W[node.nodeid.Identifier]
print("Value callback",vname,val)
if not(running): return
P1.SetVarValue(vname,[val])
def event_notification(self, event):
print("Python: New event", event)
def CallMethod(ObjectID,name):
print("Callmethod",name)
P1.CallMethod(name,None)
#P1.RunMethod("RCU_on",None)
#P1.SetVarValue("Band1",3)
Vars_R={}
Vars_W={}
def AddVar(name,dtype=0,RW=0):
if RW in [1,3]:
vname=name+"_R";
if dtype==1:
myvar = PCCobj.add_variable(idx, vname, 0.0)
else:
myvar = PCCobj.add_variable(idx, vname, 0)
print("Variable added: ",vname)
varvalue=myvar.get_data_value().Value
else:
if dtype==1:
varvalue=0.0
else:
varvalue=0
if RW in [2,3]:
vname=name+"_RW";
print("Variable added: ",vname,'=',varvalue)
myvar2 = PCCobj.add_variable(idx, vname, varvalue)
myvar2.set_writable()
def AddMethod(name):
vname=name;
myvar = PCCobj.add_method(idx, vname, lambda ObjectId : CallMethod(ObjectId,name), [],[] )
print("AddMethod:",vname)
print("Start server");
server.start()
handler = SubHandler()
sub = server.create_subscription(500, handler)
running=False;
print("Add variables:")
P1.GetVarNames("",AddVar);
print("Add modes:")
P1.GetMethodNames("",AddMethod);
try:
while True:
time.sleep(1)
finally:
print("Stop server");
server.stop()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment