Skip to content
Snippets Groups Projects
Select Git revision
  • 2ceac589e3540050b8a062d9f45c9c6721c81ee7
  • main default protected
  • L2SS-2357-expand-linting-checks
  • L2SS-2302
  • track-call-duration
  • v0.0.10 protected
  • v0.0.9 protected
  • v0.0.8 protected
  • v0.0.7 protected
  • v0.0.6 protected
  • v0.0.5 protected
  • v0.0.4 protected
  • v0.0.3 protected
  • v0.0.2 protected
14 results

pre-commit.sh

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    MnC_client.py 5.87 KiB
    #! /usr/bin/env python3
    
    import signal
    import sys
    import time
    import datetime
    import logging
    
    host = "localhost"
    port = 62000
    
    sys.path.insert(0, "..")
    
    
    try:
        from IPython import embed
    except ImportError:
        import code
    
        def embed():
            vars = globals()
            vars.update(locals())
            shell = code.InteractiveConsole(vars)
            shell.interact()
    
    from opcua import Client
    from opcua import ua
    
    class SubHandler(object):
    
        """
        Subscription Handler. To receive events from server for a subscription
        data_change and event methods are called directly from receiving thread.
        Do not do expensive, slow or network operation there. Create another
        thread if you need to do such a thing
        """
        def event_notification(self, event):
            print("New event received: ", event)
    
    
    def datachange_notification(self, node, val, data):
        print("Python: New data change event", node, val)
    
    
    def event_notification(self, event):
        print("Python: New event", event)
    
    
    ''' This function unnececarily checks what chidren a node has and what name the node has.
        Only neccecary once, then just read the sensor values only
    '''
    
    
    def explore(node, depth, node_cnt):
        children = node.get_children()      # in the future, read once, store and use it to read the nodes
        node_cnt = node_cnt + len(children)
    
        for i in range(len(children)):
            browse_name = children[i].get_browse_name()
            try:
                value = children[i].get_value()
                value += 1
            except:
                print("", end='')
            node_cnt = explore(children[i], depth + 1, node_cnt)
        return node_cnt
    
    
    def explore_slim(node, depth, node_cnt):
        children = node.get_children()      # in the future, read once, store and use it to rad the nodes
        node_cnt = node_cnt + len(children)
    
        for i in range(len(children)):
            try:
                value = children[i].get_value()
                value += 1
            except:
                value = 0
            node_cnt = explore_slim(children[i], depth + 1, node_cnt)
        return node_cnt
    
    
    def explore_verbose(node, depth, node_cnt):
        children = node.get_children()      # in the future, read once, store and use it to rad the nodes
        # print("node has", len(children), "children")
        node_cnt = node_cnt + len(children)
    
        for i in range(len(children)):
            for j in range(depth):
                print("\t", end='')
            browse_name = children[i].get_browse_name()
            print("node ", i, ": ", browse_name, end='')
            try:
                for j in range(depth):
                    print("\t", end='')
                value = children[i].get_value()
                print("has a value of: ", value)
            except:
                print("")
            node_cnt = explore_verbose(children[i], depth + 1, node_cnt)
        return node_cnt
    
    if __name__ == "__main__":
        logging.basicConfig(level=logging.WARNING) #INFO ERROR WARNING NOTSET DEBUG
        # logger = logging.getLogger("KeepAlive")
        # logger.setLevel(logging.DEBUG)
    
        client = Client("opc.tcp://{}:{}/".format(host, port))
        # I hope this is secure, because I have zero clue about security
        # client.set_security_string("Basic256Sha256,SignAndEncrypt,certificate-example.der,private-key-example.pem")
    
    
    
        while 1:
            try:
                client.connect()
                print("I'm in")
    
                while 1:
                    print("")
                    print("=============================")
    
                    root = client.get_root_node()
                    print("Root node is: ", root)
                    print("name of root is: ", root.get_display_name())
    
                    # get the objects nodes NodeID
                    Object = client.get_objects_node()
                    print("\tObject node: ", Object)
                    print("\tname of object node is: ", Object.get_browse_name())
    
                    children = Object.get_children()
                    for i in range(len(children)):
                        print("\tnode ", i, ": ", children[i].get_browse_name())
    
                    I2C_unit = children[2]
                    status_nodes = children[3]
    
                    total_cnt = 0
                    chunk_cnt = 1
                    start_time = time.time()
    
                    result = Object.call_method(children[4], ua.Variant(5, ua.VariantType.Byte), ua.Variant(6, ua.VariantType.Byte), ua.Variant(7, ua.VariantType.UInt16))
    
    
    
                    while True:
                        nodes = 0
    
                        #print("node ", 0, ": ", children[3].get_browse_name())
                        nodes = explore\
                            (I2C_unit, 1, nodes)
                        total_cnt += nodes
                        print("time: ", datetime.datetime.now(), end='')
                        print("\t runtime:", datetime.datetime.fromtimestamp(time.time()-start_time-3600), end='')
                        print("\tjust went through: ", nodes,  "\ttotal node count: ", total_cnt)
    
                        nodes = 0
                        nodes = explore_verbose(status_nodes, 1, nodes)
                        nodes = 0
    
                        print("")
                        I2C_children = I2C_unit.get_children()
                        nof_subnodes = len(I2C_children)
                        nodes = explore_verbose(I2C_children[nof_subnodes-1], 1, nodes)
    
                        # there seems to be an issue after reading the same thing for a certain time. this solves it
                        if total_cnt > (10000 * chunk_cnt):
                            client.disconnect()
                            time.sleep(1)
                            client.connect()
                            chunk_cnt += 1
    
                        time.sleep(0.01)
    
                # sub.unsubscribe(handle)
                # sub.delete()
            except KeyboardInterrupt:
                print(" user hit ctrl-c")
                client.disconnect()
                clientRunning = False
            except:
                client.disconnect()
                print("err")
                time.sleep(1)
                client.connect()
            finally:
                print(" disconnect")
                client.disconnect()