# opcuaserv.py
import sys
sys.path.insert(0, "..")
import time
from opcua import ua, Server
from datetime import datetime;
import logging
#import Vars
#import HWconf
#from pcctypes import *
# Read-only variables, keyed by node identifier. AddVarR stores
# [name, data value, initial value, v] for every variable it creates.
Vars_R = dict()

# Writable variables, keyed by node identifier. AddVarW stores
# [name, data value, v, Q1]; SubHandler.datachange_notification reads it back.
Vars_W = dict()

# Data-change notifications are ignored while this is False; start() sets it.
running = False
class SubHandler(object):
    """
    Subscription handler: receives data-change and event notifications
    from the server for a subscription.
    """

    def datachange_notification(self, node, val, data):
        """Pass a changed value on to the object registered for ``node``.

        Nothing happens while the module-level ``running`` flag is false.
        Otherwise the entry stored in ``Vars_W`` under
        ``node.nodeid.Identifier`` is looked up (a missing entry raises
        ``KeyError``), the change is logged, and ``setvar`` is called on the
        entry's last element with the entry's third element and ``val``.

        ``data`` is accepted but not used.
        """
        if not running:
            return

        # Entry layout as stored by AddVarW: [name, data value, v, Q1].
        name, _stored_value, target, reader = Vars_W[node.nodeid.Identifier]

        message = str(("Datachange callback", name, val))
        logging.info(message)
        reader.setvar(target, val)

    def event_notification(self, event):
        """Log an incoming event; no further action is taken."""
        message = str(("Python: New event", event))
        logging.info(message)
def CallMethod(ObjectID, name, Inst1, Q1):
    """Log a method invocation and forward its instruction to ``Q1``.

    ObjectID -- identifier handed to the method callback; only logged here.
    name     -- method name; only logged here.
    Inst1    -- object passed unchanged to ``Q1.callMethod``.
    Q1       -- object providing ``callMethod``.
    """
    log_line = str(("Callmethod", ObjectID, name))
    logging.info(log_line)
    Q1.callMethod(Inst1)
def AddVar(name, value):
    """Create a writable variable ``name`` with ``value`` under ``PCCobj``.

    Uses the module globals ``PCCobj`` and ``idx`` that InitServer assigns.
    Returns whatever ``PCCobj.add_variable`` returned.
    """
    node = PCCobj.add_variable(idx, name, value)
    node.set_writable()
    return node
def AddVarR(vname, varvalue2, v, debug):
    """Create a variable and record it in ``Vars_R``.

    vname     -- name of the new variable.
    varvalue2 -- initial value; its ``len`` is logged, so it must be sized.
    v         -- stored as the last element of the ``Vars_R`` entry.
    debug     -- truthy: create under ``DEBUGobj``; otherwise ``PCCobj``.

    The variable is not made writable here. Returns the created variable.
    """
    if debug:
        parent = DEBUGobj
    else:
        parent = PCCobj

    node = parent.add_variable(idx, vname, varvalue2)
    logging.info(str(("Variable added: ", vname, len(varvalue2))))

    # Build the entry before reading the key, as the original statement did.
    entry = [vname, node.get_data_value(), varvalue2, v]
    Vars_R[node.nodeid.Identifier] = entry
    return node
def AddVarW(vname, varvalue2, v, Q1, debug):
    """Create a writable variable and, when ``v`` is truthy, track its changes.

    vname     -- name of the new variable.
    varvalue2 -- initial value.
    v         -- stored in the ``Vars_W`` entry; a falsy value skips tracking.
    Q1        -- stored as the last element of the ``Vars_W`` entry.
    debug     -- truthy: create under ``DEBUGobj``; otherwise ``PCCobj``.

    Returns the created variable.
    """
    logging.info(str(("Variable added: ", vname)))

    if debug:
        parent = DEBUGobj
    else:
        parent = PCCobj
    node = parent.add_variable(idx, vname, varvalue2)
    node.set_writable()

    if not v:
        return node

    # NOTE(review): the original indentation was lost; both the Vars_W
    # registration and the subscription are treated as guarded by `v`,
    # since the handler looks up Vars_W for every notification - confirm.
    entry = [vname, node.get_data_value(), v, Q1]
    Vars_W[node.nodeid.Identifier] = entry
    sub.subscribe_data_change(node)
    return node
def Addmethod(vname, v, Q1, debug):
    """Register a method named ``vname`` whose callback calls CallMethod.

    vname -- method name; also passed to CallMethod as ``name``.
    v     -- passed to CallMethod as the instruction.
    Q1    -- passed to CallMethod as the receiving object.
    debug -- truthy: register under ``DEBUGobj``; otherwise ``PCCobj``.

    The method is declared with empty input and output argument lists.
    """
    if debug:
        parent = DEBUGobj
    else:
        parent = PCCobj

    # Defaults pin the per-method values, mirroring the original lambda.
    def _invoke(ObjectId, name=vname, inst=v, Q1=Q1):
        return CallMethod(ObjectId, name, inst, Q1)

    parent.add_method(idx, vname, _invoke, [], [])
    logging.info(str(("AddMethod:", vname)))
def InitServer(port=4840):
    """Create and start the OPC UA server and publish it via module globals.

    port -- TCP port used in the endpoint URL (default 4840).

    Assigns the globals ``server``, ``idx``, ``PCCobj``, ``DEBUGobj`` and
    ``sub``, and leaves ``running`` False so that data-change notifications
    are ignored until start() is called. Returns None.
    """
    global server, running, PCCobj, DEBUGobj, idx, sub

    # Server endpoint and namespace.
    server = Server()
    endpoint = "opc.tcp://0.0.0.0:{}/PCC/".format(port)
    server.set_endpoint(endpoint)
    idx = server.register_namespace("http://lofar.eu")

    # Address space: "PCC" under the objects node, "DEBUG" under "PCC".
    root = server.get_objects_node()
    PCCobj = root.add_object(idx, "PCC")
    DEBUGobj = PCCobj.add_object(idx, "DEBUG")

    logging.info("Start server")
    server.start()

    # One subscription, created after the server has started, is shared by
    # every variable AddVarW subscribes; 500 is the first argument of
    # create_subscription.
    sub = server.create_subscription(500, SubHandler())

    running = False
    logging.info("Add variables:")
def start():
    """Enable handling of data-change notifications.

    Sets the module-level ``running`` flag to True; while it is False,
    SubHandler.datachange_notification returns without doing anything.
    """
    global running
    running = True
#print("Add modes:")
#P1.GetMethodNames("",AddMethod);
#RCU.changemode(0)
#RCU.Setvar2('HBA_PwrX0',16*(1,))
#RCU.Setvar2('HBA_LED0',16*(1,))
#RCU.Setvar2('HBA_PwrX0',16*(0,))
#RCU.Setvar2('HBA_LED0',16*(0,))
#RCU.Setvar2('Amp_Gain0',[11])
#RCU.Setvar2('Amp_Gain1',[10])
#RCU.Setvar2('Power_Ant0',[1])
#RCU.Setvar2('Power_Ant1',[1])
#RCU.Setvar2('Power_Ant0',[0])
#RCU.Setvar2('Power_Ant1',[0])
#print(RCU.Getvar2('Amp_Gain0'))
#print(RCU.Getvar2('ID'))
#print(RCU.Getvar2('ADC_lock1'))
#print(RCU.Getvar2('HBA_DelayX1',element=1))