Skip to content
Snippets Groups Projects
Commit a0f13b97 authored by Taya Snijder's avatar Taya Snijder
Browse files

Merge branch 'L2SS-1220-expose-whether-opcua-hardware-can-be-reached' into 'master'

L2SS-1220 added connection test attribute

Closes L2SS-1220

See merge request !576
parents 58047a9d c5167959
Branches
No related tags found
1 merge request!576L2SS-1220 added connection test attribute
......@@ -8,11 +8,12 @@ import socket
import asyncua
import numpy
from asyncua import Client
from tangostationcontrol.clients.comms_client import AsyncCommClient
logger = logging.getLogger()
__all__ = ["OPCUAConnection", "ProtocolAttribute"]
__all__ = ["OPCUAConnection", "ProtocolAttribute", "test_OPCUA_connection"]
numpy_to_OPCua_dict = {
numpy.int8: asyncua.ua.VariantType.SByte,
......@@ -366,3 +367,18 @@ class ProtocolAttribute:
raise TypeError(
f"Cannot write value to OPC-UA attribute '{attribute_name}': tried to convert data type {our_type} to expected server type {expected_server_type}, server reports type {actual_server_type}"
) from e
async def test_OPCUA_connection(server, port):
"""
This function exist in order to test whether an OPC ua server can be reached
"""
try:
client = Client(f"opc.tcp://{server}:{port}/", 2)
await client.connect()
await client.disconnect()
return True
except BaseException as e:
logger.debug("Testing OPCua connection failed", exc_info=True)
return False
......@@ -303,3 +303,29 @@ class MIBLoader:
def load_pymib(self, mib_name):
self.mibBuilder.loadModules(mib_name)
def test_SNMP_connection(community, host, port=161):
"""
Attempts to read a single SNMP point with the given parameters
sysUpTime is chosen because it is pretty much Ubiquitous point.
"""
iterator = hlapi.getCmd(
hlapi.SnmpEngine(),
hlapi.CommunityData(community, mpModel=1),
hlapi.UdpTransportTarget((host, port), timeout=2, retries=1),
hlapi.ContextData(),
hlapi.ObjectType(hlapi.ObjectIdentity("SNMPv2-MIB", "sysUpTime", 0)),
)
errorIndication, errorStatus, errorIndex, _ = next(iterator)
if errorIndication or errorStatus or errorIndex:
# if not (None, 0, 0) there has been some error
logger.debug(
f"Connection test failed. errorIndication: {errorIndication}, errorStatus: {errorStatus}, errorIndex: {errorIndex}"
)
return False
else:
return True
......@@ -12,7 +12,10 @@ import numpy
# PyTango imports
from tango.server import device_property, attribute
from tangostationcontrol.clients.opcua_client import OPCUAConnection
from tangostationcontrol.clients.opcua_client import (
OPCUAConnection,
test_OPCUA_connection,
)
from tangostationcontrol.common.lofar_logging import log_exceptions
from tangostationcontrol.devices.interfaces.lofar_device import LOFARDevice
......@@ -67,6 +70,25 @@ class OPCUADevice(LOFARDevice):
doc="OPC-UA attributes that this device requested, but which are not exposed on the server. These attributes are replaced with a no-op and thus do not function as expected.",
)
can_connect_R = attribute(
dtype=bool,
)
def read_can_connect_R(self):
"""
Test whether we can connect with the OPC-ua server
Sets up an OPC ua connection from scratch, as it cannot be assumed
that self.opcua_connection has been created or functions
"""
try:
loop = asyncio.get_running_loop()
except RuntimeError as _e:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop.run_until_complete(
test_OPCUA_connection(self.OPC_Server_Name, self.OPC_Server_Port)
)
# --------
# overloaded functions
# --------
......@@ -77,7 +99,7 @@ class OPCUADevice(LOFARDevice):
# set up the OPC ua client
self.opcua_connection = OPCUAConnection(
"opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port),
f"opc.tcp://{self.OPC_Server_Name}:{self.OPC_Server_Port}/",
self.OPC_namespace,
self.OPC_Time_Out,
self.Fault,
......
......@@ -10,8 +10,12 @@ import os
import pkg_resources
from pysmi import debug
from tango.server import device_property
from tangostationcontrol.clients.snmp_client import SNMPClient, MIBLoader
from tango.server import device_property, attribute
from tangostationcontrol.clients.snmp_client import (
SNMPClient,
MIBLoader,
test_SNMP_connection,
)
from tangostationcontrol.common.lofar_logging import (
device_logging_to_python,
log_exceptions,
......@@ -42,6 +46,16 @@ class SNMPDevice(LOFARDevice):
SNMP_version = device_property(dtype="DevULong", mandatory=True)
can_connect_R = attribute(
dtype=bool,
)
def read_can_connect_R(self):
"""
Test whether we can connect with the SNMP server
"""
return test_SNMP_connection(self.SNMP_community, self.SNMP_host)
@log_exceptions()
def configure_for_initialise(self):
# set up the SNMP client
......
......@@ -4,7 +4,10 @@
import asyncua
import numpy
from tangostationcontrol.clients.opcua_client import OPCUAConnection
from tangostationcontrol.clients.opcua_client import (
OPCUAConnection,
test_OPCUA_connection,
)
from tangostationcontrol.integration_test import base
......@@ -199,3 +202,16 @@ class TestClientServer(base.IntegrationAsyncTestCase):
await test_client._call_method(["throws"])
finally:
await test_client.stop()
async def test_connection(self):
port = 14846
# test connection while server does not yet exist
result = await test_OPCUA_connection("127.0.0.1", port)
self.assertFalse(result)
await self.setup_server(port)
# test connection while server has been started
result = await test_OPCUA_connection("127.0.0.1", port)
self.assertTrue(result)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment