Skip to content
Snippets Groups Projects
Commit 62ba0213 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

Task #9607: check for obsolete radb tasks and delete them

parent 1a5dcb61
No related branches found
No related tags found
No related merge requests found
......@@ -92,7 +92,7 @@ class MoMDatabaseWrapper:
def getProjectDetails(self, mom_ids):
''' get the project details (project_mom2id, project_name,
project_description, object_mom2id, object_name, object_description,
object_type, object_group_id, object_group_name) for given mom object mom_ids
object_type, object_group_id, object_group_name, object_status) for given mom object mom_ids
:param mixed mom_ids comma seperated string of mom2object id's, or list of ints
:rtype list of dict's key value pairs with the project details
'''
......@@ -114,10 +114,13 @@ class MoMDatabaseWrapper:
# TODO: make a view for this query in momdb!
query = '''SELECT project.mom2id as project_mom2id, project.id as project_mom2objectid, project.name as project_name, project.description as project_description,
object.mom2id as object_mom2id, object.id as object_mom2objectid, object.name as object_name, object.description as object_description, object.mom2objecttype as object_type, object.group_id as object_group_id, grp.id as object_group_mom2objectid, grp.name as object_group_name
object.mom2id as object_mom2id, object.id as object_mom2objectid, object.name as object_name, object.description as object_description, object.mom2objecttype as object_type, object.group_id as object_group_id, grp.id as object_group_mom2objectid, grp.name as object_group_name,
status.code as object_status
FROM mom2object as object
left join mom2object as project on project.id = object.ownerprojectid
left join mom2object as grp on grp.mom2id = object.group_id
left join mom2objectstatus as mostatus on object.currentstatusid = mostatus.id
inner join status on mostatus.statusid = status.id
where object.mom2id in (%s)
order by project_mom2id
''' % (ids_str,)
......
......@@ -85,6 +85,7 @@ def main():
from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SERVICENAME
from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTIFICATION_BUSNAME
from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTIFICATION_PREFIX
from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME
# Check the invocation arguments
parser = OptionParser("%prog [options]",
......@@ -118,6 +119,12 @@ def main():
parser.add_option("--ra_notification_prefix", dest="ra_notification_prefix", type="string",
default=DEFAULT_RA_NOTIFICATION_PREFIX,
help="Prefix for the subject of the by the resourceassigner published notification messages. [default: %default]")
parser.add_option('--mom_query_busname', dest='mom_query_busname', type='string',
default=DEFAULT_MOMQUERY_BUSNAME,
help='Name of the bus exchange on the qpid broker on which the momqueryservice listens, default: %default')
parser.add_option('--mom_query_servicename', dest='mom_query_servicename', type='string',
default=DEFAULT_MOMQUERY_SERVICENAME,
help='Name of the momqueryservice, default: %default')
parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging')
(options, args) = parser.parse_args()
......@@ -133,6 +140,8 @@ def main():
otdb_servicename=options.otdb_servicename,
ra_notification_busname=options.ra_notification_busname,
ra_notification_prefix=options.ra_notification_prefix,
mom_busname=options.mom_query_busname,
mom_servicename=options.mom_query_servicename,
broker=options.broker) as assigner:
with SpecifiedTaskListener(busname=options.notification_busname,
subject=options.notification_subject,
......@@ -140,6 +149,8 @@ def main():
assigner=assigner) as listener:
with ScheduleChecker(radb_busname=options.radb_busname,
radb_servicename=options.radb_servicename,
mom_busname=options.mom_query_busname,
mom_servicename=options.mom_query_servicename,
broker=options.broker) as schedulechecker:
waitForInterrupt()
......
......@@ -30,6 +30,9 @@ from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as DEFAULT_RADB_BUSNAME
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as DEFAULT_RADB_SERVICENAME
from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC
from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME
from lofar.sas.resourceassignment.resourceassigner.config import PIPELINE_CHECK_INTERVAL
logger = logging.getLogger(__name__)
......@@ -38,12 +41,15 @@ class ScheduleChecker():
def __init__(self,
radb_busname=DEFAULT_RADB_BUSNAME,
radb_servicename=DEFAULT_RADB_SERVICENAME,
mom_busname=DEFAULT_MOMQUERY_BUSNAME,
mom_servicename=DEFAULT_MOMQUERY_SERVICENAME,
broker=None):
"""
"""
self._thread = None
self._running = False
self._radbrpc = RARPC(servicename=radb_servicename, busname=radb_busname, broker=broker)
self._momrpc = MoMQueryRPC(servicename=mom_servicename, busname=mom_busname, broker=broker)
def __enter__(self):
"""Internal use only. (handles scope 'with')"""
......@@ -57,6 +63,7 @@ class ScheduleChecker():
def start(self):
"""Open rpc connections to radb service and resource estimator service"""
self._radbrpc.open()
self._momrpc.open()
self._running = True
self._thread = Thread(target=self._check_loop)
self._thread.daemon = True
......@@ -65,39 +72,66 @@ class ScheduleChecker():
def stop(self):
"""Close rpc connections to radb service and resource estimator service"""
self._radbrpc.close()
self._momrpc.close()
self._running = False
self._thread.join(60)
def checkRunningPipelines(self):
try:
now = datetime.utcnow()
active_pipelines = self._radbrpc.getTasks(task_status='active', task_type='pipeline')
for task in active_pipelines:
if task['endtime'] <= now:
new_endtime=now+timedelta(seconds=PIPELINE_CHECK_INTERVAL)
logger.info("Extending endtime to %s for pipeline radb_id=%s otdb_id=%s", new_endtime, task['id'], task['otdb_id'])
self._radbrpc.updateTaskAndResourceClaims(task['id'], endtime=new_endtime)
except Exception as e:
logger.error("Error while checking running pipelines: %s", e)
def checkScheduledAndQueuedPipelines(self):
try:
now = datetime.utcnow()
scheduled_pipelines = self._radbrpc.getTasks(task_status='scheduled', task_type='pipeline')
queued_pipelines = self._radbrpc.getTasks(task_status='queued', task_type='pipeline')
sq_pipelines = scheduled_pipelines + queued_pipelines
for task in sq_pipelines:
if task['starttime'] <= now:
logger.info("Moving ahead scheduled pipeline radb_id=%s otdb_id=%s to %s seconds from now", task['id'], task['otdb_id'], PIPELINE_CHECK_INTERVAL)
self._radbrpc.updateTaskAndResourceClaims(task['id'], starttime=now+timedelta(seconds=PIPELINE_CHECK_INTERVAL), endtime=now+timedelta(seconds=PIPELINE_CHECK_INTERVAL+task['duration']))
updated_task = self._radbrpc.getTask(task['id'])
if updated_task['status'] != u'scheduled':
logger.warn("Moving of pipeline radb_id=%s otdb_id=%s caused the status to change to %s", updated_task['id'], updated_task['otdb_id'], updated_task['status'])
#TODO: automatically resolve conflict status by moved pipeline in first free time slot.
except Exception as e:
logger.error("Error while checking scheduled pipelines: %s", e)
def checkApprovedTasks(self):
try:
now = datetime.utcnow()
approved_tasks = self._radbrpc.getTasks(task_status='approved')
for task in approved_tasks:
mom_task_details = self._momrpc.getProjectDetails(task['mom_id'])
if (not mom_task_details or
str(task['mom_id']) not in mom_task_details or
mom_task_details[str(task['mom_id'])]['object_status'] == 'opened'):
logger.info('task %s mom_id=%s otdb_id=%s was removed or set to status opened. removing task from rabd', task['id'], task['mom_id'], task['otdb_id'])
self._radbrpc.deleteSpecification(task['specification_id'])
except Exception as e:
logger.error("Error while checking scheduled pipelines: %s", e)
def _check_loop(self):
while self._running:
try:
now = datetime.utcnow()
active_pipelines = self._radbrpc.getTasks(task_status='active', task_type='pipeline')
for task in active_pipelines:
if task['endtime'] <= now:
new_endtime=now+timedelta(seconds=PIPELINE_CHECK_INTERVAL)
logger.info("Extending endtime to %s for pipeline radb_id=%s otdb_id=%s", new_endtime, task['id'], task['otdb_id'])
self._radbrpc.updateTaskAndResourceClaims(task['id'], endtime=new_endtime)
except Exception as e:
logger.error("Error while checking running pipelines: %s", e)
try:
scheduled_pipelines = self._radbrpc.getTasks(task_status='scheduled', task_type='pipeline')
queued_pipelines = self._radbrpc.getTasks(task_status='queued', task_type='pipeline')
sq_pipelines = scheduled_pipelines + queued_pipelines
for task in sq_pipelines:
if task['starttime'] <= now:
logger.info("Moving ahead scheduled pipeline radb_id=%s otdb_id=%s to %s seconds from now", task['id'], task['otdb_id'], PIPELINE_CHECK_INTERVAL)
self._radbrpc.updateTaskAndResourceClaims(task['id'], starttime=now+timedelta(seconds=PIPELINE_CHECK_INTERVAL), endtime=now+timedelta(seconds=PIPELINE_CHECK_INTERVAL+task['duration']))
updated_task = self._radbrpc.getTask(task['id'])
if updated_task['status'] != u'scheduled':
logger.warn("Moving of pipeline radb_id=%s otdb_id=%s caused the status to change to %s", updated_task['id'], updated_task['otdb_id'], updated_task['status'])
#TODO: automatically resolve conflict status by moved pipeline in first free time slot.
except Exception as e:
logger.error("Error while checking scheduled pipelines: %s", e)
self.checkRunningPipelines()
self.checkScheduledAndQueuedPipelines()
for i in range(PIPELINE_CHECK_INTERVAL):
sleep(1)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment