Skip to content
Snippets Groups Projects
Select Git revision
  • 09eef0cc70960e64a23933afc2696d36ccee3492
  • master default protected
  • L2SDP-1134
  • L2SDP-1137
  • L2SDP-LIFT
  • L2SDP-1113
  • HPR-158
7 results

tox.ini

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    opcuaserv.py 3.89 KiB
    
    
    import sys
    sys.path.insert(0, "..")
    import time
    from opcua import ua, Server
    from datetime import datetime;
    import logging
    #import Vars
    #import HWconf
    #from pcctypes import *
    
    # Read-only variables, keyed by node identifier.
    # AddVarR stores [name, data value, initial value, v] per entry.
    Vars_R = dict()
    # Writable variables, keyed by node identifier.
    # AddVarW stores [name, data value, v, Q1] per entry.
    Vars_W = dict()
    # SubHandler.datachange_notification ignores callbacks while this is False;
    # start() sets it to True.
    running = False
    
    class SubHandler(object):
        """
        Subscription handler receiving data-change and event notifications.

        InitServer passes an instance of this class to
        server.create_subscription().
        """

        def datachange_notification(self, node, val, data):
            """
            Forward a changed value of a writable variable to its registered sink.

            node -- node whose value changed; its identifier is the key into Vars_W
            val  -- the new value
            data -- extra notification data (unused here)

            Does nothing while the module-level `running` flag is False.
            Raises KeyError if the node was never registered in Vars_W.
            """
            if not running:
                return
            # Entry layout (see AddVarW): [name, data value, v, Q1].
            vname, _data_value, key, sink = Vars_W[node.nodeid.Identifier]
            message = str(("Datachange callback", vname, val))
            logging.info(message)
            sink.setvar(key, val)

        def event_notification(self, event):
            """
            Log an incoming event; no other action is taken.
            """
            message = str(("Python: New event", event))
            logging.info(message)
    
    
    def CallMethod(ObjectID,name,Inst1,Q1):
        """
        Log a method invocation and hand its instruction to Q1.

        ObjectID -- id of the object the method was called on (only logged)
        name     -- method name (only logged)
        Inst1    -- instruction passed unchanged to Q1.callMethod()
        Q1       -- object providing callMethod()

        Returns None.
        """
        message = str(("Callmethod", ObjectID, name))
        logging.info(message)
        Q1.callMethod(Inst1)
    
    #        P1.CallMethod(name,None)
    
    def AddVar(name,value):
        """
        Add a writable variable under the PCC object and return its node.

        name  -- browse name of the new variable
        value -- initial value

        Relies on the module globals PCCobj and idx set by InitServer.
        """
        node = PCCobj.add_variable(idx, name, value)
        node.set_writable()
        return node
    
    def AddVarR(vname,varvalue2,v,debug):
        """
        Add a read-only variable and register it in Vars_R.

        vname     -- browse name of the new variable
        varvalue2 -- initial value (list or scalar)
        v         -- caller-supplied reference stored alongside the variable
        debug     -- if true, the variable is added under the DEBUG object
                     instead of the PCC object

        Returns the created node. Relies on the module globals PCCobj,
        DEBUGobj and idx set by InitServer.
        """
        parent = DEBUGobj if debug else PCCobj
        myvar = parent.add_variable(idx, vname, varvalue2)
        # Scalar values have no len(); log None for them instead of raising
        # after the node has already been created but before it is registered.
        try:
            length = len(varvalue2)
        except TypeError:
            length = None
        logging.info(str(("Variable added: ", vname, length)))
        Vars_R[myvar.nodeid.Identifier] = [vname, myvar.get_data_value(), varvalue2, v]
        return myvar
    
    def AddVarW(vname,varvalue2,v,Q1,debug):
        """
        Add a writable variable and, when `v` is truthy, subscribe to its changes.

        vname     -- browse name of the new variable
        varvalue2 -- initial value
        v         -- reference passed to Q1.setvar() by
                     SubHandler.datachange_notification; if falsy the variable
                     is created but neither registered nor subscribed
        Q1        -- object stored in Vars_W whose setvar() receives new values
        debug     -- if true, the variable is added under the DEBUG object
                     instead of the PCC object

        Returns the created node. Relies on the module globals PCCobj,
        DEBUGobj, idx and sub set by InitServer.
        """
        logging.info(str(("Variable added: ", vname)))
        parent = DEBUGobj if debug else PCCobj
        node = parent.add_variable(idx, vname, varvalue2)
        node.set_writable()
        if not v:
            return node
        # Register before subscribing so the handler can resolve the node.
        Vars_W[node.nodeid.Identifier] = [vname, node.get_data_value(), v, Q1]
        sub.subscribe_data_change(node)
        return node
    
    def Addmethod(vname,v,Q1,debug):
        """
        Add an argument-less method node that forwards calls to CallMethod.

        vname -- browse name of the method (also logged on each call)
        v     -- instruction passed to Q1.callMethod() when the method is called
        Q1    -- object providing callMethod()
        debug -- if true, the method is added under the DEBUG object instead
                 of the PCC object

        Returns None. Relies on the module globals PCCobj, DEBUGobj and idx
        set by InitServer.
        """
        parent = DEBUGobj if debug else PCCobj

        def _invoke(ObjectId):
            # vname, v and Q1 are captured from the enclosing call rather than
            # exposed as defaulted parameters, so extra positional arguments
            # cannot silently replace them.
            # NOTE(review): presumably the server passes client input arguments
            # positionally after the object id - confirm against python-opcua.
            return CallMethod(ObjectId, vname, v, Q1)

        parent.add_method(idx, vname, _invoke, [], [])
        logging.info(str(("AddMethod:", vname)))
    
    def InitServer(port=4840):
        """
        Create and start the OPC UA server and publish its handles as globals.

        port -- TCP port used in the endpoint URL (default 4840)

        Sets the module globals server, idx, PCCobj, DEBUGobj and sub, and
        resets `running` to False. Returns None.
        """
        global server,running,PCCobj,DEBUGobj,idx,sub

        # Server listening on all interfaces under the /PCC/ path.
        server = Server()
        endpoint = "opc.tcp://0.0.0.0:{}/PCC/".format(port)
        server.set_endpoint(endpoint)

        idx = server.register_namespace("http://lofar.eu")

        # Address space layout: Objects -> PCC -> DEBUG.
        root = server.get_objects_node()
        PCCobj = root.add_object(idx, "PCC")
        DEBUGobj = PCCobj.add_object(idx, "DEBUG")

        logging.info("Start server")
        server.start()

        # The subscription is used by AddVarW to watch writable variables.
        sub = server.create_subscription(500, SubHandler())
        # Data-change callbacks are ignored until start() is called.
        running = False
        logging.info("Add variables:")
    #exit()
    
    def start():
        """
        Enable processing of data-change notifications.

        Sets the module-level `running` flag to True.
        """
        global running
        running = True
    
    #print("Add modes:")
    #P1.GetMethodNames("",AddMethod);
    
    
    
    
    
    
    
    
    #RCU.changemode(0)
    
    
    #RCU.Setvar2('HBA_PwrX0',16*(1,))
    #RCU.Setvar2('HBA_LED0',16*(1,))
    #RCU.Setvar2('HBA_PwrX0',16*(0,))
    #RCU.Setvar2('HBA_LED0',16*(0,))
    
    
    #RCU.Setvar2('Amp_Gain0',[11])
    #RCU.Setvar2('Amp_Gain1',[10])
    #RCU.Setvar2('Power_Ant0',[1])
    #RCU.Setvar2('Power_Ant1',[1])
    #RCU.Setvar2('Power_Ant0',[0])
    #RCU.Setvar2('Power_Ant1',[0])
    
    
    #print(RCU.Getvar2('Amp_Gain0'))
    
    
    #print(RCU.Getvar2('ID'))
    
    
    #print(RCU.Getvar2('ADC_lock1'))
    
    
    #print(RCU.Getvar2('HBA_DelayX1',element=1))