diff --git a/SAS/OTDB_Services/TreeService.py b/SAS/OTDB_Services/TreeService.py index ecfa83eaecdbcae74f794889889d4de04b781a52..78f3e15fcf5a0344ad820dc5a4c1f7b39b996144 100755 --- a/SAS/OTDB_Services/TreeService.py +++ b/SAS/OTDB_Services/TreeService.py @@ -25,9 +25,10 @@ Daemon that sets-up a set of servicess for the OTDB database. RPC functions that allow access to (VIC) trees in OTDB. -TaskSpecificationRequest: get the specification(parset) of a tree as dict. +TaskGetSpecification : get the specification(parset) of a task as dict. +TaskSetSpecification : set the specification(parset) of a task. KeyUpdateCommand : function to update the value of multiple (existing) keys. -StatusUpdateCommand : finction to update the status of a tree. +StatusUpdateCommand : finction to update the status of a task. """ import sys, time, pg @@ -48,8 +49,56 @@ class DatabaseError(Exception): "Connection with the database could not be made" pass -# Task Specification Request -def TaskSpecificationRequest(input_dict, db_connection): +# Task Get IDs +def TaskGetIDs(input_dict, db_connection, return_tuple=True): + """ + RPC function that returns the MoMID and OTDBid from the user input, verifies the input in the database. + + Input : dict with either the key OtdbID (integer) or the key MoMID (integer). + This key is used to search for the specified tree. + return_tuple (bool) : Tuples can not be send with QPID, but for internal use we prefer them + + Output: (task_found, otdb_id, mom_id) : + When task does not exist than otdb_id and mom_id contain the user defined values + When task exists then otdb_id and mom_id contain the real ids (retrieved from the database) + + Exceptions: + AttributeError: Input not conform the specs + """ + # Get task identifier: OtdbId or MoMID + if not isinstance(input_dict, dict): + raise AttributeError("Expected a dict as input") + + if 'OtdbID' not in input_dict and 'MoMID' not in input_dict: + raise AttributeError("Need 'OtdbID' or 'MoMID' key for task retrieval") + + otdb_id = input_dict.get('OtdbID', None) + mom_id = input_dict.get('MoMID', None) + + # Try to get the taskInfo (to find out whether or not the task already exists. + if otdb_id is not None: + try: + (real_otdb_id, real_mom_id) =\ + db_connection.query("select treeid,momid from getTreeInfo({}, False)".format(otdb_id)).getresult()[0] + return (True, real_otdb_id, real_mom_id) if return_tuple else [True, real_otdb_id, real_mom_id] + except QUERY_EXCEPTIONS: + pass + + # Task not found on otdb_id, try mom_id + if mom_id is not None: + try: + (real_otdb_id, real_mom_id) =\ + db_connection.query("select treeid,momid from getTreeInfo({}, True)".format(mom_id)).getresult()[0] + return (True, real_otdb_id, real_mom_id) if return_tuple else [True, real_otdb_id, real_mom_id] + except QUERY_EXCEPTIONS: + pass + + # Task not found in any way return input values to the user + return (False, otdb_id, mom_id) if return_tuple else [False, otdb_id, mom_id] + + +# Task Get Specification +def TaskGetSpecification(input_dict, db_connection): """ RPC function that retrieves the task specification from a tree. @@ -63,15 +112,15 @@ def TaskSpecificationRequest(input_dict, db_connection): """ # Check the input if not isinstance(input_dict, dict): - raise AttributeError("TaskSpecificationRequest: Expected a dict as input") + raise AttributeError("TaskGetSpecification: Expected a dict as input") try: tree_id = input_dict['OtdbID'] except KeyError, info: - raise AttributeError("TaskSpecificationRequest: Key %s is missing in the input" % info) + raise AttributeError("TaskGetSpecification: Key %s is missing in the input" % info) # Try to get the specification information try: - logger.info("TaskSpecificationRequest:%s" % input_dict) + logger.info("TaskGetSpecification:%s" % input_dict) top_node = db_connection.query("select nodeid from getTopNode('%s')" % tree_id).getresult()[0][0] treeinfo = db_connection.query("select exportTree(1, '%s', '%s')" % (tree_id, top_node)).getresult()[0][0] except QUERY_EXCEPTIONS, exc_info: @@ -103,6 +152,7 @@ def TaskSetSpecification(input_dict, db_connection): Input : dict with either the key OtdbID (integer) or the key MoMID (integer). This key is used to search for the specified tree. + TemplateName - Optional: Needed when the task doesn't exist and has to be created. Updates (dict) - The key-value pairs that must be updated. Implemented workflow (all checks for errors are left out): @@ -122,29 +172,16 @@ def TaskSetSpecification(input_dict, db_connection): FunctionError: An error occurred during the execution of the function. The text of the exception explains what is wrong. """ - # Get task identifier: OtdbId or MoMID - if not isinstance(input_dict, dict): - raise AttributeError("TaskSetSpecification: Expected a dict as input") - if 'OtdbID' not in input_dict and 'MoMID' not in input_dict: - raise AttributeError("TaskSetSpecificationRequest: Need 'OtdbID' or 'MoMID' key for task retrieval") - otdb_id = input_dict.get('OtdbID', None) - mom_id = input_dict.get('MoMID', None) - logger.info("TaskSetSpecification: otdb_id={}, mom_id={}".format(otdb_id, mom_id)) - - # Try to get the taskInfo (to find out whether or not the task already exists. - # but first unify information we have. - tree_id = otdb_id if otdb_id is not None else mom_id - is_mom_id = mom_id is not None - try: - treeid = db_connection.query("select treeid from getTreeInfo({}, {})".format(tree_id, is_mom_id)).getresult()[0][0] - # tree exists, no extra actions to take. + # Solve ID(s) that the user may have specified and return the validated values. + (found_task, otdb_id, mom_id) = TaskGetIDs(input_dict, db_connection) # throws on missing input - except QUERY_EXCEPTIONS, exc_info: - # tree does not exist, we have to create it from a default template when the tree was specified with a MoMID! - # Search for the name... - if not is_mom_id: - raise FunctionError("Task with OTDBid {} does not exist".format(otdb_id)) + # when otdb_id = None task is not in the database + # if we searched on OtdbID and the task is not found then is it end-of-story + if not found_task and otdb_id is not None: + raise FunctionError("Task with OtdbID/MoMID {}/{} does not exist".format(otdb_id, mom_id)) + # if we searched on MomID and the task is not found that we try to create a task(template) + if not found_task and mom_id is not None: selected_template = input_dict.get('TemplateName', None) if selected_template is None: raise AttributeError("TaskSetSpecification: Need 'TemplateName' key to create a task") @@ -160,58 +197,28 @@ def TaskSetSpecification(input_dict, db_connection): if len(template_ids) != 1: raise FunctionError("Programming error: matching task_ids for template {} are {}".\ format(selected_template, template_ids)) - treeid = db_connection.query("select copyTree(1,{})".format(template_ids[0])).getresult()[0][0] - print "###NEW_TREE=",treeid + otdb_id = db_connection.query("select copyTree(1,{})".format(template_ids[0])).getresult()[0][0] + print "###NEW_TREE=",otdb_id # give new tree the mom_id when mom_id was specified by the user. if is_mom_id: - success = db_connection.query("select setMomInfo(1,{},{},0,'no campaign')".format(treeid, mom_id)).getresult()[0] + db_connection.query("select setMomInfo(1,{},{},0,'no campaign')".format(treeid, mom_id)) except QUERY_EXCEPTIONS, exc_info: raise FunctionError("Error while create task from template {}: {}".format(selected_template, exc_info)) - # Do the key updates + # When we are here we always have a task, so do the key updates try: - update_result = KeyUpdateCommand({'OtdbID':treeid, 'Updates':input_dict['Updates']}, db_connection, + update_result = KeyUpdateCommand({'OtdbID':otdb_id, 'Updates':input_dict['Updates']}, db_connection, always_return_result_dict=True) except (AttributeError, FunctionError), exc_info: update_result = exc_info answer = {} - answer['OtdbID'] = treeid + answer['OtdbID'] = otdb_id answer['MoMID'] = mom_id answer['Errors'] = update_result return answer - return { 'result': "not completely implemented yet, treeid={}, type={}, state={}".format(treeid, treetype, treestate)} - - - # Try to get the specification information - try: - logger.info("TaskSpecificationRequest:%s" % input_dict) - top_node = db_connection.query("select nodeid from getTopNode('%s')" % tree_id).getresult()[0][0] - treeinfo = db_connection.query("select exportTree(1, '%s', '%s')" % (tree_id, top_node)).getresult()[0][0] - except QUERY_EXCEPTIONS, exc_info: - raise FunctionError("Error while requesting specs of tree %d: %s"% (tree_id, exc_info)) - # When the query was succesfull 'treeinfo' is now a string that contains many 'key = value' lines seperated - # with newlines. To make it more usable for the user we convert that into a dict... - - # Note: a PIC tree is a list of keys while a Template tree and a VIC tree is a list of key-values. - # Since we don't know what kind of tree was requested we assume a Template/VIC tree (most likely) - # but if this ends in exceptions we fall back to a PIC tree. - answer_dict = {} - answer_list = [] - for line in treeinfo.split('\n'): # make seperate lines of it. - try: - # assume a 'key = value' line - (key, value) = line.split("=", 1) - answer_dict[key] = value - except ValueError: - # oops, no '=' on the line, must be a PIC tree that was queried: make a list iso a dict - answer_list.append(line) - if len(answer_list) > 1: # there is always one empty line, ignore that one... - answer_dict["tree"] = answer_list - return answer_dict - # Status Update Command def StatusUpdateCommand(input_dict, db_connection): """ @@ -318,6 +325,83 @@ def KeyUpdateCommand(input_dict, db_connection, always_return_result_dict=False) raise FunctionError(("Not all key were updated:", errors)) return errors +# Task Prepare For Scheduling +def TaskPrepareForScheduling(input_dict, db_connection): + """ + RPC function to close the definition fase and make the task schedulable (by converting the template task to an VIC task) + + Input : OtdbID (integer) - ID of the task to change the status of. + StartTime (string) - Proposes starttime (optional) + StopTime (string) - Proposed endtime (optional) + + Output: OtdbID (integer) - ID of 'schedulable' task + + Exceptions: + AttributeError: There is something wrong with the given input values. + FunctionError: An error occurred during the execution of the function. + The text of the exception explains what is wrong. + """ + # Solve ID(s) that the user may have specified and return the validated values. + (found_task, otdb_id, mom_id) = TaskGetIDs(input_dict, db_connection) # throws on missing input + + # if task i not found it is end of story. + if not found_task: + raise FunctionError("Task with OtdbID/MoMID {}/{} does not exist".format(otdb_id, mom_id)) + + # get the information of the task + try: + (task_id,task_type,task_state) = db_connection.query("select treeid,type,state from getTreeInfo({},False)"\ + .format(otdb_id)).getresult()[0] + except QUERY_EXCEPTIONS, exc_info: + raise FunctionError("TaskPrepareForScheduling: {}".format(exc_info)) + + # Check type + # Get list of defined types + type_names = {} + type_nrs = {} + try: + for (nr, name) in db_connection.query("select id,name from treetype").getresult(): + type_names[name] = nr + type_nrs[nr] = name + except QUERY_EXCEPTIONS, exc_info: + raise FunctionError("Error while getting list of task types for tree {}: {}".format(otdb_id, exc_info)) + + # Tree may not be of the type 'hardware' + if task_type == type_names['hardware']: + raise FunctionError("TaskPrepareForScheduling: Task {} has the wrong type ('hardware')".format(task_id)) + + # If task is of the type VItemplate convert it to a VHtree + if task_type == type_names['VItemplate']: + try: + # create executable task + new_task_id = db_connection.query("select instanciateVHtree(1,{})".format(task_id)).getresult()[0][0] + # get the characteristics + (task_id, task_type, task_state) = db_connection.query("select treeid,type,state from getTreeInfo({},False)"\ + .format(new_task_id)).getresult()[0] + except QUERY_EXCEPTIONS, exc_info: + raise FunctionError("TaskPrepareForScheduling: failed for task {}: {}"\ + .format(otdb_id, exc_info)) + + # Get list of defines tree states + state_names = {} + state_nrs = {} + try: + for (nr, name) in db_connection.query("select id,name from treestate").getresult(): + state_names[name] = nr + state_nrs[nr] = name + except QUERY_EXCEPTIONS, exc_info: + raise FunctionError("Error while getting list of task states for tree {}: {}".format(otdb_id, exc_info)) + + # make sure the tree is in the right state + if task_state != state_names['approved']: + try: + db_connection.query("select setTreeState(1,{},{}::INT2,True)".format(task_id, state_names['approved'])) + except QUERY_EXCEPTIONS, exc_info: + raise FunctionError("Error while setting task {} to 'approved': {}".format(task_id, exc_info)) + # QPID can't return an integer, make a list of it. + return [task_id] + + class PostgressMessageHandler(MessageHandlerInterface): """ @@ -337,10 +421,12 @@ class PostgressMessageHandler(MessageHandlerInterface): self.connected = False self.service2MethodMap = { - "TaskSpecification": self._TaskSpecificationRequest, - "StatusUpdateCmd": self._StatusUpdateCommand, - "KeyUpdateCmd": self._KeyUpdateCommand, - "TaskSetSpecification": self._TaskSetSpecification + "TaskGetSpecification": self._TaskGetSpecification, + "TaskSetSpecification": self._TaskSetSpecification, + "StatusUpdateCmd": self._StatusUpdateCommand, + "KeyUpdateCmd": self._KeyUpdateCommand, + "TaskPrepareForScheduling": self._TaskPrepareForScheduling, + "TaskGetIDs": self._TaskGetIDs } def prepare_receive(self): @@ -357,9 +443,14 @@ class PostgressMessageHandler(MessageHandlerInterface): logger.error("Not connected to database %s, retry in 5 seconds: %s" % (self.dbcreds, e)) time.sleep(5) - def _TaskSpecificationRequest(self, **kwargs): - logger.info("_TaskSpecificationRequest({})".format(kwargs)) - return TaskSpecificationRequest(kwargs, self.connection) + # The following functions are called from the Service code. + def _TaskGetSpecification(self, **kwargs): + logger.info("_TaskGetSpecification({})".format(kwargs)) + return TaskGetSpecification(kwargs, self.connection) + + def _TaskSetSpecification(self, **kwargs): + logger.info("_TaskSetSpecification({})".format(kwargs)) + return TaskSetSpecification(kwargs, self.connection) def _StatusUpdateCommand(self, **kwargs): logger.info("_StatusUpdateCommand({})".format(kwargs)) @@ -369,22 +460,13 @@ class PostgressMessageHandler(MessageHandlerInterface): logger.info("_KeyUpdateCommand({})".format(kwargs)) return KeyUpdateCommand(kwargs, self.connection) - def _TaskSetSpecification(self, **kwargs): - logger.info("_TaskSetSpecification({})".format(kwargs)) - return TaskSetSpecification(kwargs, self.connection) + def _TaskPrepareForScheduling(self, **kwargs): + logger.info("_TaskPrepareForScheduling({})".format(kwargs)) + return TaskPrepareForScheduling(kwargs, self.connection) - - -#class PostgressKeyUpdateCommand(PostgressMessageHandlerInterface): -# """ -# Embedding of the TaskSpecificationRequest function in the postgress service class. -# """ -# def __init__(self, **kwargs): -# super(PostgressKeyUpdateCommand, self).__init__(**kwargs) -# -# def handle_message(self, **msg): -# " Connect to the right function" -# return KeyUpdateCommand(msg, self.connection) + def _TaskGetIDs(self, **kwargs): + logger.info("_TaskGetIDs({})".format(kwargs)) + return TaskGetIDs(kwargs, self.connection, return_tuple=False) if __name__ == "__main__":