Skip to content
Snippets Groups Projects
Select Git revision
  • 2d711491fb6a7f1cc5d3702463bafead7b223e01
  • MCCS-163 default
  • main
  • sar-277-update-docs-with-examples-for-lrc
  • st-946-automate
  • sar_302-log-fix
  • sar-287_subarray_commands_to_lrc
  • sar_302-POC_await_sub_device_state
  • sat_302_fix_pipelines
  • sar-286_lrc_one_subarry_command
  • sar-286_lrc_improvements
  • sar-288-async-controller
  • sar-276-combine-tango-queue
  • sar-255_remove_nexus_reference
  • sar-275-add-LRC
  • sar-273-add-lrc-attributes
  • sar-272
  • sp-1106-marvin-1230525148-ska-tango-base
  • sp-1106-marvin-813091765-ska-tango-base
  • sar-255/Publish-package-to-CAR
  • mccs-661-device-under-test-fixture
  • mccs-659-pep257-docstring-linting
  • 0.11.3
  • 0.11.2
  • 0.11.1
  • 0.11.0
  • 0.10.1
  • 0.10.0
  • 0.9.1
  • 0.9.0
  • 0.8.1
  • 0.8.0
  • 0.7.2
  • 0.7.1
  • 0.7.0
  • 0.6.6
  • 0.6.5
  • 0.6.4
  • 0.6.3
  • 0.6.2
  • 0.6.1
  • 0.6.0
42 results

