diff --git a/SAS/MoM/MoMQueryService/momqueryservice.py b/SAS/MoM/MoMQueryService/momqueryservice.py index 179d568706e0388e00f88370defd3f3abd4e1e7f..af8fdb6b26e335de389b6191cd8a534e398849cf 100755 --- a/SAS/MoM/MoMQueryService/momqueryservice.py +++ b/SAS/MoM/MoMQueryService/momqueryservice.py @@ -92,7 +92,7 @@ class MoMDatabaseWrapper: def getProjectDetails(self, mom_ids): ''' get the project details (project_mom2id, project_name, project_description, object_mom2id, object_name, object_description, - object_type, object_group_id, object_group_name) for given mom object mom_ids + object_type, object_group_id, object_group_name, object_status) for given mom object mom_ids :param mixed mom_ids comma seperated string of mom2object id's, or list of ints :rtype list of dict's key value pairs with the project details ''' @@ -114,10 +114,13 @@ class MoMDatabaseWrapper: # TODO: make a view for this query in momdb! query = '''SELECT project.mom2id as project_mom2id, project.id as project_mom2objectid, project.name as project_name, project.description as project_description, - object.mom2id as object_mom2id, object.id as object_mom2objectid, object.name as object_name, object.description as object_description, object.mom2objecttype as object_type, object.group_id as object_group_id, grp.id as object_group_mom2objectid, grp.name as object_group_name + object.mom2id as object_mom2id, object.id as object_mom2objectid, object.name as object_name, object.description as object_description, object.mom2objecttype as object_type, object.group_id as object_group_id, grp.id as object_group_mom2objectid, grp.name as object_group_name, + status.code as object_status FROM mom2object as object left join mom2object as project on project.id = object.ownerprojectid left join mom2object as grp on grp.mom2id = object.group_id + left join mom2objectstatus as mostatus on object.currentstatusid = mostatus.id + inner join status on mostatus.statusid = status.id where object.mom2id in (%s) order by project_mom2id ''' % (ids_str,) diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/raservice.py b/SAS/ResourceAssignment/ResourceAssigner/lib/raservice.py index 18aa2f2733628416e3d026e2ea51147567c85ec8..775d484b03e631d9eaab18b680353d8f88056aa5 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/raservice.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/raservice.py @@ -85,6 +85,7 @@ def main(): from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SERVICENAME from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTIFICATION_BUSNAME from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTIFICATION_PREFIX + from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME # Check the invocation arguments parser = OptionParser("%prog [options]", @@ -118,6 +119,12 @@ def main(): parser.add_option("--ra_notification_prefix", dest="ra_notification_prefix", type="string", default=DEFAULT_RA_NOTIFICATION_PREFIX, help="Prefix for the subject of the by the resourceassigner published notification messages. [default: %default]") + parser.add_option('--mom_query_busname', dest='mom_query_busname', type='string', + default=DEFAULT_MOMQUERY_BUSNAME, + help='Name of the bus exchange on the qpid broker on which the momqueryservice listens, default: %default') + parser.add_option('--mom_query_servicename', dest='mom_query_servicename', type='string', + default=DEFAULT_MOMQUERY_SERVICENAME, + help='Name of the momqueryservice, default: %default') parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging') (options, args) = parser.parse_args() @@ -133,6 +140,8 @@ def main(): otdb_servicename=options.otdb_servicename, ra_notification_busname=options.ra_notification_busname, ra_notification_prefix=options.ra_notification_prefix, + mom_busname=options.mom_query_busname, + mom_servicename=options.mom_query_servicename, broker=options.broker) as assigner: with SpecifiedTaskListener(busname=options.notification_busname, subject=options.notification_subject, @@ -140,6 +149,8 @@ def main(): assigner=assigner) as listener: with ScheduleChecker(radb_busname=options.radb_busname, radb_servicename=options.radb_servicename, + mom_busname=options.mom_query_busname, + mom_servicename=options.mom_query_servicename, broker=options.broker) as schedulechecker: waitForInterrupt() diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/schedulechecker.py b/SAS/ResourceAssignment/ResourceAssigner/lib/schedulechecker.py index c56572dfb8cc4df464b22a0f1d96db9b2254158f..5a52e3e14a4f9712f642cc4b1324d10faaa75a3d 100644 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/schedulechecker.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/schedulechecker.py @@ -30,6 +30,9 @@ from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as DEFAULT_RADB_BUSNAME from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as DEFAULT_RADB_SERVICENAME +from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC +from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME + from lofar.sas.resourceassignment.resourceassigner.config import PIPELINE_CHECK_INTERVAL logger = logging.getLogger(__name__) @@ -38,12 +41,15 @@ class ScheduleChecker(): def __init__(self, radb_busname=DEFAULT_RADB_BUSNAME, radb_servicename=DEFAULT_RADB_SERVICENAME, + mom_busname=DEFAULT_MOMQUERY_BUSNAME, + mom_servicename=DEFAULT_MOMQUERY_SERVICENAME, broker=None): """ """ self._thread = None self._running = False self._radbrpc = RARPC(servicename=radb_servicename, busname=radb_busname, broker=broker) + self._momrpc = MoMQueryRPC(servicename=mom_servicename, busname=mom_busname, broker=broker) def __enter__(self): """Internal use only. (handles scope 'with')""" @@ -57,6 +63,7 @@ class ScheduleChecker(): def start(self): """Open rpc connections to radb service and resource estimator service""" self._radbrpc.open() + self._momrpc.open() self._running = True self._thread = Thread(target=self._check_loop) self._thread.daemon = True @@ -65,39 +72,66 @@ class ScheduleChecker(): def stop(self): """Close rpc connections to radb service and resource estimator service""" self._radbrpc.close() + self._momrpc.close() self._running = False self._thread.join(60) + + def checkRunningPipelines(self): + try: + now = datetime.utcnow() + + active_pipelines = self._radbrpc.getTasks(task_status='active', task_type='pipeline') + + for task in active_pipelines: + if task['endtime'] <= now: + new_endtime=now+timedelta(seconds=PIPELINE_CHECK_INTERVAL) + logger.info("Extending endtime to %s for pipeline radb_id=%s otdb_id=%s", new_endtime, task['id'], task['otdb_id']) + self._radbrpc.updateTaskAndResourceClaims(task['id'], endtime=new_endtime) + except Exception as e: + logger.error("Error while checking running pipelines: %s", e) + + def checkScheduledAndQueuedPipelines(self): + try: + now = datetime.utcnow() + + scheduled_pipelines = self._radbrpc.getTasks(task_status='scheduled', task_type='pipeline') + queued_pipelines = self._radbrpc.getTasks(task_status='queued', task_type='pipeline') + sq_pipelines = scheduled_pipelines + queued_pipelines + + for task in sq_pipelines: + if task['starttime'] <= now: + logger.info("Moving ahead scheduled pipeline radb_id=%s otdb_id=%s to %s seconds from now", task['id'], task['otdb_id'], PIPELINE_CHECK_INTERVAL) + self._radbrpc.updateTaskAndResourceClaims(task['id'], starttime=now+timedelta(seconds=PIPELINE_CHECK_INTERVAL), endtime=now+timedelta(seconds=PIPELINE_CHECK_INTERVAL+task['duration'])) + updated_task = self._radbrpc.getTask(task['id']) + if updated_task['status'] != u'scheduled': + logger.warn("Moving of pipeline radb_id=%s otdb_id=%s caused the status to change to %s", updated_task['id'], updated_task['otdb_id'], updated_task['status']) + #TODO: automatically resolve conflict status by moved pipeline in first free time slot. + except Exception as e: + logger.error("Error while checking scheduled pipelines: %s", e) + + def checkApprovedTasks(self): + try: + now = datetime.utcnow() + + approved_tasks = self._radbrpc.getTasks(task_status='approved') + + for task in approved_tasks: + mom_task_details = self._momrpc.getProjectDetails(task['mom_id']) + + if (not mom_task_details or + str(task['mom_id']) not in mom_task_details or + mom_task_details[str(task['mom_id'])]['object_status'] == 'opened'): + logger.info('task %s mom_id=%s otdb_id=%s was removed or set to status opened. removing task from rabd', task['id'], task['mom_id'], task['otdb_id']) + self._radbrpc.deleteSpecification(task['specification_id']) + + except Exception as e: + logger.error("Error while checking scheduled pipelines: %s", e) + def _check_loop(self): while self._running: - try: - now = datetime.utcnow() - - active_pipelines = self._radbrpc.getTasks(task_status='active', task_type='pipeline') - - for task in active_pipelines: - if task['endtime'] <= now: - new_endtime=now+timedelta(seconds=PIPELINE_CHECK_INTERVAL) - logger.info("Extending endtime to %s for pipeline radb_id=%s otdb_id=%s", new_endtime, task['id'], task['otdb_id']) - self._radbrpc.updateTaskAndResourceClaims(task['id'], endtime=new_endtime) - except Exception as e: - logger.error("Error while checking running pipelines: %s", e) - - try: - scheduled_pipelines = self._radbrpc.getTasks(task_status='scheduled', task_type='pipeline') - queued_pipelines = self._radbrpc.getTasks(task_status='queued', task_type='pipeline') - sq_pipelines = scheduled_pipelines + queued_pipelines - - for task in sq_pipelines: - if task['starttime'] <= now: - logger.info("Moving ahead scheduled pipeline radb_id=%s otdb_id=%s to %s seconds from now", task['id'], task['otdb_id'], PIPELINE_CHECK_INTERVAL) - self._radbrpc.updateTaskAndResourceClaims(task['id'], starttime=now+timedelta(seconds=PIPELINE_CHECK_INTERVAL), endtime=now+timedelta(seconds=PIPELINE_CHECK_INTERVAL+task['duration'])) - updated_task = self._radbrpc.getTask(task['id']) - if updated_task['status'] != u'scheduled': - logger.warn("Moving of pipeline radb_id=%s otdb_id=%s caused the status to change to %s", updated_task['id'], updated_task['otdb_id'], updated_task['status']) - #TODO: automatically resolve conflict status by moved pipeline in first free time slot. - except Exception as e: - logger.error("Error while checking scheduled pipelines: %s", e) + self.checkRunningPipelines() + self.checkScheduledAndQueuedPipelines() for i in range(PIPELINE_CHECK_INTERVAL): sleep(1)