Skip to content
Snippets Groups Projects
Commit 67fe6133 authored by Jan David Mol's avatar Jan David Mol
Browse files

Auto-reconnect to OPC-UA server

parent b55e3f0b
No related branches found
No related tags found
No related merge requests found
...@@ -26,6 +26,8 @@ import sys ...@@ -26,6 +26,8 @@ import sys
import opcua import opcua
import traceback import traceback
import numpy import numpy
import time
from threading import Thread
from functools import wraps from functools import wraps
import socket import socket
...@@ -41,10 +43,10 @@ def fault_on_opcua_error(func): ...@@ -41,10 +43,10 @@ def fault_on_opcua_error(func):
def opcua_error_wrapper(self, *args, **kwargs): def opcua_error_wrapper(self, *args, **kwargs):
try: try:
return func(self, *args, **kwargs) return func(self, *args, **kwargs)
except socket.error as e: except Exception as e:
self.error_stream("Communication with the OPC-UA server %s:%d failed. Trace: %s" % (self.OPC_Server_Name, self.OPC_Server_Port, traceback.format_exc())) self.error_stream("Communication with the OPC-UA server %s:%d failed. Reconnecting. Trace: %s" % (self.OPC_Server_Name, self.OPC_Server_Port, traceback.format_exc()))
self.set_state(DevState.FAULT) self.set_state(DevState.FAULT)
self._disconnect() self._reconnect_opcua()
return None return None
...@@ -53,6 +55,35 @@ def fault_on_opcua_error(func): ...@@ -53,6 +55,35 @@ def fault_on_opcua_error(func):
__all__ = ["RCUSCC", "main"] __all__ = ["RCUSCC", "main"]
class BackgroundConnector(Thread):
"""
Run a function in the background, until it succeeds.
"""
def __init__(self, connect_func, try_interval=2, debug_stream=print):
super().__init__(daemon=True)
self.connect_func = connect_func
self.try_interval = try_interval
self.debug_stream = debug_stream
self.connecting = True
self.connected = False
def try_connect(self):
try:
self.connect_func()
self.connected = True
except socket.error as e:
self.debug_stream("Could not connect: %s" % (e,))
finally:
self.connecting = False
def run(self):
while not self.connected:
self.try_connect()
if not self.connected:
time.sleep(self.try_interval)
class RCUSCC(Device): class RCUSCC(Device):
""" """
...@@ -174,8 +205,41 @@ class RCUSCC(Device): ...@@ -174,8 +205,41 @@ class RCUSCC(Device):
return DummyNode() return DummyNode()
def _reconnect_opcua(self):
"""
Connect or reconnect to the OPC-UA server.
"""
if self.opcua_connector:
if self.opcua_connector.connecting:
# don't try twice in parallel
return
# reap previous instance
self.debug_stream("reaping previous connector")
self.opcua_connector.join()
self.debug_stream("disconnecting (if connected)")
self._disconnect()
self.debug_stream("connecting")
self.opcua_connector = BackgroundConnector(self._init_opcua)
self.opcua_connector.start()
def _connect(self): def _wait_opcua_connect(self):
"""
Wait until an OPC-UA connection is established.
"""
if not self.opcua_connector:
# start connecting if not already
self._reconnect_opcua()
self.opcua_connector.join()
if not self.opcua_connector.connected:
raise Exception("Could not connect to OPC-UA server")
def _init_opcua(self):
self.debug_stream("Connecting to OPC-UA server %s:%d...", self.OPC_Server_Name, self.OPC_Server_Port) self.debug_stream("Connecting to OPC-UA server %s:%d...", self.OPC_Server_Name, self.OPC_Server_Port)
self.client = opcua.Client("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), self.OPC_Time_Out) # timeout in seconds self.client = opcua.Client("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), self.OPC_Time_Out) # timeout in seconds
...@@ -191,7 +255,6 @@ class RCUSCC(Device): ...@@ -191,7 +255,6 @@ class RCUSCC(Device):
self.obj_node = self.client.get_objects_node() self.obj_node = self.client.get_objects_node()
self.pcc_node = self.obj_node.get_child(["{}:PCC".format(self.name_space_index)]) self.pcc_node = self.obj_node.get_child(["{}:PCC".format(self.name_space_index)])
def _extract_pcc_nodes(self):
self.debug_stream("Mapping OPC-UA MP/CP to attributes...") self.debug_stream("Mapping OPC-UA MP/CP to attributes...")
# 2020-11-27, thomas # 2020-11-27, thomas
...@@ -242,6 +305,9 @@ class RCUSCC(Device): ...@@ -242,6 +305,9 @@ class RCUSCC(Device):
# PROTECTED REGION ID(RCUSCC.init_device) ENABLED START # # PROTECTED REGION ID(RCUSCC.init_device) ENABLED START #
self.set_state(DevState.INIT) self.set_state(DevState.INIT)
# Thread to connect to OPC-UA in the background
self.opcua_connector = None
# Init the dict that contains attribute to OPC-UA MP/CP mappings. # Init the dict that contains attribute to OPC-UA MP/CP mappings.
self.attribute_mapping = {} self.attribute_mapping = {}
...@@ -284,8 +350,7 @@ class RCUSCC(Device): ...@@ -284,8 +350,7 @@ class RCUSCC(Device):
# Set defaults to property values. # Set defaults to property values.
try: try:
self._connect() self._wait_opcua_connect()
self._extract_pcc_nodes()
self.set_state(DevState.ON) self.set_state(DevState.ON)
except Exception as e: except Exception as e:
...@@ -306,6 +371,8 @@ class RCUSCC(Device): ...@@ -306,6 +371,8 @@ class RCUSCC(Device):
self.client.close_secure_channel() self.client.close_secure_channel()
except Exception as e: except Exception as e:
self.warn_stream("Disconnect from OPC-UA server %s:%d failed. Trace: %s" % (self.OPC_Server_Name, self.OPC_Server_Port, traceback.format_exc())) self.warn_stream("Disconnect from OPC-UA server %s:%d failed. Trace: %s" % (self.OPC_Server_Name, self.OPC_Server_Port, traceback.format_exc()))
finally:
self.client = None
def delete_device(self): def delete_device(self):
"""Hook to delete resources allocated in init_device. """Hook to delete resources allocated in init_device.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment