Skip to content
Snippets Groups Projects
Select Git revision
  • bc2df7471aa8da42b3c555a2a7a6157cdc6a7ce6
  • master default protected
  • dither_on_off_disabled
  • yocto
  • pypcc2
  • pypcc3
  • 2020-12-07-the_only_working_copy
  • v2.1
  • v2.0
  • v1.0
  • v0.9
  • Working-RCU_ADC,ID
  • 2020-12-11-Holiday_Season_release
13 results

opcuaserv.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    opcuaserv.py 5.84 KiB
    
    
    import sys
    sys.path.insert(0, "..")
    import time
    #from opcua import ua, Server
    from asyncua.sync import ua, Server
    from datetime import datetime;
    from queue import PriorityQueue
    import logging
    #import Vars
    #import HWconf
    #from pcctypes import *
    
    Vars_R={}
    Vars_W={}
    methodDict={}
    running=False
    
    class SubHandler(object):
        """
        Subscription Handler. To receive events from server for a subscription
        """
        def __init__(self):
            self.datachange_queue=PriorityQueue()
        def check_datachange(self,timeout):
            while True:
                try:
                    _,val,node=self.datachange_queue.get(timeout=timeout)
                except:
                    break
                nodeid=node.nodeid.Identifier
                vname,myvar,v,reader,opcvar=Vars_W[nodeid]
        #        val=(val if isinstance(val, list) else [val] )
                logging.warn(str(("Datachange",vname,val[:16] if isinstance(val,list) else val)))
                node.set_value(ua.DataValue(val, ua.StatusCode(ua.StatusCodes.GoodCompletesAsynchronously)))
                for r in reader:
                    r.setvar(v,val)
    
        def datachange_notification(self, node, val, data):
    #       NOTE: OPC variables can not be updates in the datachange_notification when using asyncua.sync!! So we put them in a queue.
    #        print("Python: New data change event", node, val,data)
            if not(running): return
            sourcetime=data.monitored_item.Value.SourceTimestamp
            logging.info(str(("Datachange callback",node.nodeid.Identifier,None if sourcetime is None else sourcetime.timestamp(),data.monitored_item.Value.StatusCode)))
            if data.monitored_item.Value.StatusCode != ua.StatusCode(ua.StatusCodes.Good): return
    #            logging.warning(str(("Python: New client data change event", node, val, data.monitored_item.Value.StatusCode)))
            self.datachange_queue.put((0 if sourcetime is None else sourcetime.timestamp(),val,node))
    #        myvar2.Value.Value=val
    #        myvar2.SourceTimestamp = datetime.utcnow()
    
    #        Inst=Instr(DevType.Var,VarD,len(val),val)
    #        Q1.put(Inst)
     #       P1.SetVarValue(vname,val)
            #readback
    #        if True:
    #        print(Vars_R,Vars_R.values())
    #        for vname2,myvar2,oldvalue in Vars_R.values():
    #            if vname2==vname:
    #              res=P1.GetVarValue(vname,val)
    #              print("Read callback",vname,": Result:",res,oldvalue)
    #              if res:
    
        def event_notification(self, event):
            logging.info(str(("Python: New event", event)))
    
    
    def CallMethod(ObjectID,name,Inst1):
            logging.info(str(("Callmethod",ObjectID,name)))
    #        Inst1=Vars.Instr(Vars.DevType.Instr,instrs,0,[])
            for Q1 in methodDict[name]:
               Q1.callMethod(Inst1)
    #        P1.CallMethod(name,None)
    
    def AddVar(name,value):
        myvar2 = PCCobj.add_variable(idx, name, value)
        myvar2.set_writable()
        return myvar2
    
    def AddVarR(vname,varvalue2,v,debug):
        for id0,vx in Vars_R.items():
          if vx[0]==vname: 
               logging.info("Duplicate %s "%vname)
    #           Vars_W[id0][3]+=[Q1]
               if vx[3]!=v: logging.error("Duplicate %s with different IDs!"%vname)
               return vx[4]
    #      else: print(vx[0],vname)
        obj=(DEBUGobj if debug else PCCobj) 
        myvar = obj.add_variable(idx, vname, varvalue2)
        logging.info(str(("Variable added: ",vname,(len(varvalue2) if isinstance(varvalue2,list) else ''))))
        Vars_R[myvar.nodeid.Identifier]=[vname,myvar.get_data_value(),varvalue2,v,myvar]
        return myvar
    
    def AddVarW(vname,varvalue2,v,Q1,debug):
        for id0,vx in Vars_W.items():
          if vx[0]==vname: 
               logging.info("Duplicate %s "%vname)
               Vars_W[id0][3]+=[Q1]
               if vx[2]!=v: logging.error("Duplicate %s with different IDs!"%vname)
               return vx[4]
        logging.info(str(("Variable added: ",vname)))#,'=',varvalue2)
        obj=(DEBUGobj if debug else PCCobj) 
        myvar2 = obj.add_variable(idx, vname, varvalue2)
        myvar2.set_writable()
        if not v is None:
            Vars_W[myvar2.nodeid.Identifier]=[vname,myvar2.get_data_value(),v,[Q1],myvar2]
            handle = sub.subscribe_data_change(myvar2)
        return myvar2
    
    def Addmethod(vname,v,Q1,debug):
        obj=(DEBUGobj if debug else PCCobj) 
        try: 
          methodDict[vname]+=[Q1]
          logging.info("Duplicate method %s "%vname)
        except:
          methodDict[vname]=[Q1];
        myvar = obj.add_method(idx, vname, lambda ObjectId,name=vname,inst=v : CallMethod(ObjectId,name,inst), [],[] )
        logging.info(str(("AddMethod:",vname)))
    
    def InitServer(port=4840):
    
    # setup our server
        global server,running,PCCobj,DEBUGobj,idx,sub;
        server = Server()
        server.set_endpoint("opc.tcp://0.0.0.0:{}/".format(port))
    
        idx = server.register_namespace("http://lofar.eu")
    #    uri = "http://examples.freeopcua.github.io"
    #    idx = server.register_namespace(uri)
    
        objects = server.nodes.objects #server.get_objects_node()
    
        # populating our address space
        PCCobj = objects #.add_object(idx, "PCC")
        DEBUGobj = PCCobj.add_object(idx, "DEBUG")
    #    self.PCCobj=PCCobj
        
        # starting!
        logging.info("Start server");
        server.start()
        handler = SubHandler()
        sub = server.create_subscription(50, handler)
        running=False;
        logging.info("Add variables:")
    #P1.GetVarNames("",AddVar);
    
    #    time.sleep(1)
    #    running=True
    #    return Vars_R,Vars_W
        return handler
    #exit()
    
    def start():
        global running
        running=True
    
    #print("Add modes:")
    #P1.GetMethodNames("",AddMethod);
    
    
    
    
    
    
    
    
    #RCU.changemode(0)
    
    
    #RCU.Setvar2('HBA_PwrX0',16*(1,))
    #RCU.Setvar2('HBA_LED0',16*(1,))
    #RCU.Setvar2('HBA_PwrX0',16*(0,))
    #RCU.Setvar2('HBA_LED0',16*(0,))
    
    
    #RCU.Setvar2('Amp_Gain0',[11])
    #RCU.Setvar2('Amp_Gain1',[10])
    #RCU.Setvar2('Power_Ant0',[1])
    #RCU.Setvar2('Power_Ant1',[1])
    #RCU.Setvar2('Power_Ant0',[0])
    #RCU.Setvar2('Power_Ant1',[0])
    
    
    #print(RCU.Getvar2('Amp_Gain0'))
    
    
    #print(RCU.Getvar2('ID'))
    
    
    #print(RCU.Getvar2('ADC_lock1'))
    
    
    #print(RCU.Getvar2('HBA_DelayX1',element=1))