Select Git revision
update_ConfigDb.sh
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
opcuaserv.py 5.84 KiB
import sys
sys.path.insert(0, "..")
import time
#from opcua import ua, Server
from asyncua.sync import ua, Server
from datetime import datetime;
from queue import PriorityQueue
import logging
#import Vars
#import HWconf
#from pcctypes import *
Vars_R={}
Vars_W={}
methodDict={}
running=False
class SubHandler(object):
"""
Subscription Handler. To receive events from server for a subscription
"""
def __init__(self):
self.datachange_queue=PriorityQueue()
def check_datachange(self,timeout):
while True:
try:
_,val,node=self.datachange_queue.get(timeout=timeout)
except:
break
nodeid=node.nodeid.Identifier
vname,myvar,v,reader,opcvar=Vars_W[nodeid]
# val=(val if isinstance(val, list) else [val] )
logging.warn(str(("Datachange",vname,val[:16] if isinstance(val,list) else val)))
node.set_value(ua.DataValue(val, ua.StatusCode(ua.StatusCodes.GoodCompletesAsynchronously)))
for r in reader:
r.setvar(v,val)
def datachange_notification(self, node, val, data):
# NOTE: OPC variables can not be updates in the datachange_notification when using asyncua.sync!! So we put them in a queue.
# print("Python: New data change event", node, val,data)
if not(running): return
sourcetime=data.monitored_item.Value.SourceTimestamp
logging.info(str(("Datachange callback",node.nodeid.Identifier,None if sourcetime is None else sourcetime.timestamp(),data.monitored_item.Value.StatusCode)))
if data.monitored_item.Value.StatusCode != ua.StatusCode(ua.StatusCodes.Good): return
# logging.warning(str(("Python: New client data change event", node, val, data.monitored_item.Value.StatusCode)))
self.datachange_queue.put((0 if sourcetime is None else sourcetime.timestamp(),val,node))
# myvar2.Value.Value=val
# myvar2.SourceTimestamp = datetime.utcnow()
# Inst=Instr(DevType.Var,VarD,len(val),val)
# Q1.put(Inst)
# P1.SetVarValue(vname,val)
#readback
# if True:
# print(Vars_R,Vars_R.values())
# for vname2,myvar2,oldvalue in Vars_R.values():
# if vname2==vname:
# res=P1.GetVarValue(vname,val)
# print("Read callback",vname,": Result:",res,oldvalue)
# if res:
def event_notification(self, event):
logging.info(str(("Python: New event", event)))
def CallMethod(ObjectID,name,Inst1):
logging.info(str(("Callmethod",ObjectID,name)))
# Inst1=Vars.Instr(Vars.DevType.Instr,instrs,0,[])
for Q1 in methodDict[name]:
Q1.callMethod(Inst1)
# P1.CallMethod(name,None)
def AddVar(name,value):
myvar2 = PCCobj.add_variable(idx, name, value)
myvar2.set_writable()
return myvar2
def AddVarR(vname,varvalue2,v,debug):
for id0,vx in Vars_R.items():
if vx[0]==vname:
logging.info("Duplicate %s "%vname)
# Vars_W[id0][3]+=[Q1]
if vx[3]!=v: logging.error("Duplicate %s with different IDs!"%vname)
return vx[4]
# else: print(vx[0],vname)
obj=(DEBUGobj if debug else PCCobj)
myvar = obj.add_variable(idx, vname, varvalue2)
logging.info(str(("Variable added: ",vname,(len(varvalue2) if isinstance(varvalue2,list) else ''))))
Vars_R[myvar.nodeid.Identifier]=[vname,myvar.get_data_value(),varvalue2,v,myvar]
return myvar
def AddVarW(vname,varvalue2,v,Q1,debug):
for id0,vx in Vars_W.items():
if vx[0]==vname:
logging.info("Duplicate %s "%vname)
Vars_W[id0][3]+=[Q1]
if vx[2]!=v: logging.error("Duplicate %s with different IDs!"%vname)
return vx[4]
logging.info(str(("Variable added: ",vname)))#,'=',varvalue2)
obj=(DEBUGobj if debug else PCCobj)
myvar2 = obj.add_variable(idx, vname, varvalue2)
myvar2.set_writable()
if not v is None:
Vars_W[myvar2.nodeid.Identifier]=[vname,myvar2.get_data_value(),v,[Q1],myvar2]
handle = sub.subscribe_data_change(myvar2)
return myvar2
def Addmethod(vname,v,Q1,debug):
obj=(DEBUGobj if debug else PCCobj)
try:
methodDict[vname]+=[Q1]
logging.info("Duplicate method %s "%vname)
except:
methodDict[vname]=[Q1];
myvar = obj.add_method(idx, vname, lambda ObjectId,name=vname,inst=v : CallMethod(ObjectId,name,inst), [],[] )
logging.info(str(("AddMethod:",vname)))
def InitServer(port=4840):
# setup our server
global server,running,PCCobj,DEBUGobj,idx,sub;
server = Server()
server.set_endpoint("opc.tcp://0.0.0.0:{}/".format(port))
idx = server.register_namespace("http://lofar.eu")
# uri = "http://examples.freeopcua.github.io"
# idx = server.register_namespace(uri)
objects = server.nodes.objects #server.get_objects_node()
# populating our address space
PCCobj = objects #.add_object(idx, "PCC")
DEBUGobj = PCCobj.add_object(idx, "DEBUG")
# self.PCCobj=PCCobj
# starting!
logging.info("Start server");
server.start()
handler = SubHandler()
sub = server.create_subscription(50, handler)
running=False;
logging.info("Add variables:")
#P1.GetVarNames("",AddVar);
# time.sleep(1)
# running=True
# return Vars_R,Vars_W
return handler
#exit()
def start():
global running
running=True
#print("Add modes:")
#P1.GetMethodNames("",AddMethod);
#RCU.changemode(0)
#RCU.Setvar2('HBA_PwrX0',16*(1,))
#RCU.Setvar2('HBA_LED0',16*(1,))
#RCU.Setvar2('HBA_PwrX0',16*(0,))
#RCU.Setvar2('HBA_LED0',16*(0,))
#RCU.Setvar2('Amp_Gain0',[11])
#RCU.Setvar2('Amp_Gain1',[10])
#RCU.Setvar2('Power_Ant0',[1])
#RCU.Setvar2('Power_Ant1',[1])
#RCU.Setvar2('Power_Ant0',[0])
#RCU.Setvar2('Power_Ant1',[0])
#print(RCU.Getvar2('Amp_Gain0'))
#print(RCU.Getvar2('ID'))
#print(RCU.Getvar2('ADC_lock1'))
#print(RCU.Getvar2('HBA_DelayX1',element=1))