Select Git revision
task_queue_manager.py
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
opcuaserv.py 3.63 KiB
import sys
sys.path.insert(0, "..")
import time
from opcua import ua, Server
from datetime import datetime;
import logging
#import Vars
#import HWconf
# Registry of read-only variables, keyed by OPC UA node identifier.
# AddVarR stores one list per entry: [v.name, data value, initial value, v].
Vars_R = dict()

# Registry of writable variables, keyed by OPC UA node identifier.
# AddVarW stores one list per entry: [v.name, data value, v, Q1].
Vars_W = dict()

# While this flag is false, SubHandler.datachange_notification returns
# immediately without queueing anything.
running = False
class SubHandler(object):
    """
    Subscription handler: receives data-change and event notifications
    from the server for the subscription it is attached to.
    """

    def datachange_notification(self, node, val, data):
        """Turn a data change on a writable variable into a queued instruction.

        Does nothing while the module-level ``running`` flag is false.
        Otherwise the node's identifier is looked up in ``Vars_W`` (a
        ``KeyError`` propagates if it was never registered), the change is
        logged, and an instruction built from the stored variable descriptor
        and ``val`` is put on the queue stored with that variable.

        ``val`` must support ``len()``; ``data`` is not used.
        """
        if not running:
            return

        registered = Vars_W[node.nodeid.Identifier]
        # Entry layout: [name, data value, variable descriptor, queue].
        vname, _, VarD, Q1 = registered

        logging.info(str(("Datachange callback", vname, val)))

        # NOTE(review): `Vars` is used here, but `import Vars` is commented
        # out at the top of the visible file — confirm it is in scope.
        instruction = Vars.Instr(Vars.DevType.Var, VarD, len(val), val)
        Q1.put(instruction)

    def event_notification(self, event):
        """Log an event received from the server."""
        message = str(("Python: New event", event))
        logging.info(message)
def CallMethod(ObjectID, name, Inst1, Q1):
    """Log a method invocation and queue its instruction.

    Args:
        ObjectID: identifier passed in by the caller; only logged.
        name: method name; only logged.
        Inst1: instruction object to hand over.
        Q1: queue-like object; ``Inst1`` is passed to its ``put`` method.
    """
    message = str(("Callmethod", ObjectID, name))
    logging.info(message)
    Q1.put(Inst1)
def AddVar(name, value):
    """Add a writable variable to the PCC object and return its node.

    Args:
        name: name passed to ``add_variable``.
        value: initial value passed to ``add_variable``.

    Returns:
        The node returned by ``PCCobj.add_variable``, after
        ``set_writable()`` has been called on it.
    """
    node = PCCobj.add_variable(idx, name, value)
    node.set_writable()
    return node
def AddVarR(vname, varvalue2, v):
    """Add a variable to the PCC object and record it in ``Vars_R``.

    Unlike ``AddVarW``, the node is not marked writable and no data-change
    subscription is created.

    Args:
        vname: name passed to ``add_variable``.
        varvalue2: initial value; must support ``len()`` (its length is logged).
        v: variable descriptor; its ``name`` attribute is stored in the registry.

    Returns:
        The node returned by ``PCCobj.add_variable``.
    """
    node = PCCobj.add_variable(idx, vname, varvalue2)
    logging.info(str(("Variable added: ", vname, len(varvalue2))))

    # Registry entry: [descriptor name, current data value, initial value, descriptor].
    record = [v.name, node.get_data_value(), varvalue2, v]
    Vars_R[node.nodeid.Identifier] = record
    return node
def AddVarW(vname, varvalue2, v, Q1):
    """Add a writable variable, record it in ``Vars_W`` and subscribe to it.

    Args:
        vname: name passed to ``add_variable``.
        varvalue2: initial value passed to ``add_variable``.
        v: variable descriptor; its ``name`` attribute is stored in the registry.
        Q1: queue stored with the variable so the data-change callback can
            put instructions on it.

    Returns:
        The node returned by ``PCCobj.add_variable``.
    """
    logging.info(str(("Variable added: ", vname)))

    node = PCCobj.add_variable(idx, vname, varvalue2)
    node.set_writable()

    # Registry entry: [descriptor name, current data value, descriptor, queue].
    record = [v.name, node.get_data_value(), v, Q1]
    Vars_W[node.nodeid.Identifier] = record

    # Data changes on this node are delivered to the subscription's handler;
    # the returned handle is not kept.
    sub.subscribe_data_change(node)
    return node
def Addmethod(vname, v, Q1):
    """Register a method on the PCC object that forwards to ``CallMethod``.

    The method is registered with empty input and output argument lists.

    Args:
        vname: method name; used for registration and passed to ``CallMethod``.
        v: instruction object passed to ``CallMethod`` on each invocation.
        Q1: queue passed to ``CallMethod`` on each invocation.
    """
    # The defaults bind this registration's values at definition time, so
    # each registered method keeps its own name, instruction and queue.
    def _on_call(ObjectId, name=vname, inst=v, Q1=Q1):
        return CallMethod(ObjectId, name, inst, Q1)

    PCCobj.add_method(idx, vname, _on_call, [], [])
    logging.info(str(("AddMethod:", vname)))
def InitServer(port=4840):
    """Create and start the OPC UA server and publish the module globals.

    Sets the module-level ``server``, ``PCCobj``, ``idx`` and ``sub`` that
    the ``AddVar*`` / ``Addmethod`` helpers rely on, and resets ``running``
    to False.

    Args:
        port: TCP port used in the endpoint URL (default 4840).
    """
    global server, running, PCCobj, idx, sub

    # Set up the server, its endpoint and its namespace.
    server = Server()
    endpoint = "opc.tcp://0.0.0.0:{}/PCC/".format(port)
    server.set_endpoint(endpoint)
    idx = server.register_namespace("http://lofar.eu")

    # Populate the address space: a single "PCC" object under the objects node.
    root = server.get_objects_node()
    PCCobj = root.add_object(idx, "PCC")

    logging.info("Start server")
    server.start()

    # One subscription serves all writable variables; 500 is the period
    # argument given to create_subscription.
    sub = server.create_subscription(500, SubHandler())

    # Data-change callbacks are ignored while this flag is false.
    running = False
    logging.info("Add variables:")
def start():
    """Enable processing of data-change callbacks.

    Sets the module-level ``running`` flag to True; the subscription
    handler's data-change callback returns early while that flag is false.
    """
    # The global declaration is required: without it the assignment only
    # binds a function-local name and the module flag never changes.
    global running
    running = True
#print("Add modes:")
#P1.GetMethodNames("",AddMethod);
#RCU.changemode(0)
#RCU.Setvar2('HBA_PwrX0',16*(1,))
#RCU.Setvar2('HBA_LED0',16*(1,))
#RCU.Setvar2('HBA_PwrX0',16*(0,))
#RCU.Setvar2('HBA_LED0',16*(0,))
#RCU.Setvar2('Amp_Gain0',[11])
#RCU.Setvar2('Amp_Gain1',[10])
#RCU.Setvar2('Power_Ant0',[1])
#RCU.Setvar2('Power_Ant1',[1])
#RCU.Setvar2('Power_Ant0',[0])
#RCU.Setvar2('Power_Ant1',[0])
#print(RCU.Getvar2('Amp_Gain0'))
#print(RCU.Getvar2('ID'))
#print(RCU.Getvar2('ADC_lock1'))
#print(RCU.Getvar2('HBA_DelayX1',element=1))