Select Git revision
pre-commit.sh
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
MnC_client.py 5.87 KiB
#! /usr/bin/env python3
import signal
import sys
import time
import datetime
import logging
# Address of the OPC UA server. Both values are formatted into the
# "opc.tcp://<host>:<port>/" endpoint URL when the client is created in the
# __main__ section of this script.
host = "localhost"
port = 62000
# Put the parent directory first on the import search path. This must run
# before the opcua imports below (presumably so a local checkout of the
# library is picked up -- TODO confirm).
sys.path.insert(0, "..")
# Prefer IPython's embedded shell; fall back to the standard-library console
# when IPython is not installed.
try:
    from IPython import embed
except ImportError:
    import code

    def embed():
        """Open an interactive console operating on this module's globals.

        Stand-in for ``IPython.embed`` with the same zero-argument call
        signature. Returns when the user leaves the console.
        """
        # Hand over the module namespace itself. Merging locals() into it
        # would only leak this function's temporaries into the module globals
        # (and previously shadowed the builtin ``vars`` for the shell user).
        code.InteractiveConsole(globals()).interact()
from opcua import Client
from opcua import ua
class SubHandler(object):
    """
    Subscription Handler. To receive events from server for a subscription
    data_change and event methods are called directly from receiving thread.
    Do not do expensive, slow or network operation there. Create another
    thread if you need to do such a thing
    """

    def datachange_notification(self, node, val, data):
        """Print the node and its new value; ``data`` is accepted but unused."""
        print("Python: New data change event", node, val)

    def event_notification(self, event):
        """Print the received event.

        This class used to define ``event_notification`` twice; only the later
        definition was ever in effect, so that is the one kept here.
        """
        print("Python: New event", event)
def explore(node, depth, node_cnt):
    """Recursively walk the subtree below ``node`` and read every node in it.

    On every call this re-browses the children and the browse name of each
    node, which is unnecessary: that information is only needed once, after
    which reading the sensor values alone would be enough.

    Args:
        node: object providing ``get_children()``; every child must also
            provide ``get_browse_name()`` and ``get_value()``.
        depth: current recursion depth; only passed on (incremented by one)
            to the recursive calls.
        node_cnt: number of nodes counted so far.

    Returns:
        ``node_cnt`` increased by the number of descendants of ``node``.
    """
    children = node.get_children()  # in the future, read once, store and use it to read the nodes
    node_cnt += len(children)
    for child in children:
        # The name is not used, but the lookup is kept because it is part of
        # the work this (deliberately non-slim) walk performs.
        child.get_browse_name()
        try:
            child.get_value()
        except Exception:
            # Best effort: a failed read must not stop the walk. Catching
            # Exception (not everything) lets Ctrl-C reach the caller.
            logging.debug("could not read value of %s", child, exc_info=True)
        node_cnt = explore(child, depth + 1, node_cnt)
    return node_cnt
def explore_slim(node, depth, node_cnt):
    """Recursively walk the subtree below ``node``, reading each node's value.

    Unlike a full walk, no browse names are requested and nothing is printed.

    Args:
        node: object providing ``get_children()``; every child must also
            provide ``get_value()``.
        depth: current recursion depth; only passed on (incremented by one)
            to the recursive calls.
        node_cnt: number of nodes counted so far.

    Returns:
        ``node_cnt`` increased by the number of descendants of ``node``.
    """
    children = node.get_children()  # in the future, read once, store and use it to read the nodes
    node_cnt += len(children)
    for child in children:
        try:
            child.get_value()
        except Exception:
            # Best effort: a failed read must not stop the walk. Catching
            # Exception (not everything) lets Ctrl-C reach the caller.
            logging.debug("could not read value of %s", child, exc_info=True)
        node_cnt = explore_slim(child, depth + 1, node_cnt)
    return node_cnt