task_queue_manager.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    opcuaserv.py 3.63 KiB
    
    
    import sys
    sys.path.insert(0, "..")
    import time
    from opcua import ua, Server
    from datetime import datetime;
    import logging
    #import Vars
    #import HWconf
    
    # Registry filled by AddVarR: node identifier -> [name, data value, initial value, descriptor].
    Vars_R = {}
    # Registry filled by AddVarW: node identifier -> [name, data value, descriptor, queue].
    Vars_W = {}
    # While False, SubHandler.datachange_notification ignores incoming changes.
    running = False
    
    class SubHandler(object):
        """
        Subscription handler: receives the callbacks the server issues for
        a subscription (data changes and events).
        """

        def datachange_notification(self, node, val, data):
            """Turn a changed node value into an instruction on its queue.

            Nothing happens while the module-level ``running`` flag is false.
            Otherwise the entry registered in ``Vars_W`` for the node's
            identifier is looked up, the change is logged, and a
            ``Vars.Instr`` built from the stored descriptor, ``len(val)`` and
            ``val`` is put on the stored queue.

            ``val`` must support ``len()``. ``data`` is not used.

            NOTE(review): ``Vars`` is referenced here although its import is
            commented out at the top of this file -- confirm it is made
            available some other way.
            """
            if not running:
                return

            # Entry layout: [name, data value, descriptor, queue].
            var_name, _data_value, descriptor, out_queue = Vars_W[node.nodeid.Identifier]
            logging.info(str(("Datachange callback", var_name, val)))

            instruction = Vars.Instr(Vars.DevType.Var, descriptor, len(val), val)
            out_queue.put(instruction)

        def event_notification(self, event):
            """Log an event received for the subscription."""
            message = str(("Python: New event", event))
            logging.info(message)
    
    
    def CallMethod(ObjectID,name,Inst1,Q1):
        """Log a method invocation and enqueue its instruction.

        ObjectID and name are only used for the log line; Inst1 is put on
        Q1 unchanged.
        """
        log_line = str(("Callmethod", ObjectID, name))
        logging.info(log_line)
        Q1.put(Inst1)
    
    #        P1.CallMethod(name,None)
    
    def AddVar(name,value):
        """Add a writable variable called `name` to the PCC object.

        Returns the node created by ``PCCobj.add_variable``.
        """
        node = PCCobj.add_variable(idx, name, value)
        node.set_writable()
        return node
    
    def AddVarR(vname,varvalue2,v):
        """Add a variable to the PCC object and register it in ``Vars_R``.

        The node is created with `varvalue2` as its value (which must
        support ``len()`` for the log line) and is not marked writable here.
        The registry entry, keyed by the node identifier, is
        ``[v.name, current data value, varvalue2, v]``.

        Returns the created node.
        """
        node = PCCobj.add_variable(idx, vname, varvalue2)
        logging.info(str(("Variable added: ", vname, len(varvalue2))))
        Vars_R[node.nodeid.Identifier] = [
            v.name,
            node.get_data_value(),
            varvalue2,
            v,
        ]
        return node
    
    def AddVarW(vname,varvalue2,v,Q1):
        """Add a writable variable, register it in ``Vars_W`` and subscribe to it.

        The registry entry, keyed by the node identifier, is
        ``[v.name, current data value, v, Q1]``; it is what
        ``SubHandler.datachange_notification`` unpacks when the node changes.
        The node is then subscribed for data-change notifications on the
        module-level subscription ``sub``.

        Returns the created node.
        """
        logging.info(str(("Variable added: ", vname)))
        node = PCCobj.add_variable(idx, vname, varvalue2)
        node.set_writable()
        Vars_W[node.nodeid.Identifier] = [
            v.name,
            node.get_data_value(),
            v,
            Q1,
        ]
        # The subscription handle returned here is not kept.
        sub.subscribe_data_change(node)
        return node
    
    
    def Addmethod(vname,v,Q1):
        """Register a method named `vname` on the PCC object.

        The method is declared with empty input and output argument lists.
        When invoked it forwards to ``CallMethod`` with the name, instruction
        `v` and queue `Q1` given here.
        """
        # Defaults bind the registration-time values to the callback.
        def _invoke(ObjectId, name=vname, inst=v, Q1=Q1):
            return CallMethod(ObjectId, name, inst, Q1)

        PCCobj.add_method(idx, vname, _invoke, [], [])
        logging.info(str(("AddMethod:", vname)))
    
    def InitServer(port=4840):
        """Create and start the OPC UA server and its subscription.

        Sets the module-level globals ``server``, ``idx``, ``PCCobj``,
        ``sub`` and ``running``. ``running`` is left False, so data-change
        notifications are ignored until it is set to True.

        port -- TCP port used in the endpoint URL (default 4840).
        """
        global server, running, PCCobj, idx, sub

        # Set up the server and the endpoint it listens on.
        server = Server()
        endpoint_url = "opc.tcp://0.0.0.0:{}/PCC/".format(port)
        server.set_endpoint(endpoint_url)

        idx = server.register_namespace("http://lofar.eu")

        # Populate the address space: a single "PCC" object under Objects.
        objects_node = server.get_objects_node()
        PCCobj = objects_node.add_object(idx, "PCC")

        logging.info("Start server")
        server.start()

        # 500 is the subscription period (presumably milliseconds -- confirm
        # against the python-opcua documentation).
        notification_handler = SubHandler()
        sub = server.create_subscription(500, notification_handler)

        running = False
        logging.info("Add variables:")
    #exit()
    
    def start():
        """Enable handling of data-change notifications.

        Sets the module-level ``running`` flag to True. The flag is checked
        by ``SubHandler.datachange_notification``, which returns immediately
        while it is False.
        """
        # Without the global declaration the assignment would only create a
        # function-local name and the module-level flag would stay False.
        global running
        running = True
    
    #print("Add modes:")
    #P1.GetMethodNames("",AddMethod);
    
    
    
    
    
    
    
    
    #RCU.changemode(0)
    
    
    #RCU.Setvar2('HBA_PwrX0',16*(1,))
    #RCU.Setvar2('HBA_LED0',16*(1,))
    #RCU.Setvar2('HBA_PwrX0',16*(0,))
    #RCU.Setvar2('HBA_LED0',16*(0,))
    
    
    #RCU.Setvar2('Amp_Gain0',[11])
    #RCU.Setvar2('Amp_Gain1',[10])
    #RCU.Setvar2('Power_Ant0',[1])
    #RCU.Setvar2('Power_Ant1',[1])
    #RCU.Setvar2('Power_Ant0',[0])
    #RCU.Setvar2('Power_Ant1',[0])
    
    
    #print(RCU.Getvar2('Amp_Gain0'))
    
    
    #print(RCU.Getvar2('ID'))
    
    
    #print(RCU.Getvar2('ADC_lock1'))
    
    
    #print(RCU.Getvar2('HBA_DelayX1',element=1))