-
Jorrit Schaap authoredJorrit Schaap authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
schedulechecker.py 6.02 KiB
#!/usr/bin/env python
#
# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
import qpid.messaging
import logging
from datetime import datetime, timedelta
from time import sleep
from threading import Thread
from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as DEFAULT_RADB_BUSNAME
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as DEFAULT_RADB_SERVICENAME
from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC
from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME
from lofar.sas.resourceassignment.resourceassigner.config import PIPELINE_CHECK_INTERVAL
logger = logging.getLogger(__name__)
class ScheduleChecker():
def __init__(self,
radb_busname=DEFAULT_RADB_BUSNAME,
radb_servicename=DEFAULT_RADB_SERVICENAME,
mom_busname=DEFAULT_MOMQUERY_BUSNAME,
mom_servicename=DEFAULT_MOMQUERY_SERVICENAME,
broker=None):
"""
"""
self._thread = None
self._running = False
self._radbrpc = RARPC(servicename=radb_servicename, busname=radb_busname, broker=broker)
self._momrpc = MoMQueryRPC(servicename=mom_servicename, busname=mom_busname, broker=broker)
def __enter__(self):
"""Internal use only. (handles scope 'with')"""
self.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Internal use only. (handles scope 'with')"""
self.stop()
def start(self):
"""Open rpc connections to radb service and resource estimator service"""
self._radbrpc.open()
self._momrpc.open()
self._running = True
self._thread = Thread(target=self._check_loop)
self._thread.daemon = True
self._thread.start()
def stop(self):
"""Close rpc connections to radb service and resource estimator service"""
self._radbrpc.close()
self._momrpc.close()
self._running = False
self._thread.join(60)
def checkRunningPipelines(self):
try:
now = datetime.utcnow()
active_pipelines = self._radbrpc.getTasks(task_status='active', task_type='pipeline')
for task in active_pipelines:
if task['endtime'] <= now:
new_endtime=now+timedelta(seconds=PIPELINE_CHECK_INTERVAL)
logger.info("Extending endtime to %s for pipeline radb_id=%s otdb_id=%s", new_endtime, task['id'], task['otdb_id'])
self._radbrpc.updateTaskAndResourceClaims(task['id'], endtime=new_endtime)
except Exception as e:
logger.error("Error while checking running pipelines: %s", e)
def checkScheduledAndQueuedPipelines(self):
try:
now = datetime.utcnow()
scheduled_pipelines = self._radbrpc.getTasks(task_status='scheduled', task_type='pipeline')
queued_pipelines = self._radbrpc.getTasks(task_status='queued', task_type='pipeline')
sq_pipelines = scheduled_pipelines + queued_pipelines
for task in sq_pipelines:
if task['starttime'] <= now:
logger.info("Moving ahead scheduled pipeline radb_id=%s otdb_id=%s to %s seconds from now", task['id'], task['otdb_id'], PIPELINE_CHECK_INTERVAL)
self._radbrpc.updateTaskAndResourceClaims(task['id'], starttime=now+timedelta(seconds=PIPELINE_CHECK_INTERVAL), endtime=now+timedelta(seconds=PIPELINE_CHECK_INTERVAL+task['duration']))
updated_task = self._radbrpc.getTask(task['id'])
if updated_task['status'] != u'scheduled':
logger.warn("Moving of pipeline radb_id=%s otdb_id=%s caused the status to change to %s", updated_task['id'], updated_task['otdb_id'], updated_task['status'])
#TODO: automatically resolve conflict status by moved pipeline in first free time slot.
except Exception as e:
logger.error("Error while checking scheduled pipelines: %s", e)
def checkApprovedTasks(self):
try:
now = datetime.utcnow()
approved_tasks = self._radbrpc.getTasks(task_status='approved')
for task in approved_tasks:
mom_task_details = self._momrpc.getProjectDetails(task['mom_id'])
if (not mom_task_details or
str(task['mom_id']) not in mom_task_details or
mom_task_details[str(task['mom_id'])]['object_status'] == 'opened'):
logger.info('task %s mom_id=%s otdb_id=%s was removed or set to status opened. removing task from rabd', task['id'], task['mom_id'], task['otdb_id'])
self._radbrpc.deleteSpecification(task['specification_id'])
except Exception as e:
logger.error("Error while checking scheduled pipelines: %s", e)
def _check_loop(self):
while self._running:
self.checkRunningPipelines()
self.checkScheduledAndQueuedPipelines()
for i in range(PIPELINE_CHECK_INTERVAL):
sleep(1)
if not self._running:
break