def explore_verbose(node, depth, node_cnt):
    """Recursively walk the subtree below ``node`` and print every node found.

    For each child one line is printed: ``depth`` tabs, the child's index and
    browse name, another ``depth`` tabs and, if the value could be read,
    ``has a value of:`` followed by the value. Children are then explored with
    ``depth + 1``, so deeper levels are indented further.

    Args:
        node: object providing ``get_children()``; every child must also
            provide ``get_browse_name()`` and ``get_value()``.
        depth: indentation level (number of tabs) for the children of ``node``.
        node_cnt: number of nodes counted so far.

    Returns:
        ``node_cnt`` increased by the number of descendants of ``node``.
    """
    children = node.get_children()  # in the future, read once, store and use it to read the nodes
    node_cnt += len(children)
    indent = "\t" * depth
    for i, child in enumerate(children):
        print(indent, end='')
        browse_name = child.get_browse_name()
        print("node ", i, ": ", browse_name, end='')
        try:
            # The second indent is written before the read, so it also appears
            # on lines of nodes whose value cannot be read.
            print(indent, end='')
            value = child.get_value()
            print("has a value of: ", value)
        except Exception:
            # No readable value: just terminate the line. Catching Exception
            # (not everything) lets Ctrl-C reach the caller.
            print("")
        node_cnt = explore_verbose(child, depth + 1, node_cnt)
    return node_cnt
def _disconnect_quietly(client):
    """Disconnect ``client``; log instead of raising if the disconnect fails.

    Used on the cleanup path, where the connection may never have been
    established or may already be closed.
    """
    try:
        client.disconnect()
    except Exception:
        logging.debug("disconnect failed", exc_info=True)


def _run_session(client):
    """Browse the connected server once, then poll its nodes in a loop.

    Prints the root and objects nodes, lists the children of the objects
    node, calls the method at child index 4 once and then repeatedly walks
    the nodes at child indices 2 and 3. The loop has no exit condition: this
    function only ends by raising (connection error, Ctrl-C, ...).
    """
    print("")
    print("=============================")
    root = client.get_root_node()
    print("Root node is: ", root)
    print("name of root is: ", root.get_display_name())

    # get the objects nodes NodeID
    objects = client.get_objects_node()
    print("\tObject node: ", objects)
    print("\tname of object node is: ", objects.get_browse_name())

    children = objects.get_children()
    for i, child in enumerate(children):
        print("\tnode ", i, ": ", child.get_browse_name())

    # NOTE(review): the nodes are picked by fixed position in the children
    # list; a server with a different layout raises IndexError here, which
    # the caller reports as a failed session.
    i2c_unit = children[2]
    status_nodes = children[3]

    total_cnt = 0
    chunk_cnt = 1
    # Monotonic clock: elapsed time must not depend on wall-clock changes.
    start_time = time.monotonic()

    objects.call_method(
        children[4],
        ua.Variant(5, ua.VariantType.Byte),
        ua.Variant(6, ua.VariantType.Byte),
        ua.Variant(7, ua.VariantType.UInt16),
    )

    while True:
        nodes = explore(i2c_unit, 1, 0)
        total_cnt += nodes

        # Elapsed time is shown as a duration. It used to be rendered through
        # fromtimestamp(elapsed - 3600), which only looked right in UTC+1 and
        # passed a negative timestamp during the first hour.
        runtime = datetime.timedelta(seconds=time.monotonic() - start_time)
        print("time: ", datetime.datetime.now(), end='')
        print("\t runtime:", runtime, end='')
        print("\tjust went through: ", nodes, "\ttotal node count: ", total_cnt)

        explore_verbose(status_nodes, 1, 0)
        print("")

        # Print the subtree of the last child of the I2C node.
        i2c_children = i2c_unit.get_children()
        explore_verbose(i2c_children[-1], 1, 0)

        # there seems to be an issue after reading the same thing for a certain time. this solves it
        if total_cnt > (10000 * chunk_cnt):
            client.disconnect()
            time.sleep(1)
            client.connect()
            chunk_cnt += 1

        time.sleep(0.01)


if __name__ == "__main__":
    logging.basicConfig(level=logging.WARNING)  # INFO ERROR WARNING NOTSET DEBUG

    client = Client("opc.tcp://{}:{}/".format(host, port))
    # NOTE: no security policy is configured; the call below is left disabled.
    # client.set_security_string("Basic256Sha256,SignAndEncrypt,certificate-example.der,private-key-example.pem")

    # Reconnect loop: every failed session is followed by a short pause and a
    # fresh connection attempt; only Ctrl-C ends the program.
    while True:
        try:
            client.connect()
            print("I'm in")
            _run_session(client)
        except KeyboardInterrupt:
            print(" user hit ctrl-c")
            break
        except Exception as exc:
            logging.warning("session aborted, reconnecting: %r", exc)
            print("err")
            time.sleep(1)
        finally:
            # Single cleanup point for every exit path of the session.
            print(" disconnect")
            _disconnect_quietly(client)