diff --git a/tangostationcontrol/tangostationcontrol/test/clients/test_snmp_client.py b/tangostationcontrol/tangostationcontrol/test/clients/test_snmp_client.py index 005713fd6e0a384b95587380b02fbdd862f8fa78..f061e38cedc7cefefeb72976454edecd7b647259 100644 --- a/tangostationcontrol/tangostationcontrol/test/clients/test_snmp_client.py +++ b/tangostationcontrol/tangostationcontrol/test/clients/test_snmp_client.py @@ -7,84 +7,85 @@ from tangostationcontrol.test import base from tangostationcontrol.clients.snmp_client import SNMP_client, snmp_attribute, annotation_wrapper -# conversion dict -snmp_to_numpy_dict = { - hlapi.Integer32: numpy.int64, - hlapi.TimeTicks: numpy.int64, - str: str, - hlapi.Counter32: numpy.int64, - hlapi.Gauge32: numpy.int64, - hlapi.IpAddress: str, -} - -# shortcut for testing dimensionality -dim_list = { - "scalar": (1, 0), - "spectrum": (4, 0), -} - -def get_return_val(snmp_type : type, dims : tuple): - """ - provides the return value for the set/get functions. - """ - - if dims == dim_list["scalar"]: - if snmp_type is hlapi.ObjectIdentity: - read_val = (None, snmp_type("1.3.6.1.2.1.1.1.0")) - elif snmp_type is hlapi.IpAddress: - read_val = (None, snmp_type("1.1.1.1")) - else: - read_val = (None, snmp_type(1)) - - - elif dims == dim_list["spectrum"]: - if snmp_type is hlapi.ObjectIdentity: - read_val = [] - for _i in range(dims[0]): - read_val.append((None, snmp_type(f"1.3.6.1.2.1.1.1.0.1"))) - elif snmp_type is hlapi.IpAddress: - read_val = [] - for _i in range(dims[0]): - read_val.append((None, snmp_type(f"1.1.1.1"))) - else: - read_val = [] - for _i in range(dims[0]): - read_val.append((None, snmp_type(1))) - else: - raise Exception("Image not yet supported :(") - - return read_val - - -def val_check(snmp_type : type, dims : tuple): - """ - provides the expected value for the get function to compare against after it has been converted to buildin/numpy types again - also provides the values for the set function to set - """ - - if dims == dim_list["scalar"]: - if snmp_type is hlapi.ObjectIdentity: - check_val = "1.3.6.1.2.1.1.1.0.1" - elif snmp_type is hlapi.IpAddress: - check_val = "1.1.1.1" - elif snmp_type is str: - check_val = "1" + +class server_imitator: + # conversion dict + snmp_to_numpy_dict = { + hlapi.Integer32: numpy.int64, + hlapi.TimeTicks: numpy.int64, + str: str, + hlapi.Counter32: numpy.int64, + hlapi.Gauge32: numpy.int64, + hlapi.IpAddress: str, + } + + # shortcut for testing dimensionality + dim_list = { + "scalar": (1, 0), + "spectrum": (4, 0), + } + + def get_return_val(self, snmp_type : type, dims : tuple): + """ + provides the return value for the set/get functions that an actual server would return. + """ + + if dims == self.dim_list["scalar"]: + if snmp_type is hlapi.ObjectIdentity: + read_val = (None, snmp_type("1.3.6.1.2.1.1.1.0")) + elif snmp_type is hlapi.IpAddress: + read_val = (None, snmp_type("1.1.1.1")) + else: + read_val = (None, snmp_type(1)) + + + elif dims == self.dim_list["spectrum"]: + if snmp_type is hlapi.ObjectIdentity: + read_val = [] + for _i in range(dims[0]): + read_val.append((None, snmp_type(f"1.3.6.1.2.1.1.1.0.1"))) + elif snmp_type is hlapi.IpAddress: + read_val = [] + for _i in range(dims[0]): + read_val.append((None, snmp_type(f"1.1.1.1"))) + else: + read_val = [] + for _i in range(dims[0]): + read_val.append((None, snmp_type(1))) else: - check_val = 1 - elif dims == dim_list["spectrum"]: - if snmp_type is hlapi.ObjectIdentity: - check_val = ["1.3.6.1.2.1.1.1.0.1"] * dims[0] - - elif snmp_type is hlapi.IpAddress: - check_val = ["1.1.1.1"] * dims[0] - elif snmp_type is str: - check_val = ["1"] * dims[0] + raise Exception("Image not yet supported :(") + + return read_val + + + def val_check(self, snmp_type : type, dims : tuple): + """ + provides the values we expect and would provide to the attribute after converting the + """ + + if dims == self.dim_list["scalar"]: + if snmp_type is hlapi.ObjectIdentity: + check_val = "1.3.6.1.2.1.1.1.0.1" + elif snmp_type is hlapi.IpAddress: + check_val = "1.1.1.1" + elif snmp_type is str: + check_val = "1" + else: + check_val = 1 + elif dims == self.dim_list["spectrum"]: + if snmp_type is hlapi.ObjectIdentity: + check_val = ["1.3.6.1.2.1.1.1.0.1"] * dims[0] + + elif snmp_type is hlapi.IpAddress: + check_val = ["1.1.1.1"] * dims[0] + elif snmp_type is str: + check_val = ["1"] * dims[0] + else: + check_val = [1] * dims[0] else: - check_val = [1] * dims[0] - else: - raise Exception("Image not yet supported :(") + raise Exception("Image not yet supported :(") - return check_val + return check_val class TestSNMP(base.TestCase): @@ -145,7 +146,9 @@ class TestSNMP(base.TestCase): test_oid = "1.1.1.1" - x, y = dim_list['scalar'] + server = server_imitator() + + x, y = server.dim_list['scalar'] # we just need the object to call another function wrapper = annotation_wrapper(annotation = {"oids": "Not None lol"}) @@ -161,9 +164,10 @@ class TestSNMP(base.TestCase): Results should basically be an incrementing list of oids with the final number incremented by 1 each time. So "1.1" with dims of 3x1 might become ["1.1.1", "1.1.2", "1.1.3"] """ + server = server_imitator() test_oid = "1.1.1.1" - x, y = dim_list['spectrum'] + x, y = server.dim_list['spectrum'] # we just need the object to call another function wrapper = annotation_wrapper(annotation={"oids": "Not None lol"}) @@ -181,18 +185,21 @@ class TestSNMP(base.TestCase): Attempts to read a fake SNMP variable and checks whether it got what it expected """ - for j in dim_list: - for i in snmp_to_numpy_dict: - m_next.return_value = (None, None, None, get_return_val(i, dim_list[j])) + server = server_imitator() + + for j in server.dim_list: + for i in server.snmp_to_numpy_dict: + m_next.return_value = (None, None, None, server.get_return_val(i, server.dim_list[j])) m_client = mock.Mock() + wrapper = annotation_wrapper(annotation={"oids": "1.3.6.1.2.1.2.2.1.2.31"}) - snmp_attr = snmp_attribute(client=m_client, wrapper=wrapper, dtype=snmp_to_numpy_dict[i], dim_x=dim_list[j][0], dim_y=dim_list[j][1]) + snmp_attr = snmp_attribute(client=m_client, wrapper=wrapper, dtype=server.snmp_to_numpy_dict[i], dim_x=server.dim_list[j][0], dim_y=server.dim_list[j][1]) val = snmp_attr.read_function() - checkval = val_check(i, dim_list[j]) + checkval = server.val_check(i, server.dim_list[j]) self.assertEqual(checkval, val, f"Expected: {checkval}, got: {val}") @mock.patch('pysnmp.hlapi.ObjectIdentity') @@ -202,21 +209,23 @@ class TestSNMP(base.TestCase): """ Attempts to write a value to an SNMP server, but instead intercepts it and compared whether the values is as expected. """ + server = server_imitator() + - for j in dim_list: - for i in snmp_to_numpy_dict: - m_next.return_value = (None, None, None, get_return_val(i, dim_list[j])) + for j in server.dim_list: + for i in server.snmp_to_numpy_dict: + m_next.return_value = (None, None, None, server.get_return_val(i, server.dim_list[j])) m_client = mock.Mock() - set_val = val_check(i, dim_list[j]) + set_val = server.val_check(i, server.dim_list[j]) wrapper = annotation_wrapper(annotation={"oids": "1.3.6.1.2.1.2.2.1.2.31"}) - snmp_attr = snmp_attribute(client=m_client, wrapper=wrapper, dtype=snmp_to_numpy_dict[i], dim_x=dim_list[j][0], dim_y=dim_list[j][1]) + snmp_attr = snmp_attribute(client=m_client, wrapper=wrapper, dtype=server.snmp_to_numpy_dict[i], dim_x=server.dim_list[j][0], dim_y=server.dim_list[j][1]) res_lst = [] def test(*value): res_lst.append(value[1]) - return None, None, None, get_return_val(i, dim_list[j]) + return None, None, None, server.get_return_val(i, server.dim_list[j]) hlapi.ObjectType = test @@ -225,7 +234,7 @@ class TestSNMP(base.TestCase): if len(res_lst) == 1: res_lst = res_lst[0] - checkval = val_check(i, dim_list[j]) + checkval = server.val_check(i, server.dim_list[j]) self.assertEqual(checkval, res_lst, f"Expected: {checkval}, got: {res_lst}")