Skip to content
Snippets Groups Projects
Select Git revision
  • 1c0f217a98018759ec33e8ef01b84ab2b02018bb
  • master default protected
  • L2SDP-1134
  • L2SDP-LIFT
  • L2SDP-1137
  • L2SDP-1113
  • HPR-158
7 results

tech_complex_mult.vhd

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    opcuaserv.py 5.62 KiB
    
    
    import sys
    sys.path.insert(0, "..")
    import time
    from opcua import ua, Server
    from datetime import datetime;
    import argparse
    
    parser = argparse.ArgumentParser()
    parser.add_argument("-s", "--simulator", help="Do not connect to I2c, but simulate behaviour.", action="store_true")
    parser.add_argument("--no-lib-hack", help="Do not require a hacked opcua library. Breaks behaviour.", action="store_true")
    parser.add_argument("-p", "--port", help="Port number to listen on [%(default)s].", type=int, default=4842)
    args = parser.parse_args()
    
    if args.simulator:
        import pypcc_test as pypcc
    else:
        import pypcc
    
    P1=pypcc.pypcc("LTS_pypcc.yaml")
    
    if True:
    # setup our server
        server = Server()
        server.set_endpoint("opc.tcp://0.0.0.0:{}/PCC/".format(args.port))
    
        idx = server.register_namespace("http://lofar.eu")
    #    uri = "http://examples.freeopcua.github.io"
    #    idx = server.register_namespace(uri)
    
        objects = server.get_objects_node()
    
        # populating our address space
        PCCobj = objects.add_object(idx, "PCC")
        
        #    myvar = myobj.add_variable(idx, "MyVariable", 6.7)
    #    myvar.set_writable()    # Set MyVariable to be writable by clients
    
        # starting!
    
    #Vars={}
    #Vars[1]='123'
    #print(Vars)
    #exit()
    def ValCallback(nodeid,force=False):
        vname,myvar,oldvalue=Vars_R[nodeid.Identifier]
    #    print("Value callback",vname)
    #    vname=vname[vname.find('_')+1:]
    #    print("RCU variable:",vname,oldvalue)
    #    X=RCU1.Getvar2(vname)
    #    if not(running): 
        if False: 
            myvar.Value.Value=(oldvalue[0] if len(oldvalue)==1 else oldvalue)
            myvar.SourceTimestamp = datetime.utcnow()
            return myvar
        timenow=datetime.utcnow()
        timediff=(timenow-myvar.SourceTimestamp).total_seconds()
    #    print(timediff)
        if not(force) and timediff<90: return myvar;
        res=P1.GetVarValue(vname,oldvalue)
        print("Read callback",vname,": Result:",res,oldvalue)
        if res:
            myvar.Value.Value=(oldvalue[0] if len(oldvalue)==1 else oldvalue)
            myvar.SourceTimestamp = datetime.utcnow()
    #        Vars_R[nodeid.Identifier][3]=oldvalue
        return myvar
    
    
    
    class SubHandler(object):
        """
        Subscription Handler. To receive events from server for a subscription
        """
    
        def datachange_notification(self, node, val, data):
    #        print("Python: New data change event", node, val,data)
            if not(running): return
            vname,myvar=Vars_W[node.nodeid.Identifier]
            val=(val if isinstance(val, list) else [val] )
            print("Write callback",vname,val)
            P1.SetVarValue(vname,val)
            #readback
    #        if True:
    #        print(Vars_R,Vars_R.values())
            for vname2,myvar2,oldvalue in Vars_R.values():
                if vname2==vname:
                  if args.simulator:
                    res=True
                    print("Simulating fallthrough _RW->_R for",vname,": Result:",res,oldvalue)
                  else:
                    res=P1.GetVarValue(vname,val)
                    print("Read callback",vname,": Result:",res,oldvalue)
                  if res:
                    myvar2.Value.Value=(val[0] if len(val)==1 else val)
                    myvar2.SourceTimestamp = datetime.utcnow()
    
    
        def event_notification(self, event):
            print("Python: New event", event)
    
    
    def CallMethod(ObjectID,name):
            print("Callmethod",name)
            P1.CallMethod(name,None)
    
    
    
    #P1.RunMethod("RCU_on",None)
    #P1.SetVarValue("Band1",3)
    Vars_R={}
    Vars_W={}
    def AddVar(name,dtype=0,RW=0,cnt=1):
       if dtype==1:
              varvalue2=([0.0] if cnt<=1 else cnt*[0.0])
       else:
              varvalue2=([0] if cnt<=1 else cnt*[0])
       if RW in [1,3]:
            vname=name+"_R";
            if dtype==1:
                myvar = (PCCobj.add_variable(idx, vname, 0.0) if cnt<=1 else PCCobj.add_variable(idx, vname, cnt*[0.0]))
            else:
                myvar = (PCCobj.add_variable(idx, vname, 0)   if cnt<=1 else PCCobj.add_variable(idx, vname, cnt*[0]))
            print("Variable added: ",vname)
            Vars_R[myvar.nodeid.Identifier]=[name,myvar.get_data_value(),varvalue2]
            ValCallback(myvar.nodeid,force=True)
    #        print(myvar.get_value())
            if not args.no_lib_hack:
                server.set_attribute_callback(myvar.nodeid, ValCallback)
    #        varvalue=myvar.get_data_value().Value
    #        Vars_R[myvar.nodeid.Identifier][2]=varvalue
    #        print(varvalue2,varvalue)
    
       if RW in [2,3]:
            vname=name+"_RW";
            print("Variable added: ",vname)#,'=',varvalue2)
            myvar2 = PCCobj.add_variable(idx, vname, (varvalue2[0] if len(varvalue2)==1 else varvalue2))
            myvar2.set_writable()
            Vars_W[myvar2.nodeid.Identifier]=[name,myvar2.get_data_value()]
            handle = sub.subscribe_data_change(myvar2)
    
    
    def AddMethod(name):
            vname=name;
            myvar = PCCobj.add_method(idx, vname, lambda ObjectId : CallMethod(ObjectId,name), [],[] )
            print("AddMethod:",vname)
    
    print("Start server");
    server.start()
    handler = SubHandler()
    sub = server.create_subscription(500, handler)
    running=False;
    print("Add variables:")
    P1.GetVarNames("",AddVar);
    
    print("Add modes:")
    P1.GetMethodNames("",AddMethod);
    
    time.sleep(1)
    running=True;
    
    print("Server started")   
        
    try:
        while True:
            time.sleep(1)
    finally:
            print("Stop server");
            server.stop()
    
    
    
    
    
    
    #RCU.changemode(0)
    
    
    #RCU.Setvar2('HBA_PwrX0',16*(1,))
    #RCU.Setvar2('HBA_LED0',16*(1,))
    #RCU.Setvar2('HBA_PwrX0',16*(0,))
    #RCU.Setvar2('HBA_LED0',16*(0,))
    
    
    #RCU.Setvar2('Amp_Gain0',[11])
    #RCU.Setvar2('Amp_Gain1',[10])
    #RCU.Setvar2('Power_Ant0',[1])
    #RCU.Setvar2('Power_Ant1',[1])
    #RCU.Setvar2('Power_Ant0',[0])
    #RCU.Setvar2('Power_Ant1',[0])
    
    
    #print(RCU.Getvar2('Amp_Gain0'))
    
    
    #print(RCU.Getvar2('ID'))
    
    
    #print(RCU.Getvar2('ADC_lock1'))
    
    
    #print(RCU.Getvar2('HBA_DelayX1',element=1))