diff --git a/SAS/MoM/MoMQueryService/MoMQueryServiceClient/momqueryrpc.py b/SAS/MoM/MoMQueryService/MoMQueryServiceClient/momqueryrpc.py index 01a59bcb525a19bbf07202092e6f7d5a8ac74356..c692dba3732f5e2447c73e6418dccb9a0d2cfd3d 100644 --- a/SAS/MoM/MoMQueryService/MoMQueryServiceClient/momqueryrpc.py +++ b/SAS/MoM/MoMQueryService/MoMQueryServiceClient/momqueryrpc.py @@ -119,6 +119,38 @@ class MoMQueryRPC(RPCWrapper): logger.info("Received trigger_id: %s", result) return result + def get_trigger_quota(self, project_name): + """returns trigger quota as (current,max) tuple for project with given name + :param project_name + :return: (Integer, Integer) + """ + logger.info("Requesting GetTriggerQuota for project: %s", project_name) + result = self.rpc('GetTriggerId', project_name=project_name) + logger.info("Received trigger quota: %s", result) + return result + + def update_trigger_quota(self, project_name): + """ + count all the accepted triggers that are not cancelled, and update the trigger quota field in mom accordingly + returns updated quota as (current, max) tuple (same as get_trigger_quota) + :param project_name + :return: (Integer, Integer) + """ + logger.info("Requesting UpdateTriggerQuota for project: %s", project_name) + result = self.rpc('UpdateTriggerQuota', project_name=project_name) + logger.info("Received updated trigger quota: %s", result) + return result + + def cancel_trigger(self, trigger_id, reason): + """ flags trigger as canceled and returns updated trigger quota as (current, max) tuple + :param trigger_id, reason + :return (Integer, Integer) + """ + logger.info("Requesting CancelTrigger for trigger id: %s | reason: %s", trigger_id, reason) + result = self.rpc('CancelTrigger', trigger_id=trigger_id, reason=reason) + logger.info("Requesting CancelTrigger for trigger id %s returned updated project trigger quota: %s", trigger_id, result) + return result + def get_project_details(self, mom_id): """returns email addresses of pi and contact author for a project mom id :param mom_id @@ -143,7 +175,6 @@ class MoMQueryRPC(RPCWrapper): logger.info("Received project priorities for %s mom objects" % (len(result))) return result - def getObjectDetails(self, ids): '''get the object details for one or more mom ids :param ids single or list of mom ids diff --git a/SAS/MoM/MoMQueryService/MoMQueryServiceServer/momqueryservice.py b/SAS/MoM/MoMQueryService/MoMQueryServiceServer/momqueryservice.py index 329530cf56c2562f1bcd4293137b13ae43bc23d2..d8a2c33868b03ed586cf227de875a27b5105dc29 100755 --- a/SAS/MoM/MoMQueryService/MoMQueryServiceServer/momqueryservice.py +++ b/SAS/MoM/MoMQueryService/MoMQueryServiceServer/momqueryservice.py @@ -139,7 +139,7 @@ class MoMDatabaseWrapper: def _executeInsertQuery(self, query, data=None): # try to execute query on flaky lofar mysql connection - # max of 3 tries, on success return result + # max of 3 tries, on success return the row id # use new connection for every query, # because on the flaky lofar network a connection may appear functional but returns improper results. maxtries = 3 @@ -156,6 +156,26 @@ class MoMDatabaseWrapper: if i+1 == maxtries: raise e + def _executeUpdateQuery(self, query, data=None): + # try to execute query on flaky lofar mysql connection + # max of 3 tries, on success return number of affected rows + # use new connection for every query, + # because on the flaky lofar network a connection may appear functional but returns improper results. + maxtries = 3 + + for i in range(maxtries): + try: + self._connect() + cursor = self.conn.cursor(dictionary=True) + cursor.execute(query, data) + rowcount = cursor.rowcount + self.conn.commit() + return rowcount + except (OperationalError, AttributeError) as e: + logger.error(str(e)) + + if i + 1 == maxtries: raise e + def add_trigger(self, user_name, host_name, project_name, meta_data): logger.info("add_trigger for user_name: %s, host_name: %s, project_name: %s, meta_data: %s", user_name, host_name, project_name, meta_data) @@ -171,6 +191,49 @@ values (%s, %s, %s, %s)""" return row_id + def cancel_trigger(self, trigger_id, reason): + logger.info("cancel_trigger for trigger_id: %s, reason: %s", trigger_id, reason) + + query = """UPDATE lofar_trigger + SET cancelled=1, cancelled_at=NOW(), cancelled_reason=%s + WHERE id = %s""" + parameters = (reason, str(trigger_id)) + rowcount = self._executeUpdateQuery(query, parameters) + + if rowcount < 1: + raise ValueError("cancel_trigger for trigger_id: %s returned affected row count of %s" % (trigger_id, rowcount)) + + logger.info("cancel_trigger for trigger_id: %s done. Affected rows: %s", trigger_id, rowcount) + + def update_trigger_quota(self, project_name): + logger.info("update_trigger_quota for project: %s", project_name) + + query = """SELECT * FROM lofar_trigger + WHERE projectname = %s + AND cancelled = 0""" + + parameters = (project_name,) + rows = self._executeSelectQuery(query, parameters) + if len(rows) == 0: + raise ValueError('No trigger quota found for project %s' % project_name) + + numtriggers = len(rows) + + query = """UPDATE resource + JOIN resourcetype ON resource.resourcetypeid = resourcetype.id + JOIN mom2object ON resource.projectid=mom2object.mom2id + SET resource.used=%s + WHERE resourcetype.type = 'LOFAR_TRIGGERS' + AND mom2object.name = %s""" + + parameters = (numtriggers, project_name) + rowcount = self._executeUpdateQuery(query, parameters) + + if rowcount < 1: + raise ValueError("update_trigger_quota for project %s returned returned affected row count of %s" % (project_name, rowcount)) + + logger.info("update_trigger_quota for project %s done. Affected rows: %s", project_name, rowcount) + def get_project_priority(self, project_name): logger.info("get_project_priority for project_name: %s", project_name) @@ -379,6 +442,46 @@ where mom2object.name = %s""" logger.info("get_trigger_id for mom_id (%s): %s", mom_id, trigger_id) return trigger_id + def get_trigger_quota(self, project_name): + """ returns trigger quota as tuple (current, max) + :param project_name + :return: (Integer, Integer) + """ + logger.info("get_trigger_quota for project_name: %s", project_name) + + query = """SELECT used, allocation FROM resource + JOIN resourcetype ON resource.resourcetypeid = resourcetype.id + JOIN mom2object ON resource.projectid=mom2object.mom2id + WHERE resourcetype.type = 'LOFAR_TRIGGERS' + AND mom2object.name = %s""" + + parameters = (project_name,) + rows = self._executeSelectQuery(query, parameters) + if len(rows) == 0: + raise ValueError("no trigger quota found for project_name %s in MoM database" % project_name) + + quota = (rows[0]['used'],rows[0]['allocation']) + logger.info("get_trigger_quota for project_name %s: %s", project_name, quota) + return quota + + def get_projectname_for_trigger(self, trigger_id): + """ returns project id for given trigger id + :param trigger_id + :return: String + """ + logger.info("get_projectname_for_trigger: %s", trigger_id) + + query = """SELECT projectname FROM lofar_trigger WHERE id = %s""" + parameters = (str(trigger_id),) + + rows = self._executeSelectQuery(query, parameters) + if len(rows) == 0: + raise ValueError("trigger_id (%s) not found in MoM database" % trigger_id) + + projectname = rows[0]['projectname'] + logger.info("get_projectname_for_trigger %s: %s", trigger_id, projectname) + return projectname + def get_project_details(self, mom_id): """get the pi and contact author email addresses for a project mom id :param a project mom id @@ -1031,6 +1134,7 @@ class ProjectDetailsQueryHandler(MessageHandlerInterface): self.service2MethodMap = { 'AddTrigger': self.add_trigger, + 'CancelTrigger': self.cancel_trigger, 'GetProjectPriority': self.get_project_priority, 'AllowsTriggers': self.allows_triggers, 'AutorizedAddWithStatus': self.authorized_add_with_status, @@ -1052,14 +1156,17 @@ class ProjectDetailsQueryHandler(MessageHandlerInterface): 'GetTaskIdsGraph': self.getTaskIdsGraph, 'GetProjectPrioritiesForObjects': self.get_project_priorities_for_objects, 'GetStationSelection': self.get_station_selection, - 'GetTimeRestrictions': self.get_time_restrictions - } + 'GetTimeRestrictions': self.get_time_restrictions, + 'GetTriggerQuota': self.get_trigger_quota, + 'UpdateTriggerQuota': self.update_trigger_quota, + } def prepare_loop(self): self.momdb = MoMDatabaseWrapper(self.dbcreds) def add_trigger(self, user_name, host_name, project_name, meta_data): row_id = self.momdb.add_trigger(user_name, host_name, project_name, meta_data) + self.momdb.update_trigger_quota(project_name) return {"row_id": row_id} def get_project_priority(self, project_name): @@ -1090,6 +1197,23 @@ class ProjectDetailsQueryHandler(MessageHandlerInterface): else: return {"trigger_id": None, "status": "Error", "errors": ["No trigger_id for mom_id: " + str(mom_id)]} + def update_trigger_quota(self, project_name): + self.momdb.update_trigger_quota(project_name) + current, max = self.momdb.get_trigger_quota(project_name) + return {"used_triggers": current, "allocated_triggers": max} + + def get_trigger_quota(self, project_name): + quota = self.momdb.get_trigger_quota(project_name) + current, max = quota + return {"used_triggers": current, "allocated_triggers": max} + + def cancel_trigger(self, trigger_id, reason): + self.momdb.cancel_trigger(trigger_id, reason) + project_name = self.momdb.get_projectname_for_trigger(trigger_id) + self.momdb.update_trigger_quota(project_name) + current, max = self.momdb.get_trigger_quota(project_name) + return {"used_triggers": current, "allocated_triggers": max} + def get_project_details(self, mom_id): return self.momdb.get_project_details(mom_id) diff --git a/SAS/MoM/MoMQueryService/test/t_momqueryservice.py b/SAS/MoM/MoMQueryService/test/t_momqueryservice.py index 452e017618fe3203e46280f5a28cb4dd5f11a88a..83fd044ec9e53c7a8ebdcae0bf3e7d38326b7023 100755 --- a/SAS/MoM/MoMQueryService/test/t_momqueryservice.py +++ b/SAS/MoM/MoMQueryService/test/t_momqueryservice.py @@ -23,6 +23,7 @@ import uuid from mysql import connector import logging import json +import datetime logger = logging.getLogger(__name__) @@ -176,6 +177,9 @@ def populate_db(mysqld): "arrivaltime timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, " "projectname varchar(100) NOT NULL DEFAULT '', " "metadata TEXT NOT NULL, " + "cancelled BOOLEAN NOT NULL DEFAULT 0, " + "cancelled_at timestamp NULL, " + "cancelled_reason char(255), " "PRIMARY KEY (id), " "FOREIGN KEY (username) REFERENCES useradministration.useraccount(username)" ") ") @@ -289,6 +293,28 @@ def populate_db(mysqld): "KEY demixing_parameters_FK (demixing_parameters_id)," "KEY bbs_parameters_FK (bbs_parameters_id)" ") ENGINE=InnoDB AUTO_INCREMENT=75471 DEFAULT CHARSET=latin1") + cursor.execute("CREATE TABLE resource (" + "id int(11) NOT NULL auto_increment," + "projectid int(11) default NULL," + "resourcetypeid int(11) NOT NULL," + "allocation double default NULL," + "used double default NULL," + "unit varchar(50) NOT NULL DEFAULT ''," + "projectpath varchar(255) default NULL," + "PRIMARY KEY (id)," + "KEY resourcetype_resource_IND (resourcetypeid)," + "KEY mom2object_resource_FK (projectid)" + #"CONSTRAINT mom2object_resource_FK FOREIGN KEY (projectid) REFERENCES mom2object (id) ON DELETE CASCADE ON UPDATE NO ACTION," + #"CONSTRAINT resourcetype_resource_FK FOREIGN KEY (resourcetypeid) REFERENCES resourcetype (id)" + ") ENGINE=InnoDB DEFAULT CHARSET=latin1") + cursor.execute("CREATE TABLE resourcetype (" + "id int(11) NOT NULL auto_increment, " + "name varchar(255) NOT NULL, " + "hosturi varchar(255) default NULL," + "type varchar(50) NOT NULL," + "PRIMARY KEY (id)," + "KEY resourcetype_name_IND (name)" + ") ENGINE=InnoDB DEFAULT CHARSET=latin1") # mom privilege cursor.execute("CREATE DATABASE momprivilege") cursor.execute("CREATE TABLE momprivilege.statustransitionrole ( " @@ -422,6 +448,17 @@ class TestProjectDetailsQueryHandler(unittest.TestCase): self.assertEqual(return_value['row_id'], row_id) + def test_add_trigger_calls_update_trigger_quota_with_correct_projectname(self): + project_name = "project" + host_name = "host name" + user_name = "user name" + meta_data = "meta data" + row_id = 44 + + self.project_details_query_handler.add_trigger(user_name, host_name, project_name, meta_data) + + self.mom_database_wrapper_mock().update_trigger_quota.assert_called_with(project_name) + def test_get_trigger_id_returns_trigger_id_when_mom_wrapper_returns_an_id(self): trigger_id = 1234 @@ -505,6 +542,42 @@ class TestProjectDetailsQueryHandler(unittest.TestCase): self.assertEqual(result[0]["min"], rg_min) self.assertEqual(result[0]["max"], rg_max) + def test_get_trigger_quota_returns_what_the_mom_wrapper_returns(self): + used = 5 + max = 10 + self.mom_database_wrapper_mock().get_trigger_quota.return_value = (used, max) + result = self.project_details_query_handler.get_trigger_quota(self.project_name) + self.assertEqual(result["used_triggers"], used) + self.assertEqual(result["allocated_triggers"], max) + + def test_update_trigger_quota_returns_what_get_trigger_quota_returns(self): + used = 5 + max = 10 + self.mom_database_wrapper_mock().get_trigger_quota.return_value = (used, max) + + result = self.project_details_query_handler.get_trigger_quota(self.project_name) + self.assertEqual(result["used_triggers"], used) + self.assertEqual(result["allocated_triggers"], max) + + def test_cancel_trigger_calls_update_trigger_quota_with_correct_projectname(self): + project = 'myproject' + trigger = 1234 + self.mom_database_wrapper_mock().get_projectname_for_trigger.return_value = project + self.mom_database_wrapper_mock().get_trigger_quota.return_value = (1, 10) + + self.project_details_query_handler.cancel_trigger(trigger, "That's why!") + self.mom_database_wrapper_mock().get_projectname_for_trigger.assert_called_with(trigger) + self.mom_database_wrapper_mock().update_trigger_quota.assert_called_with(project) + + def test_cancel_trigger_returns_what_get_trigger_quota_returns(self): + used = 5 + max = 10 + self.mom_database_wrapper_mock().get_trigger_quota.return_value = (used, max) + + result = self.project_details_query_handler.cancel_trigger(1234, 'no reason') + self.assertEqual(result["used_triggers"], used) + self.assertEqual(result["allocated_triggers"], max) + class TestMomQueryRPC(unittest.TestCase): test_id = 1234 @@ -633,6 +706,15 @@ class TestMomQueryRPC(unittest.TestCase): "MessageId": message_id, "status": "OK" }) + used_triggers = 1 + allocated_triggers = 10 + qpid_message_get_trigger_quota = QpidMessage({"used_triggers": used_triggers, "allocated_triggers": allocated_triggers}, + properties={ + "SystemName": "LOFAR", + "MessageType": "ReplyMessage", + "MessageId": message_id, + "status": "OK" + }) def setUp(self): # the mock library had difficulty to mock ToBus and FromBus probably to some weir naming issue. @@ -1018,6 +1100,45 @@ class TestMomQueryRPC(unittest.TestCase): self.assertEqual(result[0]["min"], self.rg_min) self.assertEqual(result[0]["max"], self.rg_max) + @mock.patch('lofar.messaging.messagebus.qpid.messaging') + def test_get_trigger_quota_query(self, qpid_mock): + self.receiver_mock.fetch.return_value = self.qpid_message_get_trigger_quota + + qpid_mock.Message = QpidMessage + qpid_mock.Connection().session().senders = [self.sender_mock] + qpid_mock.Connection().session().next_receiver.return_value = self.receiver_mock + + result = self.momrpc.get_trigger_quota(self.test_id) + + self.assertEqual(result["used_triggers"], self.used_triggers) + self.assertEqual(result["allocated_triggers"], self.allocated_triggers) + + @mock.patch('lofar.messaging.messagebus.qpid.messaging') + def test_update_trigger_quota(self, qpid_mock): + self.receiver_mock.fetch.return_value = self.qpid_message_get_trigger_quota # returns get quota after update + + qpid_mock.Message = QpidMessage + qpid_mock.Connection().session().senders = [self.sender_mock] + qpid_mock.Connection().session().next_receiver.return_value = self.receiver_mock + + result = self.momrpc.get_trigger_quota(self.test_id) + + self.assertEqual(result["used_triggers"], self.used_triggers) + self.assertEqual(result["allocated_triggers"], self.allocated_triggers) + + @mock.patch('lofar.messaging.messagebus.qpid.messaging') + def test_cancel_trigger(self, qpid_mock): + self.receiver_mock.fetch.return_value = self.qpid_message_get_trigger_quota # returns get quota after update + + qpid_mock.Message = QpidMessage + qpid_mock.Connection().session().senders = [self.sender_mock] + qpid_mock.Connection().session().next_receiver.return_value = self.receiver_mock + + result = self.momrpc.get_trigger_quota(self.test_id) + + self.assertEqual(result["used_triggers"], self.used_triggers) + self.assertEqual(result["allocated_triggers"], self.allocated_triggers) + class TestMoMDatabaseWrapper(unittest.TestCase): database_credentials = Credentials() @@ -1402,8 +1523,52 @@ class TestMoMDatabaseWrapper(unittest.TestCase): with self.assertRaises(ValueError): self.mom_database_wrapper.get_time_restrictions(1234) + def test_get_trigger_quota_throws_ValueError_if_query_returns_no_rows(self): + details_result = [] + self.mysql_mock.connect().cursor().fetchall.return_value = details_result + + with self.assertRaises(ValueError): + self.mom_database_wrapper.get_trigger_quota(1234) + + def test_get_trigger_quota_returns_values_from_query_result(self): + used_t = 5 + max_t = 10 + details_result = [{u"used":used_t, u"allocation": max_t}] + expected_result = (used_t, max_t) + self.mysql_mock.connect().cursor().fetchall.return_value = details_result + + result = self.mom_database_wrapper.get_trigger_quota(1234) + self.assertEqual(result, expected_result) + + def test_cancel_trigger_throws_ValueError_if_update_does_not_affect_rows(self): + self.mysql_mock.connect().cursor().rowcount = 0 + with self.assertRaises(ValueError): + self.mom_database_wrapper.cancel_trigger(1234, 'no reason') + + def test_cancel_trigger_does_not_raise_exception_if_queries_affect_rows(self): + self.mysql_mock.connect().cursor().rowcount = 1 + self.mom_database_wrapper.cancel_trigger(1234, 'no reason') + + def test_update_trigger_quota_throws_ValueError_if_select_query_returns_empty_result(self): + # select active trigger count + self.mysql_mock.connect().cursor().fetchall.return_value = [] + with self.assertRaises(ValueError): + self.mom_database_wrapper.update_trigger_quota('myproject') + + def test_update_trigger_quota_throws_ValueError_if_update_query_cannot_modify_any_rows(self): + # update resource use + self.mysql_mock.connect().cursor().fetchall.return_value = [7] # let select pass, to see if update fails + self.mysql_mock.connect().cursor().rowcount = None + with self.assertRaises(ValueError): + self.mom_database_wrapper.update_trigger_quota('myproject') + + def test_update_trigger_quota_does_not_raise_exception_if_select_is_not_empty_and_update_affected_rows(self): + self.mysql_mock.connect().cursor().fetchall.return_value = [7] # let select pass, to see if update fails + self.mysql_mock.connect().cursor().rowcount = 1 + self.mom_database_wrapper.update_trigger_quota('myproject') + -@unittest.skip("Skipping integration test") +#@unittest.skip("Skipping integration test") class IntegrationTestMoMDatabaseWrapper(unittest.TestCase): database_credentials = Credentials() database_credentials.host = "localhost" @@ -1933,6 +2098,88 @@ class IntegrationTestMoMDatabaseWrapper(unittest.TestCase): self.assertEqual(result[1]["resourceGroup"], resource_group2) self.assertEqual(result[1]["min"], rg_min2) + def test_get_trigger_quota_throws_ValueError_on_empty_database(self): + with self.assertRaises(ValueError): + self.mom_database_wrapper.get_trigger_quota(self.project_name) + + def test_get_trigger_quota_returns_correct_quota(self): + used = 5 + allocation = 10 + self.execute("insert into mom2object values(1, NULL, NULL, 2, 'PROJECT', '%(project_name)s', 'test-lofar', " + "NULL, 1704653, NULL, NULL, 0, 0, 0)" % {"project_name": self.project_name}) + self.execute("INSERT INTO resourcetype (id, name, type) VALUES (1, 'Lofar Triggers','LOFAR_TRIGGERS');") + self.execute("insert into resource (resourcetypeid, projectid, used, allocation) " + "values(1, 2, %s, %s)" % (used, allocation)) + + used_t, max_t = self.mom_database_wrapper.get_trigger_quota(self.project_name) + self.assertEqual(used_t, used) + self.assertEqual(allocation, max_t) + + def test_cancel_trigger_throws_ValueError_on_empty_database(self): + with self.assertRaises(ValueError): + self.mom_database_wrapper.cancel_trigger(self.trigger_id, 'because I can') + + def test_cancel_trigger_cancels_trigger(self): + self.execute("insert into useradministration.useraccount " + "values(1, 1, '%s', '26dcf77e2de89027e8895baea8e45057', 'sNgmwwN7fk')" % self.user_name) + self.execute("insert into lofar_trigger (id, username, hostname, projectname, metadata) " + "values (%s, '%s', 'host', 'myproject', 'meta')" % (self.trigger_id, self.user_name)) + + reason = 'because I can' + self.mom_database_wrapper.cancel_trigger(self.trigger_id, reason) + + result = self.execute("SELECT cancelled, cancelled_at, cancelled_reason " + "FROM lofar_trigger WHERE id = %s" % self.trigger_id, fetch=True) + + self.assertEqual(result[0]['cancelled'], 1) + self.assertTrue(type(result[0]['cancelled_at']) is datetime.datetime) + self.assertEqual(result[0]['cancelled_reason'], reason) + + + def test_update_trigger_quota_throws_ValueError_on_empty_database(self): + with self.assertRaises(ValueError): + self.mom_database_wrapper.update_trigger_quota(self.project_name) + + def test_update_trigger_quota_updates_trigger_quota(self): + # add project with trigger resource: + used = 5 + self.execute("insert into mom2object values(1, NULL, NULL, 2, 'PROJECT', '%(project_name)s', 'test-lofar', " + "NULL, 1704653, NULL, NULL, 0, 0, 0)" % {"project_name": self.project_name}) + self.execute("INSERT INTO resourcetype (id, name, type) VALUES (1, 'Lofar Triggers','LOFAR_TRIGGERS');") + self.execute("insert into resource (resourcetypeid, projectid, used, allocation) " + "values(1, 2, %s, 10)" % used) + + # add 3 triggers: + self.execute("insert into useradministration.useraccount " + "values(1, 1, '%s', '26dcf77e2de89027e8895baea8e45057', 'sNgmwwN7fk')" % self.user_name) + self.execute("insert into lofar_trigger (id, username, hostname, projectname, metadata) " + "values (%s, '%s', 'host', '%s', 'meta')" % (self.trigger_id, self.user_name, self.project_name)) + self.execute("insert into lofar_trigger (id, username, hostname, projectname, metadata) " + "values (%s, '%s', 'host', '%s', 'meta')" % (self.trigger_id+1, self.user_name, self.project_name)) + self.execute("insert into lofar_trigger (id, username, hostname, projectname, metadata) " + "values (%s, '%s', 'host', '%s', 'meta')" % (self.trigger_id+2, self.user_name, self.project_name)) + + # check initial value + used_t, max_t = self.mom_database_wrapper.get_trigger_quota(self.project_name) + self.assertEqual(used_t, used) + + # call update + self.mom_database_wrapper.update_trigger_quota(self.project_name) + + # check updated value + used_t, max_t = self.mom_database_wrapper.get_trigger_quota(self.project_name) + self.assertEqual(used_t, 3) + + # cancel one trigger, to see if flagged triggers are not considered in use any more + self.mom_database_wrapper.cancel_trigger(self.trigger_id+1, 'Because.') + + # call update + self.mom_database_wrapper.update_trigger_quota(self.project_name) + + # check updated value + used_t, max_t = self.mom_database_wrapper.get_trigger_quota(self.project_name) + self.assertEqual(used_t, 2) + if __name__ == "__main__": logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) diff --git a/SAS/TriggerServices/lib/trigger_service.py b/SAS/TriggerServices/lib/trigger_service.py index 0c071209fb74a6ceb44008925401890a294317f1..6e9639fd6cdd802ee9b2b054666eef8333430052 100644 --- a/SAS/TriggerServices/lib/trigger_service.py +++ b/SAS/TriggerServices/lib/trigger_service.py @@ -55,6 +55,13 @@ def _auth_allows_triggers(project): response = momqueryrpc.allows_triggers(project) return response['allows'] +def _quota_allows_triggers(project): + response = momqueryrpc.get_trigger_quota(project) + if response['used_triggers'] < response['allocated_triggers']: + return True + else: + return False + def _validate_trigger(trigger_xml): response = validationrpc.validate_trigger_specification(trigger_xml) if not response["valid"]: @@ -130,12 +137,18 @@ class TriggerHandler(MessageHandlerInterface): logger.debug('project priority is ->' + str(priority)) if _auth_allows_triggers(project): - logger.info("trigger is authorized, adding to trigger and specification") - trigger_id = _add_trigger(str(user), host, project, trigger_xml) # todo: How to determine hostname from Qpid message? - logger.debug("Trigger was assigned id -> "+str(trigger_id)) - lofar_xml = _translate_trigger_to_specification(trigger_xml, trigger_id, priority) - logger.debug("Lofar specification is valid!") - _add_specification(user, lofar_xml) + logger.info("trigger is authorized") + if _quota_allows_triggers(project): + logger.info("trigger quota allows adding to trigger and specification") + trigger_id = _add_trigger(str(user), host, project, trigger_xml) # todo: How to determine hostname from Qpid message? + logger.debug("Trigger was assigned id -> "+str(trigger_id)) + lofar_xml = _translate_trigger_to_specification(trigger_xml, trigger_id, priority) + logger.debug("Lofar specification is valid!") + _add_specification(user, lofar_xml) + else: + msg = "Trigger quota exceeded!" + logger.error(msg) + raise Exception(msg) else: msg = "Trigger authorization failed!" logger.error(msg) diff --git a/SAS/TriggerServices/test/t_trigger_service.py b/SAS/TriggerServices/test/t_trigger_service.py index f42340b14c0ec4779b23c257c1c72640a57daf2f..cc825515db91f7b4d8362e63dfe68ec17aa82c30 100644 --- a/SAS/TriggerServices/test/t_trigger_service.py +++ b/SAS/TriggerServices/test/t_trigger_service.py @@ -44,18 +44,14 @@ class TestTriggerHandler(unittest.TestCase): cls.trigger_xml = f.read() with mock.patch('lofar.triggerservices.trigger_service.notification_bus'): - cls.handler = TriggerHandler() - - def test_add_trigger_should_send_notification(self): with mock.patch('lofar.triggerservices.trigger_service._send_notification') as m, \ mock.patch('lofar.triggerservices.trigger_service.momqueryrpc') as momrpc: serv._add_trigger(TEST_USER, TEST_HOST, TEST_PROJECT, self.trigger_xml) m.assert_called_once() - def test_valid_trigger_should_add_specification_and_return_trigger_id(self): with mock.patch('lofar.triggerservices.trigger_service.validationrpc') as valrpc, \ mock.patch('lofar.triggerservices.trigger_service.momqueryrpc') as momrpc, \ @@ -68,6 +64,7 @@ class TestTriggerHandler(unittest.TestCase): #valrpc.validate_specification.return_value = {'valid': True} momrpc.get_project_priority.return_value = {'priority':1} momrpc.add_trigger.return_value = {'row_id': tid} + momrpc.get_trigger_quota.return_value = {'used_triggers': 5, 'allocated_triggers': 6} transrpc.trigger_to_specification.return_value = {'specification':"<specification />"} response = self.handler.handle_trigger(TEST_USER, TEST_HOST, self.trigger_xml) @@ -75,14 +72,12 @@ class TestTriggerHandler(unittest.TestCase): specrpc.add_specification.assert_called_once() self.assertEqual(response['trigger-id'], tid) - def test_invalid_trigger_should_raise_exception(self): with mock.patch('lofar.triggerservices.trigger_service.validationrpc') as valrpc: valrpc.validate_trigger_specification.return_value = {'valid': False} with self.assertRaises(Exception) as exception: self.handler.handle_trigger(TEST_USER, TEST_HOST, self.trigger_xml) - def test_unauthorized_trigger_should_raise_exception(self): with mock.patch('lofar.triggerservices.trigger_service.validationrpc') as valrpc, \ mock.patch('lofar.triggerservices.trigger_service.momqueryrpc') as momrpc, \ @@ -93,6 +88,17 @@ class TestTriggerHandler(unittest.TestCase): with self.assertRaises(Exception) as exception: self.handler.handle_trigger(TEST_USER, TEST_HOST, self.trigger_xml) + def test_trigger_exceeding_quota_should_raise_exception(self): + with mock.patch('lofar.triggerservices.trigger_service.validationrpc') as valrpc, \ + mock.patch('lofar.triggerservices.trigger_service.momqueryrpc') as momrpc, \ + mock.patch('lofar.triggerservices.trigger_service.translationrpc') as transrpc, \ + mock.patch('lofar.triggerservices.trigger_service.specificationrpc') as specrpc: + valrpc.validate_specification.return_value = {'valid': True} + momrpc.allows_triggers.return_value = {'allows': True} + momrpc.get_trigger_quota.return_value = {'used_triggers': 5, 'allocated_triggers': 5} + with self.assertRaises(Exception) as exception: + self.handler.handle_trigger(TEST_USER, TEST_HOST, self.trigger_xml) + if __name__ == '__main__': unittest.main()