diff --git a/fake-opcuaserv.py b/fake-opcuaserv.py new file mode 100644 index 0000000000000000000000000000000000000000..dc60ad299e91b8073eb91bff7c9ffd51d55c3b2f --- /dev/null +++ b/fake-opcuaserv.py @@ -0,0 +1,123 @@ +""" + Fake OPC-UA server that won't access I2C but will expose the interface as derived from the YaML files. +""" + +import sys +sys.path.insert(0, "..") +import time +from opcua import ua, Server +from datetime import datetime; + +import pypcc; +P1=pypcc.pypcc("LTS_pypcc.yaml") + + +if True: +# setup our server + server = Server() + server.set_endpoint("opc.tcp://0.0.0.0:4848/PCC/") + + idx = server.register_namespace("http://lofar.eu") +# uri = "http://examples.freeopcua.github.io" +# idx = server.register_namespace(uri) + + objects = server.get_objects_node() + + # populating our address space + PCCobj = objects.add_object(idx, "PCC") + + # myvar = myobj.add_variable(idx, "MyVariable", 6.7) +# myvar.set_writable() # Set MyVariable to be writable by clients + + # starting! + +#Vars={} +#Vars[1]='123' +#print(Vars) +#exit() +def ValCallback(nodeid): + vname,myvar=Vars_R[nodeid.Identifier] + print("Value callback",vname) +# vname=vname[vname.find('_')+1:] +# print("RCU variable:",vname) +# X=RCU1.Getvar2(vname) + a=[0] + res=P1.GetVarValue(vname,a) + print("Result from RCU:",res,a) + if res: + myvar.Value.Value=a[0] + myvar.SourceTimestamp = datetime.utcnow() + return myvar + + + +class SubHandler(object): + """ + Subscription Handler. To receive events from server for a subscription + """ + + def datachange_notification(self, node, val, data): +# print("Python: New data change event", node, val,data) + vname,myvar=Vars_W[node.nodeid.Identifier] + print("Value callback",vname,val) + if not(running): return + P1.SetVarValue(vname,[val]) + + def event_notification(self, event): + print("Python: New event", event) + + +def CallMethod(ObjectID,name): + print("Callmethod",name) + P1.CallMethod(name,None) + + + +#P1.RunMethod("RCU_on",None) +#P1.SetVarValue("Band1",3) +Vars_R={} +Vars_W={} +def AddVar(name,dtype=0,RW=0): + if RW in [1,3]: + vname=name+"_R"; + if dtype==1: + myvar = PCCobj.add_variable(idx, vname, 0.0) + else: + myvar = PCCobj.add_variable(idx, vname, 0) + print("Variable added: ",vname) + varvalue=myvar.get_data_value().Value + else: + if dtype==1: + varvalue=0.0 + else: + varvalue=0 + + if RW in [2,3]: + vname=name+"_RW"; + print("Variable added: ",vname,'=',varvalue) + myvar2 = PCCobj.add_variable(idx, vname, varvalue) + myvar2.set_writable() + + +def AddMethod(name): + vname=name; + myvar = PCCobj.add_method(idx, vname, lambda ObjectId : CallMethod(ObjectId,name), [],[] ) + print("AddMethod:",vname) + +print("Start server"); +server.start() +handler = SubHandler() +sub = server.create_subscription(500, handler) +running=False; +print("Add variables:") +P1.GetVarNames("",AddVar); + +print("Add modes:") +P1.GetMethodNames("",AddMethod); + +try: + while True: + time.sleep(1) +finally: + print("Stop server"); + server.stop()