Skip to content
Snippets Groups Projects
Select Git revision
  • 241164b0176a86ae9d998909dcad8ecee3d13a31
  • master default protected
  • L2SS-1914-fix_job_dispatch
  • TMSS-3170
  • TMSS-3167
  • TMSS-3161
  • TMSS-3158-Front-End-Only-Allow-Changing-Again
  • TMSS-3133
  • TMSS-3319-Fix-Templates
  • test-fix-deploy
  • TMSS-3134
  • TMSS-2872
  • defer-state
  • add-custom-monitoring-points
  • TMSS-3101-Front-End-Only
  • TMSS-984-choices
  • SDC-1400-Front-End-Only
  • TMSS-3079-PII
  • TMSS-2936
  • check-for-max-244-subbands
  • TMSS-2927---Front-End-Only-PXII
  • Before-Remove-TMSS
  • LOFAR-Release-4_4_318 protected
  • LOFAR-Release-4_4_317 protected
  • LOFAR-Release-4_4_316 protected
  • LOFAR-Release-4_4_315 protected
  • LOFAR-Release-4_4_314 protected
  • LOFAR-Release-4_4_313 protected
  • LOFAR-Release-4_4_312 protected
  • LOFAR-Release-4_4_311 protected
  • LOFAR-Release-4_4_310 protected
  • LOFAR-Release-4_4_309 protected
  • LOFAR-Release-4_4_308 protected
  • LOFAR-Release-4_4_307 protected
  • LOFAR-Release-4_4_306 protected
  • LOFAR-Release-4_4_304 protected
  • LOFAR-Release-4_4_303 protected
  • LOFAR-Release-4_4_302 protected
  • LOFAR-Release-4_4_301 protected
  • LOFAR-Release-4_4_300 protected
  • LOFAR-Release-4_4_299 protected
41 results

GCF_Task.cc

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    opcuaserv.py 3.89 KiB
    
    
    import sys
    sys.path.insert(0, "..")
    import time
    from opcua import ua, Server
    from datetime import datetime;
    import logging
    #import Vars
    #import HWconf
    #from pcctypes import *
    
    Vars_R={}
    Vars_W={}
    running=False
    
    class SubHandler(object):
        """
        Subscription Handler. To receive events from server for a subscription
        """
        def datachange_notification(self, node, val, data):
    #        print("Python: New data change event", node, val,data)
            if not(running): return
            vname,myvar,v,reader=Vars_W[node.nodeid.Identifier]
    #        val=(val if isinstance(val, list) else [val] )
            logging.info(str(("Datachange callback",vname,val)))
    #        myvar2.Value.Value=val
    #        myvar2.SourceTimestamp = datetime.utcnow()
            reader.setvar(v,val)
    
    #        Inst=Instr(DevType.Var,VarD,len(val),val)
    #        Q1.put(Inst)
     #       P1.SetVarValue(vname,val)
            #readback
    #        if True:
    #        print(Vars_R,Vars_R.values())
    #        for vname2,myvar2,oldvalue in Vars_R.values():
    #            if vname2==vname:
    #              res=P1.GetVarValue(vname,val)
    #              print("Read callback",vname,": Result:",res,oldvalue)
    #              if res:
    
        def event_notification(self, event):
            logging.info(str(("Python: New event", event)))
    
    
    def CallMethod(ObjectID,name,Inst1,Q1):
            logging.info(str(("Callmethod",ObjectID,name)))
    #        Inst1=Vars.Instr(Vars.DevType.Instr,instrs,0,[])
            Q1.callMethod(Inst1)
    
    #        P1.CallMethod(name,None)
    
    def AddVar(name,value):
        myvar2 = PCCobj.add_variable(idx, name, value)
        myvar2.set_writable()
        return myvar2
    
    def AddVarR(vname,varvalue2,v,debug):
        obj=(DEBUGobj if debug else PCCobj) 
        myvar = obj.add_variable(idx, vname, varvalue2)
        logging.info(str(("Variable added: ",vname,len(varvalue2))))
        Vars_R[myvar.nodeid.Identifier]=[vname,myvar.get_data_value(),varvalue2,v]
        return myvar
    
    def AddVarW(vname,varvalue2,v,Q1,debug):
        logging.info(str(("Variable added: ",vname)))#,'=',varvalue2)
        obj=(DEBUGobj if debug else PCCobj) 
        myvar2 = obj.add_variable(idx, vname, varvalue2)
        myvar2.set_writable()
        if v:
            Vars_W[myvar2.nodeid.Identifier]=[vname,myvar2.get_data_value(),v,Q1]
            handle = sub.subscribe_data_change(myvar2)
        return myvar2
    
    def Addmethod(vname,v,Q1,debug):
        obj=(DEBUGobj if debug else PCCobj) 
        myvar = obj.add_method(idx, vname, lambda ObjectId,name=vname,inst=v,Q1=Q1 : CallMethod(ObjectId,name,inst,Q1), [],[] )
        logging.info(str(("AddMethod:",vname)))
    
    def InitServer(port=4840):
    
    # setup our server
        global server,running,PCCobj,DEBUGobj,idx,sub;
        server = Server()
        server.set_endpoint("opc.tcp://0.0.0.0:{}/PCC/".format(port))
    
        idx = server.register_namespace("http://lofar.eu")
    #    uri = "http://examples.freeopcua.github.io"
    #    idx = server.register_namespace(uri)
    
        objects = server.get_objects_node()
    
        # populating our address space
        PCCobj = objects.add_object(idx, "PCC")
        DEBUGobj = PCCobj.add_object(idx, "DEBUG")
    #    self.PCCobj=PCCobj
        
        # starting!
        logging.info("Start server");
        server.start()
        handler = SubHandler()
        sub = server.create_subscription(500, handler)
        running=False;
        logging.info("Add variables:")
    #P1.GetVarNames("",AddVar);
    
    #    time.sleep(1)
    #    running=True
    #    return Vars_R,Vars_W
        return
    #exit()
    
    def start():
        global running
        running=True
    
    #print("Add modes:")
    #P1.GetMethodNames("",AddMethod);
    
    
    
    
    
    
    
    
    #RCU.changemode(0)
    
    
    #RCU.Setvar2('HBA_PwrX0',16*(1,))
    #RCU.Setvar2('HBA_LED0',16*(1,))
    #RCU.Setvar2('HBA_PwrX0',16*(0,))
    #RCU.Setvar2('HBA_LED0',16*(0,))
    
    
    #RCU.Setvar2('Amp_Gain0',[11])
    #RCU.Setvar2('Amp_Gain1',[10])
    #RCU.Setvar2('Power_Ant0',[1])
    #RCU.Setvar2('Power_Ant1',[1])
    #RCU.Setvar2('Power_Ant0',[0])
    #RCU.Setvar2('Power_Ant1',[0])
    
    
    #print(RCU.Getvar2('Amp_Gain0'))
    
    
    #print(RCU.Getvar2('ID'))
    
    
    #print(RCU.Getvar2('ADC_lock1'))
    
    
    #print(RCU.Getvar2('HBA_DelayX1',element=1))