Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
schedulechecker.py 6.02 KiB
#!/usr/bin/env python

#
# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.    See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#

import qpid.messaging
import logging
from datetime import datetime, timedelta
from time import sleep
from threading import Thread

from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as DEFAULT_RADB_BUSNAME
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as DEFAULT_RADB_SERVICENAME

from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC
from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME

from lofar.sas.resourceassignment.resourceassigner.config import PIPELINE_CHECK_INTERVAL

logger = logging.getLogger(__name__)

class ScheduleChecker(object):
    """Background checker that keeps pipeline timing in the RADB up to date.

    Once started, a daemon thread repeatedly
      - extends the endtime of active pipelines that run past their planned
        end (checkRunningPipelines), and
      - moves scheduled/queued pipelines whose starttime has already passed to
        PIPELINE_CHECK_INTERVAL seconds from now
        (checkScheduledAndQueuedPipelines),
    then waits PIPELINE_CHECK_INTERVAL seconds before the next round.

    Usable as a context manager: entering calls start(), exiting calls stop().
    """

    def __init__(self,
                 radb_busname=DEFAULT_RADB_BUSNAME,
                 radb_servicename=DEFAULT_RADB_SERVICENAME,
                 mom_busname=DEFAULT_MOMQUERY_BUSNAME,
                 mom_servicename=DEFAULT_MOMQUERY_SERVICENAME,
                 broker=None):
        """Create the RADB and MoM query RPC clients. Nothing is opened or started yet.

        :param radb_busname: bus name of the RADB (resource assignment) service
        :param radb_servicename: service name of the RADB service
        :param mom_busname: bus name of the MoM query service
        :param mom_servicename: service name of the MoM query service
        :param broker: broker passed unchanged to both RPC clients
        """
        self._thread = None
        self._running = False
        self._radbrpc = RARPC(servicename=radb_servicename, busname=radb_busname, broker=broker)
        self._momrpc = MoMQueryRPC(servicename=mom_servicename, busname=mom_busname, broker=broker)

    def __enter__(self):
        """Internal use only. (handles scope 'with')"""
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Internal use only. (handles scope 'with')"""
        self.stop()

    def start(self):
        """Open the RADB and MoM query RPC connections and start the daemon check thread."""
        self._radbrpc.open()
        self._momrpc.open()
        self._running = True
        self._thread = Thread(target=self._check_loop)
        self._thread.daemon = True
        self._thread.start()

    def stop(self):
        """Stop the check thread, then close the RADB and MoM query RPC connections.

        The run flag is cleared and the thread is joined (for at most 60s)
        *before* the connections are closed. This lets a check that is in
        progress finish, instead of failing against closed RPC clients.
        Calling stop() without a preceding start() no longer fails on the
        missing thread.
        """
        self._running = False
        if self._thread is not None:
            self._thread.join(60)
            self._thread = None
        self._radbrpc.close()
        self._momrpc.close()

    def checkRunningPipelines(self):
        """Extend the endtime of active pipelines that have reached their planned end.

        Every active pipeline whose endtime is at or before now gets the new
        endtime now + PIPELINE_CHECK_INTERVAL seconds, so that its claims last
        at least until the next check round. Errors are logged, never raised.
        A failure for one pipeline does not stop the others from being extended.
        """
        try:
            now = datetime.utcnow()
            active_pipelines = self._radbrpc.getTasks(task_status='active', task_type='pipeline')
        except Exception as e:
            logger.exception("Error while checking running pipelines: %s", e)
            return

        new_endtime = now + timedelta(seconds=PIPELINE_CHECK_INTERVAL)

        for task in active_pipelines:
            try:
                if task['endtime'] <= now:
                    logger.info("Extending endtime to %s for pipeline radb_id=%s otdb_id=%s", new_endtime, task['id'], task['otdb_id'])
                    self._radbrpc.updateTaskAndResourceClaims(task['id'], endtime=new_endtime)
            except Exception as e:
                logger.exception("Error while checking running pipeline radb_id=%s: %s", task.get('id'), e)

    def checkScheduledAndQueuedPipelines(self):
        """Move scheduled/queued pipelines whose starttime has passed into the near future.

        Each such pipeline is moved to start PIPELINE_CHECK_INTERVAL seconds
        from now, keeping its duration. After the move the task is re-read.
        A warning is logged if its status ended up as something other than
        'scheduled' or its previous status (e.g. a conflict).

        Errors are logged, never raised. A failure for one pipeline does not
        stop the others from being processed.
        """
        try:
            now = datetime.utcnow()
            scheduled_pipelines = self._radbrpc.getTasks(task_status='scheduled', task_type='pipeline')
            queued_pipelines = self._radbrpc.getTasks(task_status='queued', task_type='pipeline')
        except Exception as e:
            logger.exception("Error while checking scheduled pipelines: %s", e)
            return

        new_starttime = now + timedelta(seconds=PIPELINE_CHECK_INTERVAL)

        for task in scheduled_pipelines + queued_pipelines:
            try:
                if task['starttime'] <= now:
                    logger.info("Moving ahead scheduled pipeline radb_id=%s otdb_id=%s to %s seconds from now", task['id'], task['otdb_id'], PIPELINE_CHECK_INTERVAL)
                    self._radbrpc.updateTaskAndResourceClaims(task['id'],
                                                              starttime=new_starttime,
                                                              endtime=now+timedelta(seconds=PIPELINE_CHECK_INTERVAL+task['duration']))
                    updated_task = self._radbrpc.getTask(task['id'])
                    if not updated_task:
                        logger.warning("Could not find pipeline radb_id=%s otdb_id=%s after moving it", task['id'], task['otdb_id'])
                    elif updated_task['status'] not in (u'scheduled', task['status']):
                        # A queued pipeline that simply stays queued is not a status change.
                        # Only other outcomes (e.g. conflict) are reported.
                        logger.warning("Moving of pipeline radb_id=%s otdb_id=%s caused the status to change to %s", updated_task['id'], updated_task['otdb_id'], updated_task['status'])
                        #TODO: automatically resolve conflict status by moved pipeline in first free time slot.
            except Exception as e:
                logger.exception("Error while checking scheduled pipeline radb_id=%s: %s", task.get('id'), e)

    def checkApprovedTasks(self):
        """Delete the RADB specification of approved tasks that MoM reports as removed or opened.

        For every task with status 'approved' in the RADB, MoM is queried by
        the task's mom_id. The task's specification is deleted from the RADB
        in three cases: MoM returns nothing, the result lacks that mom_id, or
        the result has object_status 'opened'.

        Errors are logged, never raised. A failure for one task does not stop
        the others from being checked.
        """
        try:
            approved_tasks = self._radbrpc.getTasks(task_status='approved')
        except Exception as e:
            logger.exception("Error while checking approved tasks: %s", e)
            return

        for task in approved_tasks:
            try:
                mom_id = str(task['mom_id'])
                # NOTE(review): the project-details query is indexed for 'object_status' below.
                # Confirm that getProjectDetails returns that key (an object-details query may be intended).
                mom_task_details = self._momrpc.getProjectDetails(task['mom_id'])

                if (not mom_task_details or
                    mom_id not in mom_task_details or
                    mom_task_details[mom_id]['object_status'] == 'opened'):
                    logger.info('task %s mom_id=%s otdb_id=%s was removed or set to status opened. removing task from radb', task['id'], task['mom_id'], task['otdb_id'])
                    self._radbrpc.deleteSpecification(task['specification_id'])
            except Exception as e:
                logger.exception("Error while checking approved task radb_id=%s: %s", task.get('id'), e)

    def _check_loop(self):
        """Thread body: run the pipeline checks every PIPELINE_CHECK_INTERVAL seconds until stopped."""
        while self._running:
            self.checkRunningPipelines()
            self.checkScheduledAndQueuedPipelines()
            # NOTE(review): checkApprovedTasks() is not invoked from this loop.
            # Confirm whether that is intentional (it deletes specifications from the RADB).

            # Sleep in 1-second steps so that stop() is honoured within about a second.
            for _ in range(PIPELINE_CHECK_INTERVAL):
                if not self._running:
                    break
                sleep(1)