Skip to content
Snippets Groups Projects
Commit 88e496be authored by Jan David Mol's avatar Jan David Mol
Browse files

Added command-line arguments to configure port, and disable some behaviour...

Added command-line arguments to configure port, and disable some behaviour allowing the server to run as a simulator
parent 476c792d
Branches
No related tags found
No related merge requests found
"""
Fake OPC-UA server that won't access I2C but will expose the interface as derived from the YaML files.
"""
import sys
sys.path.insert(0, "..")
import time
from opcua import ua, Server
from datetime import datetime;
import pypcc;
P1=pypcc.pypcc("LTS_pypcc.yaml")
if True:
# setup our server
server = Server()
server.set_endpoint("opc.tcp://0.0.0.0:4848/PCC/")
idx = server.register_namespace("http://lofar.eu")
# uri = "http://examples.freeopcua.github.io"
# idx = server.register_namespace(uri)
objects = server.get_objects_node()
# populating our address space
PCCobj = objects.add_object(idx, "PCC")
# myvar = myobj.add_variable(idx, "MyVariable", 6.7)
# myvar.set_writable() # Set MyVariable to be writable by clients
# starting!
#Vars={}
#Vars[1]='123'
#print(Vars)
#exit()
def ValCallback(nodeid):
vname,myvar=Vars_R[nodeid.Identifier]
print("Value callback",vname)
# vname=vname[vname.find('_')+1:]
# print("RCU variable:",vname)
# X=RCU1.Getvar2(vname)
a=[0]
res=P1.GetVarValue(vname,a)
print("Result from RCU:",res,a)
if res:
myvar.Value.Value=a[0]
myvar.SourceTimestamp = datetime.utcnow()
return myvar
class SubHandler(object):
"""
Subscription Handler. To receive events from server for a subscription
"""
def datachange_notification(self, node, val, data):
# print("Python: New data change event", node, val,data)
vname,myvar=Vars_W[node.nodeid.Identifier]
print("Value callback",vname,val)
if not(running): return
P1.SetVarValue(vname,[val])
def event_notification(self, event):
print("Python: New event", event)
def CallMethod(ObjectID,name):
print("Callmethod",name)
P1.CallMethod(name,None)
#P1.RunMethod("RCU_on",None)
#P1.SetVarValue("Band1",3)
Vars_R={}
Vars_W={}
def AddVar(name,dtype=0,RW=0):
if RW in [1,3]:
vname=name+"_R";
if dtype==1:
myvar = PCCobj.add_variable(idx, vname, 0.0)
else:
myvar = PCCobj.add_variable(idx, vname, 0)
print("Variable added: ",vname)
varvalue=myvar.get_data_value().Value
else:
if dtype==1:
varvalue=0.0
else:
varvalue=0
if RW in [2,3]:
vname=name+"_RW";
print("Variable added: ",vname,'=',varvalue)
myvar2 = PCCobj.add_variable(idx, vname, varvalue)
myvar2.set_writable()
def AddMethod(name):
vname=name;
myvar = PCCobj.add_method(idx, vname, lambda ObjectId : CallMethod(ObjectId,name), [],[] )
print("AddMethod:",vname)
print("Start server");
server.start()
handler = SubHandler()
sub = server.create_subscription(500, handler)
running=False;
print("Add variables:")
P1.GetVarNames("",AddVar);
print("Add modes:")
P1.GetMethodNames("",AddMethod);
try:
while True:
time.sleep(1)
finally:
print("Stop server");
server.stop()
...@@ -5,17 +5,25 @@ sys.path.insert(0, "..") ...@@ -5,17 +5,25 @@ sys.path.insert(0, "..")
import time import time
from opcua import ua, Server from opcua import ua, Server
from datetime import datetime; from datetime import datetime;
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("-s", "--simulator", help="Do not connect to I2c, but simulate behaviour.", action="store_true")
parser.add_argument("--no-lib-hack", help="Do not require a hacked opcua library. Breaks behaviour.", action="store_true")
parser.add_argument("-p", "--port", help="Port number to listen on [%(default)s].", type=int, default=4842)
args = parser.parse_args()
import pypcc; if args.simulator:
#import pypcc_test as pypcc; import pypcc_test as pypcc
P1=pypcc.pypcc("LTS_pypcc.yaml") else:
import pypcc
P1=pypcc.pypcc("LTS_pypcc.yaml")
if True: if True:
# setup our server # setup our server
server = Server() server = Server()
server.set_endpoint("opc.tcp://0.0.0.0:4842/PCC/") server.set_endpoint("opc.tcp://0.0.0.0:{}/PCC/".format(args.port))
idx = server.register_namespace("http://lofar.eu") idx = server.register_namespace("http://lofar.eu")
# uri = "http://examples.freeopcua.github.io" # uri = "http://examples.freeopcua.github.io"
...@@ -113,6 +121,7 @@ def AddVar(name,dtype=0,RW=0,cnt=1): ...@@ -113,6 +121,7 @@ def AddVar(name,dtype=0,RW=0,cnt=1):
Vars_R[myvar.nodeid.Identifier]=[name,myvar.get_data_value(),varvalue2] Vars_R[myvar.nodeid.Identifier]=[name,myvar.get_data_value(),varvalue2]
ValCallback(myvar.nodeid,force=True) ValCallback(myvar.nodeid,force=True)
# print(myvar.get_value()) # print(myvar.get_value())
if not args.no_lib_hack:
server.set_attribute_callback(myvar.nodeid, ValCallback) server.set_attribute_callback(myvar.nodeid, ValCallback)
# varvalue=myvar.get_data_value().Value # varvalue=myvar.get_data_value().Value
# Vars_R[myvar.nodeid.Identifier][2]=varvalue # Vars_R[myvar.nodeid.Identifier][2]=varvalue
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment