diff --git a/tangostationcontrol/tangostationcontrol/clients/snmp_client.py b/tangostationcontrol/tangostationcontrol/clients/snmp_client.py index 80638665d2c2885acbe3fe336d378511f7dc531d..ecb84074cc8c3c8487af2e27600456f14a833e4b 100644 --- a/tangostationcontrol/tangostationcontrol/clients/snmp_client.py +++ b/tangostationcontrol/tangostationcontrol/clients/snmp_client.py @@ -52,11 +52,24 @@ class SNMP_client(CommClient): def _setup_annotation(self, annotation): """ - parses the annotation this attribute received for its initialisation. + The SNMP client uses a dict and takes the following keys: + mib: the mib name + name: name of the value to read + index (optional) the index if the value thats being read from is a table. """ - wrapper = annotation_wrapper(annotation) - return wrapper + # check if the 'mib' and 'name' keys are present + if 'mib' in annotation and 'name' in annotation: + mib = annotation["mib"] + name = annotation["name"] + + # SNMP has tables that require an index number to access them. regular non-table variable have an index of 0 + idx = annotation.get('index', 0) + + else: + raise ValueError(f"SNMP attribute annotation requires a dict argument with both a 'name' and 'mib' key. Instead got: {annotation}") + + return mib, name, idx def setup_value_conversion(self, attribute): """ @@ -79,11 +92,11 @@ class SNMP_client(CommClient): """ # process the annotation - wrapper = self._setup_annotation(annotation) + mib, name, idx = self._setup_annotation(annotation) # get all the necessary data to set up the read/write functions from the attribute_wrapper dim_x, dim_y, dtype = self.setup_value_conversion(attribute) - snmp_attr = snmp_attribute(self, wrapper, dtype, dim_x, dim_y) + snmp_attr = snmp_attribute(self, mib, name, idx, dtype, dim_x, dim_y) # return the read/write functions def read_function(): @@ -104,33 +117,19 @@ class annotation_wrapper: def __init__(self, annotation): """ The SNMP client uses a dict and takes the following keys: - - either - oids: Required. An oid string of the object - or mib: the mib name name: name of the value to read index (optional) the index if the value thats being read from is a table. """ - # values start as None because we have a way too complicated interface - self.oids = None + # values start as None self.mib = None self.name = None self.idx = None - # check if the 'oids' key is used and not the 'mib' and 'name' keys - - if 'oids' in annotation and 'mib' not in annotation and 'name' not in annotation: - self.oids = annotation["oids"] - # checks to make sure this isn't present - if 'index' in annotation: - raise ValueError(f"SNMP attribute annotation doesn't support oid type declarations with an index present.") - - - # check if the 'oids' key is NOT used but instead the 'mib' and 'name' keys - elif 'oids' not in annotation and 'mib' in annotation and 'name' in annotation: + # check if the 'mib' and 'name' keys are present + if 'mib' in annotation and 'name' in annotation: self.mib = annotation["mib"] self.name = annotation["name"] @@ -138,86 +137,23 @@ class annotation_wrapper: self.idx = annotation.get('index', 0) else: - raise ValueError( - f"SNMP attribute annotation requires a dict argument with either a 'oids' key or both a 'name' and 'mib' key. Not both. Instead got: {annotation}") - - def create_objID(self, x, y): - is_scalar = (x + y) == 1 - - # if oids are used - if self.oids is not None: - # get a list of str of the oids - self.oids = self._get_oids(x, y, self.oids) - - # turn the list of oids in to a tuple of pysnmp object identities. These are used for the - objID = tuple(hlapi.ObjectIdentity(self.oids[i]) for i in range(len(self.oids))) - - # if mib + name is used - else: - - # only scalars can be used at the present time. - if not is_scalar: - # tuple(hlapi.ObjectIdentity(mib, name, idx) for i in range(len(oids))) - - raise ValueError(f"MIB + name type attributes can only be scalars, got dimensions of: ({x}, {y})") - else: - objID = hlapi.ObjectIdentity(self.mib, self.name, self.idx) - - return objID - - def _get_oids(self, x, y, in_oid): - """ - This function expands oids depending on dimensionality. - if its a scalar its left alone, but if its an array it creates a list of sequential oids if not already provided - - scalar "1.1.1.1" -> stays the same - spectrum: "1.1.1.1" -> ["1.1.1.1.1", "1.1.1.1.2, ..."] - """ - - if x == 0: - x = 1 - if y == 0: - y = 1 - - is_scalar = (x * y) == 1 - nof_oids = x * y - - # if scalar - if is_scalar: - if type(in_oid) is str: - # for ease of handling put single oid in a 1 element list - in_oid = [in_oid] - - return in_oid - - else: - # if we got a single str oid, make a list of sequential oids - if type(in_oid) is str: - return ["{}.{}".format(in_oid, i + 1) for i in range(nof_oids)] - - # if its an already expanded list of all oids - elif type(in_oid) is list and len(in_oid) == nof_oids: - return in_oid - - # if its a list of attributes with the wrong length. - else: - raise ValueError( - "SNMP oids need to either be a single value or an array the size of the attribute dimensions. got: {} expected: {}x{}={}".format( - len(in_oid), x, y, x * y)) + raise ValueError(f"SNMP attribute annotation requires a dict argument with both a 'name' and 'mib' key. Instead got: {annotation}") class snmp_attribute: - def __init__(self, client : SNMP_client, wrapper, dtype, dim_x, dim_y): + def __init__(self, client : SNMP_client, mib, name, idx, dtype, dim_x, dim_y): self.client = client - self.wrapper = wrapper + self.mib = mib + self.name = name + self.idx = idx self.dtype = dtype self.dim_x = dim_x self.dim_y = dim_y self.is_scalar = (self.dim_x + self.dim_y) == 1 - self.objID = self.wrapper.create_objID(self.dim_x, self.dim_y) + self.objID = self.create_objID() def next_wrap(self, cmd): """ @@ -285,4 +221,16 @@ class snmp_attribute: return vals + def create_objID(self): + + # only scalars can be used at the present time. + if not self.is_scalar: + # tuple(hlapi.ObjectIdentity(mib, name, idx) for i in range(len(oids))) + + raise ValueError(f"MIB + name type attributes can only be scalars, got dimensions of: ({x}, {y})") + else: + objID = hlapi.ObjectIdentity(self.mib, self.name, self.idx) + + return objID + diff --git a/tangostationcontrol/tangostationcontrol/test/clients/test_snmp_client.py b/tangostationcontrol/tangostationcontrol/test/clients/test_snmp_client.py index f061e38cedc7cefefeb72976454edecd7b647259..be185cdd3bc098625c44957ba2756d18b5d6acc4 100644 --- a/tangostationcontrol/tangostationcontrol/test/clients/test_snmp_client.py +++ b/tangostationcontrol/tangostationcontrol/test/clients/test_snmp_client.py @@ -22,7 +22,8 @@ class server_imitator: # shortcut for testing dimensionality dim_list = { "scalar": (1, 0), - "spectrum": (4, 0), + #NOTE: spectrum not supported currently + #"spectrum": (4, 0), } def get_return_val(self, snmp_type : type, dims : tuple): @@ -89,39 +90,6 @@ class server_imitator: class TestSNMP(base.TestCase): - - def test_annotation_success(self): - """ - unit test for the processing of annotation. Has 2 lists. 1 with things that should succeed and 1 with things that should fail. - """ - - client = SNMP_client(community='public', host='localhost', timeout=10, fault_func=None, try_interval=2) - - test_list = [ - # test name nad MIB type annotation - {"mib": "SNMPv2-MIB", "name": "sysDescr"}, - - # test name nad MIB type annotation with index - {"mib": "RFC1213-MIB", "name": "ipAdEntAddr", "index": (127, 0, 0, 1)}, - {"mib": "random-MIB", "name": "aName", "index": 2}, - - #oid - {"oids": "1.3.6.1.2.1.2.2.1.2.31"} - ] - - - for i in test_list: - wrapper = client._setup_annotation(annotation=i) - - if wrapper.oids is not None: - self.assertEqual(wrapper.oids, i["oids"]) - - else: - self.assertEqual(wrapper.mib, i["mib"], f"expected mib with: {i['mib']}, got: {wrapper.idx} from: {i}") - self.assertEqual(wrapper.name, i["name"], f"expected name with: {i['name']}, got: {wrapper.idx} from: {i}") - self.assertEqual(wrapper.idx, i.get('index', 0), f"expected idx with: {i.get('index', 0)}, got: {wrapper.idx} from: {i}") - - def test_annotation_fail(self): """ unit test for the processing of annotation. Has 2 lists. 1 with things that should succeed and 1 with things that should fail. @@ -130,53 +98,16 @@ class TestSNMP(base.TestCase): client = SNMP_client(community='public', host='localhost', timeout=10, fault_func=None, try_interval=2) fail_list = [ - # OIDS cant use the index - {"oids": "1.3.6.1.2.1.2.2.1.2.31", "index": 2}, - # mixed annotation is not allowed - {"oids": "1.3.6.1.2.1.2.2.1.2.31", "name": "thisShouldFail"}, # no 'name' {"mib": "random-MIB", "index": 2}, + # no MIB + {"name": "random-name", "index": 2}, ] for i in fail_list: with self.assertRaises(ValueError): client._setup_annotation(annotation=i) - def test_oids_scalar(self): - - test_oid = "1.1.1.1" - - server = server_imitator() - - x, y = server.dim_list['scalar'] - - # we just need the object to call another function - wrapper = annotation_wrapper(annotation = {"oids": "Not None lol"}) - # scalar - scalar_expected = [test_oid] - ret_oids = wrapper._get_oids(x, y, test_oid) - self.assertEqual(ret_oids, scalar_expected, f"Expected: {scalar_expected}, got: {ret_oids}") - - def test_oids_spectrum(self): - """ - Tests the "get_oids" function, which is for getting lists of sequential oids. - - Results should basically be an incrementing list of oids with the final number incremented by 1 each time. - So "1.1" with dims of 3x1 might become ["1.1.1", "1.1.2", "1.1.3"] - """ - server = server_imitator() - - test_oid = "1.1.1.1" - x, y = server.dim_list['spectrum'] - - # we just need the object to call another function - wrapper = annotation_wrapper(annotation={"oids": "Not None lol"}) - - # spectrum - spectrum_expected = [test_oid + ".1", test_oid + ".2", test_oid + ".3", test_oid + ".4"] - ret_oids = wrapper._get_oids(x, y, test_oid) - self.assertListEqual(ret_oids, spectrum_expected, f"Expected: {spectrum_expected}, got: {ret_oids}") - @mock.patch('pysnmp.hlapi.ObjectIdentity') @mock.patch('pysnmp.hlapi.ObjectType') @mock.patch('tangostationcontrol.clients.snmp_client.snmp_attribute.next_wrap') @@ -193,9 +124,7 @@ class TestSNMP(base.TestCase): m_client = mock.Mock() - - wrapper = annotation_wrapper(annotation={"oids": "1.3.6.1.2.1.2.2.1.2.31"}) - snmp_attr = snmp_attribute(client=m_client, wrapper=wrapper, dtype=server.snmp_to_numpy_dict[i], dim_x=server.dim_list[j][0], dim_y=server.dim_list[j][1]) + snmp_attr = snmp_attribute(client=m_client, mib="test", name="test", idx=0, dtype=server.snmp_to_numpy_dict[i], dim_x=server.dim_list[j][0], dim_y=server.dim_list[j][1]) val = snmp_attr.read_function() @@ -219,8 +148,7 @@ class TestSNMP(base.TestCase): m_client = mock.Mock() set_val = server.val_check(i, server.dim_list[j]) - wrapper = annotation_wrapper(annotation={"oids": "1.3.6.1.2.1.2.2.1.2.31"}) - snmp_attr = snmp_attribute(client=m_client, wrapper=wrapper, dtype=server.snmp_to_numpy_dict[i], dim_x=server.dim_list[j][0], dim_y=server.dim_list[j][1]) + snmp_attr = snmp_attribute(client=m_client, mib="test", name="test", idx=0, dtype=server.snmp_to_numpy_dict[i], dim_x=server.dim_list[j][0], dim_y=server.dim_list[j][1]) res_lst = [] def test(*value):