"""Convenience helpers around a python-opcua ``Client`` session.

``connect()`` stores the client and its root node in module globals; every
other helper in this module uses those globals, so ``connect()`` must be
called first.
"""

Address = "opc.tcp://localhost:4840/"
# Alternative endpoint:
# Address = "opc.tcp://LTSpi.astron.nl:4842/"

import sys

# Make the parent directory importable before the opcua import below.
sys.path.insert(0, "..")
import logging
import time

from opcua import Client
from opcua import ua
# import numpy as np


def connect(Address=Address):
    """Open a client session to ``Address`` and return the root node.

    Sets the module globals ``client`` and ``root`` that the other helpers
    rely on, and configures logging at WARN level.
    """
    global client, root
    logging.basicConfig(level=logging.WARN)
    client = Client(Address)
    client.connect()
    # Load definitions of server-specific structures/extension objects.
    client.load_type_definitions()
    root = client.get_root_node()
    return root


def disconnect():
    """Close the session opened by ``connect()``."""
    client.disconnect()


def get_value(name):
    """Return the value of the node at ``["0:Objects", "2:<name>"]``."""
    node = root.get_child(["0:Objects", "2:" + name])
    return node.get_value()


def get_value_type(name):
    """Return ``(value, variant_type)`` of the node ``2:<name>``.

    Also prints the node itself to stdout.
    """
    node = root.get_child(["0:Objects", "2:" + name])
    print(node)
    return node.get_value(), node.get_data_value().Value.VariantType


def set_value(name, value, tp=None):
    """Write ``value`` to the node ``2:<name>``.

    ``tp`` is forwarded as the second argument of the node's ``set_value``
    when it is not ``None``.
    """
    node = root.get_child(["0:Objects", "2:" + name])
    if tp is None:
        node.set_value(value)
    else:
        node.set_value(value, tp)


def get_all_variables():
    """Return display names of the Variable children of ``0:Objects``."""
    names = []
    objects = root.get_child(["0:Objects"])
    for nodeid in objects.get_children():
        child = client.get_node(nodeid)
        # Check the node class first so that non-variable children are not
        # asked for their display name at all.
        if child.get_node_class() == ua.NodeClass.Variable:
            names.append(child.get_display_name().Text)
    return names


def get_debug_value(name):
    """Return the value of ``["0:Objects", "2:DEBUG", "2:<name>"]``."""
    node = root.get_child(["0:Objects", "2:DEBUG", "2:" + name])
    return node.get_value()


def set_debug_value(name, value):
    """Write ``value`` to ``["0:Objects", "2:DEBUG", "2:<name>"]``."""
    node = root.get_child(["0:Objects", "2:DEBUG", "2:" + name])
    node.set_value(value)


def setRCUmask(rcu=()):
    """Write ``RCU_mask_RW`` with only the indices in ``rcu`` set to True.

    The current value is read first only to learn the mask length; every
    entry not listed in ``rcu`` is written as False.
    """
    name = "RCU_mask_RW"
    mask = [False] * len(get_value(name))
    for r in rcu:
        mask[r] = True
    set_value(name, mask)


def setAntmask(rcu=(), ant=(True, True, True)):
    """Write ``ANT_mask_RW`` for the RCUs in ``rcu``.

    The mask holds three consecutive entries per RCU index; for each ``r``
    in ``rcu`` entries ``r*3 .. r*3+2`` are set to ``ant[0..2]``.  All other
    entries are written as False.
    """
    name = "ANT_mask_RW"
    mask = [False] * len(get_value(name))
    for r in rcu:
        for i in range(3):
            mask[r * 3 + i] = ant[i]
    set_value(name, mask)


def callmethod(name):
    """Call method ``2:<name>`` on ``0:Objects``; return None on failure.

    Failures are deliberately silent (best effort).
    """
    try:
        obj = root.get_child(["0:Objects"])
        return obj.call_method("2:" + name)
    except Exception:
        # Not a bare ``except``: Ctrl-C and SystemExit must still propagate.
        return None


def call_debug_method(name):
    """Call method ``2:<name>`` on ``0:Objects/2:DEBUG``.

    Prints ``error`` and returns None when the lookup or the call fails.
    """
    try:
        obj = root.get_child(["0:Objects", "2:DEBUG"])
        return obj.call_method("2:" + name)
    except Exception:
        # Not a bare ``except``: Ctrl-C and SystemExit must still propagate.
        print("error")
        return None


def wait_not_busy(var1, timeout_sec=1):
    """Poll variable ``var1`` until it reads falsy.

    The variable is read up to ``int(timeout_sec * 10)`` times with a 0.1 s
    sleep after each truthy read.  Returns True as soon as a falsy value is
    read; otherwise prints a timeout message and returns False.
    """
    for _ in range(int(timeout_sec * 10)):
        if not get_value(var1):
            return True
        time.sleep(0.1)
    print("Timeout waiting for translator:", var1)
    return False