Skip to content
Snippets Groups Projects
Commit 5840e0bb authored by Jan David Mol's avatar Jan David Mol
Browse files

Use a single thread & class to connect & reconnect to OPC-UA

parent cd40f4ea
No related branches found
No related tags found
No related merge requests found
<?xml version="1.0" encoding="ASCII"?>
<pogoDsl:PogoSystem xmi:version="2.0" xmlns:xmi="http://www.omg.org/XMI" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:pogoDsl="http://www.esrf.fr/tango/pogo/PogoDsl">
<classes name="RCUSCC" pogoRevision="9.6">
<description description="" title="RCU-SCC Device Server for LOFAR2.0"
sourcePath="/opt/tango/RCUSCC" language="PythonHL"
filestogenerate="XMI file,Code files,Python Package,Protected Regions" license="APACHE"
copyright="" hasMandatoryProperty="true" hasConcreteProperty="true" hasAbstractCommand="false"
hasAbstractAttribute="false">
<inheritances classname="Device_Impl" sourcePath=""/>
<identification contact="at astron.nl - jurges" author="jurges" emailDomain="astron.nl"
classFamily="OtherInstruments" siteSpecific="" platform="Unix Like" bus="Socket"
manufacturer="ASTRON" reference=""/>
</description>
<deviceProperties name="OPC_Server_Name" mandatory="true" description="">
<type xsi:type="pogoDsl:StringType"/>
<status abstract="false" inherited="false" concrete="true" concreteHere="true"/>
</deviceProperties>
<deviceProperties name="OPC_Server_Port" mandatory="true" description="">
<type xsi:type="pogoDsl:UIntType"/>
<status abstract="false" inherited="false" concrete="true" concreteHere="true"/>
</deviceProperties>
<deviceProperties name="OPC_Time_Out" mandatory="true" description="">
<type xsi:type="pogoDsl:DoubleType"/>
<status abstract="false" inherited="false" concrete="true" concreteHere="true"/>
</deviceProperties>
<commands name="State" description="This command gets the device state (stored in its device_state data member) and returns it to the caller." execMethod="dev_state" displayLevel="OPERATOR" polledPeriod="0">
<argin description="none">
<type xsi:type="pogoDsl:VoidType"/>
</argin>
<argout description="Device state">
<type xsi:type="pogoDsl:StateType"/>
</argout>
<status abstract="true" inherited="true" concrete="true"/>
</commands>
<commands name="Status" description="This command gets the device status (stored in its device_status data member) and returns it to the caller." execMethod="dev_status" displayLevel="OPERATOR" polledPeriod="0">
<argin description="none">
<type xsi:type="pogoDsl:VoidType"/>
</argin>
<argout description="Device status">
<type xsi:type="pogoDsl:ConstStringType"/>
</argout>
<status abstract="true" inherited="true" concrete="true"/>
</commands>
<attributes name="time_offset_rw" attType="Scalar" rwType="READ_WRITE" displayLevel="OPERATOR" polledPeriod="0" maxX="" maxY="" allocReadMember="true" isDynamic="false">
<dataType xsi:type="pogoDsl:LongType"/>
<changeEvent fire="false" libCheckCriteria="false"/>
<archiveEvent fire="false" libCheckCriteria="false"/>
<dataReadyEvent fire="false" libCheckCriteria="true"/>
<status abstract="false" inherited="false" concrete="true" concreteHere="true"/>
<properties description="" label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/>
</attributes>
<attributes name="time_offset_r" attType="Scalar" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="" maxY="" allocReadMember="true" isDynamic="false">
<dataType xsi:type="pogoDsl:LongType"/>
<changeEvent fire="false" libCheckCriteria="false"/>
<archiveEvent fire="false" libCheckCriteria="false"/>
<dataReadyEvent fire="false" libCheckCriteria="true"/>
<status abstract="false" inherited="false" concrete="true" concreteHere="true"/>
<properties description="" label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/>
</attributes>
<states name="ON" description="">
<status abstract="false" inherited="false" concrete="true" concreteHere="true"/>
</states>
<states name="OFF" description="">
<status abstract="false" inherited="false" concrete="true" concreteHere="true"/>
</states>
<states name="FAULT" description="">
<status abstract="false" inherited="false" concrete="true" concreteHere="true"/>
</states>
<states name="ALARM" description="">
<status abstract="false" inherited="false" concrete="true" concreteHere="true"/>
</states>
<states name="STANDBY" description="">
<status abstract="false" inherited="false" concrete="true" concreteHere="true"/>
</states>
<preferences docHome="./doc_html" makefileHome="/usr/local/share/pogo/preferences"/>
</classes>
</pogoDsl:PogoSystem>
......@@ -69,106 +69,72 @@ def fault_on_opcua_error(func):
__all__ = ["RCUSCC", "main"]
class BackgroundConnector(Thread):
class OPCUAConnection(Thread):
"""
Run a connector function in the background, until it succeeds.
"""
def __init__(self, connect_func, try_interval=2, debug_stream=print):
def __init__(self, client, init_func, fault_func, streams, try_interval=2):
super().__init__(daemon=True)
self.connect_func = connect_func
self.client = client
self.init_func = init_func
self.fault_func = fault_func
self.try_interval = try_interval
self.debug_stream = debug_stream
self.connecting = True
self.connected = False
self.streams = streams
self.stopping = False
def try_connect(self):
try:
self.connect_func()
self.connected = True
except socket.error as e:
self.debug_stream("Could not connect: %s" % (e,))
def run(self):
try:
while not self.connected and not self.stopping:
self.try_connect()
if not self.connected:
time.sleep(self.try_interval)
finally:
self.connecting = False
def stop(self):
self.stopping = True
self.join()
class BackgroundReconnector(object):
"""
Keeps a connection attempt going.
"""
self.start()
def __init__(self, connect_func, disconnect_func, debug_stream=print):
self.connect_func = connect_func
self.disconnect_func = disconnect_func
self.debug_stream = debug_stream
def _servername(self):
return self.client.server_url.geturl()
self._connect()
def try_connect(self):
try:
self.streams.debug_stream("Connecting to server %s", self._servername())
self.client.connect()
self.streams.debug_stream("Connected to server. Initialising.")
def _connect(self):
self.connector = BackgroundConnector(self.connect_func, debug_stream=self.debug_stream)
self.connector.start()
self.init_func()
def stop(self):
self.connector.stop()
def reconnect(self):
"""
Connect or reconnect to the OPC-UA server.
"""
return True
except socket.error as e:
self.streams.error_stream("Could not connect to server %s: %s", self._servername(), e)
if self.connector.connecting:
# don't try twice in parallel
return
self.fault_func()
return False
# reap previous instance
self.debug_stream("reaping previous connector")
self.connector.join()
def try_disconnect(self):
try:
self.client.disconnect()
except Exception as e:
self.streams.error_stream("Disconnect from OPC-UA server %s failed: %s", self._servername(), e)
# disconnect if necessary
self.debug_stream("disconnecting (if connected)")
self.disconnect_func()
def run(self):
while not self.stopping:
# keep trying to connect
while not self.stopping and not self.try_connect():
time.sleep(self.try_interval)
# connect in the background
self.debug_stream("connecting")
self._connect()
# keep checking if the connection is still alive
try:
while not self.stopping:
self.client.send_hello()
time.sleep(self.try_interval)
except Exception as e:
self.streams.error_stream("Lost connection to server %s: %s", self._servername(), e)
def is_connected(self):
return wait_connected(0.0)
self.fault_func()
def wait_connected(self, timeout=None):
def stop(self):
"""
Wait until a connection is established.
`timeout' is an optional timeout, in seconds.
Returns True if the connection was established. False if not.
Stop connecting & disconnect. Can take a few seconds for the timeouts to hit.
"""
# wait for attempt to finish
self.connector.join(timeout)
if self.connector.is_alive():
# timeout happened, so not connected yet
return False
if not self.connector.connected:
# thread finished but somehow didn't connect -> an exception got thrown perhaps?
raise Exception("Could not connect to server")
self.stopping = True
self.join()
return True
self.try_disconnect()
class RCUSCC(Device):
"""
......@@ -292,12 +258,6 @@ class RCUSCC(Device):
def _init_opcua(self):
self.debug_stream("Connecting to OPC-UA server %s:%d...", self.OPC_Server_Name, self.OPC_Server_Port)
self.client = opcua.Client("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), self.OPC_Time_Out) # timeout in seconds
self.client.connect()
self.debug_stream("Connecting to OPC-UA server %s:%d done.", self.OPC_Server_Name, self.OPC_Server_Port)
try:
self.name_space_index = self.client.get_namespace_index("http://lofar.eu")
except Exception as e:
......@@ -399,11 +359,10 @@ class RCUSCC(Device):
self._Temperature_R = (0.0,)
self.attribute_mapping["Temperature_R"] = {}
self.client = opcua.Client("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), self.OPC_Time_Out) # timeout in seconds
# Connect to OPC-UA -- will set ON state on success
self.opcua_connector = BackgroundReconnector(self._init_opcua, self._disconnect, self.debug_stream)
if not self.opcua_connector.wait_connected(5.0): # allow 5 seconds to connect
self.Fault()
self.opcua_connection = OPCUAConnection(self.client, self._init_opcua, self.Fault, self)
# PROTECTED REGION END # // RCUSCC.init_device
......@@ -607,8 +566,7 @@ class RCUSCC(Device):
self.set_state(DevState.OFF)
# stop reconnecting before disconnect
self.opcua_connector.stop()
self._disconnect()
self.opcua_connection.stop()
# PROTECTED REGION END # // RCUSCC.Off
......@@ -625,10 +583,6 @@ class RCUSCC(Device):
:return:None
"""
self.set_state(DevState.FAULT)
# try to reconnect
self.opcua_connector.reconnect()
# PROTECTED REGION END # // RCUSCC..On
@command(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment