#!/usr/bin/env python
#
# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#

import qpid.messaging
import logging
from datetime import datetime, timedelta
from time import sleep
from threading import Thread

from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as DEFAULT_RADB_BUSNAME
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as DEFAULT_RADB_SERVICENAME
from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC
from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME
from lofar.sas.resourceassignment.resourceassigner.config import PIPELINE_CHECK_INTERVAL

logger = logging.getLogger(__name__)


class ScheduleChecker(object):
    """Periodically adjust pipeline tasks in the radb whose times have passed.

    A daemon thread runs checkRunningPipelines() and
    checkScheduledAndQueuedPipelines() every PIPELINE_CHECK_INTERVAL seconds.
    Usable as a context manager: entering calls start(), leaving calls stop().
    """

    def __init__(self,
                 radb_busname=DEFAULT_RADB_BUSNAME,
                 radb_servicename=DEFAULT_RADB_SERVICENAME,
                 mom_busname=DEFAULT_MOMQUERY_BUSNAME,
                 mom_servicename=DEFAULT_MOMQUERY_SERVICENAME,
                 broker=None):
        """Create the (not yet opened) rpc clients; nothing runs until start().

        :param radb_busname: bus name passed to the RARPC client
        :param radb_servicename: service name passed to the RARPC client
        :param mom_busname: bus name passed to the MoMQueryRPC client
        :param mom_servicename: service name passed to the MoMQueryRPC client
        :param broker: broker passed to both rpc clients
        """
        self._thread = None
        self._running = False
        self._radbrpc = RARPC(servicename=radb_servicename, busname=radb_busname, broker=broker)
        self._momrpc = MoMQueryRPC(servicename=mom_servicename, busname=mom_busname, broker=broker)

    def __enter__(self):
        """Internal use only. (handles scope 'with')"""
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Internal use only. (handles scope 'with')"""
        self.stop()

    def start(self):
        """Open rpc connections to the radb and momquery services and start the check thread"""
        self._radbrpc.open()
        self._momrpc.open()
        self._running = True
        self._thread = Thread(target=self._check_loop)
        self._thread.daemon = True
        self._thread.start()

    def stop(self):
        """Stop the check thread, then close rpc connections to the radb and momquery services"""
        # Signal the loop first and wait for it, so the worker is not left
        # issuing rpc calls on connections that were closed underneath it.
        self._running = False
        thread = self._thread
        if thread is not None:  # stop() without a prior start() must not fail
            thread.join(60)
            self._thread = None

        try:
            self._radbrpc.close()
        finally:
            # always release the second connection, even if the first close raised
            self._momrpc.close()

    def checkRunningPipelines(self):
        """Extend the endtime of every active pipeline whose endtime has passed.

        The new endtime is PIPELINE_CHECK_INTERVAL seconds from now.
        Any exception is logged and swallowed.
        """
        try:
            # NOTE(review): compared against naive utcnow(); presumably the
            # radb returns naive UTC datetimes - confirm.
            now = datetime.utcnow()
            active_pipelines = self._radbrpc.getTasks(task_status='active', task_type='pipeline')

            for task in active_pipelines:
                if task['endtime'] <= now:
                    new_endtime = now + timedelta(seconds=PIPELINE_CHECK_INTERVAL)
                    logger.info("Extending endtime to %s for pipeline radb_id=%s otdb_id=%s",
                                new_endtime, task['id'], task['otdb_id'])
                    self._radbrpc.updateTaskAndResourceClaims(task['id'], endtime=new_endtime)
        except Exception as e:
            logger.error("Error while checking running pipelines: %s", e)

    def checkScheduledAndQueuedPipelines(self):
        """Move scheduled/queued pipelines whose starttime has passed into the future.

        Such a pipeline gets a starttime PIPELINE_CHECK_INTERVAL seconds from
        now and an endtime task['duration'] seconds after that. A warning is
        logged when the move changed the task's status.
        Any exception is logged and swallowed.
        """
        try:
            now = datetime.utcnow()

            scheduled_pipelines = self._radbrpc.getTasks(task_status='scheduled', task_type='pipeline')
            queued_pipelines = self._radbrpc.getTasks(task_status='queued', task_type='pipeline')
            sq_pipelines = scheduled_pipelines + queued_pipelines

            for task in sq_pipelines:
                if task['starttime'] <= now:
                    logger.info("Moving ahead scheduled pipeline radb_id=%s otdb_id=%s to %s seconds from now",
                                task['id'], task['otdb_id'], PIPELINE_CHECK_INTERVAL)

                    new_starttime = now + timedelta(seconds=PIPELINE_CHECK_INTERVAL)
                    new_endtime = now + timedelta(seconds=PIPELINE_CHECK_INTERVAL + task['duration'])
                    self._radbrpc.updateTaskAndResourceClaims(task['id'],
                                                              starttime=new_starttime,
                                                              endtime=new_endtime)

                    # Compare against the status the task had before the move:
                    # a queued pipeline that is still queued has not changed.
                    # Fall back to u'scheduled' if the task carries no status.
                    previous_status = task.get('status', u'scheduled')
                    updated_task = self._radbrpc.getTask(task['id'])
                    if updated_task['status'] != previous_status:
                        logger.warning("Moving of pipeline radb_id=%s otdb_id=%s caused the status to change to %s",
                                       updated_task['id'], updated_task['otdb_id'], updated_task['status'])
                        #TODO: automatically resolve conflict status by moved pipeline in first free time slot.
        except Exception as e:
            logger.error("Error while checking scheduled pipelines: %s", e)

    def checkApprovedTasks(self):
        """Delete the specification of approved tasks that are gone or 'opened' in MoM.

        For each approved task the MoM project details are fetched; the task's
        specification is deleted from the radb when no details are returned,
        when the task's mom_id is not among them, or when its object_status
        is 'opened'. Any exception is logged and swallowed.
        """
        try:
            approved_tasks = self._radbrpc.getTasks(task_status='approved')

            for task in approved_tasks:
                mom_id = task['mom_id']
                mom_key = str(mom_id)  # the details dict is keyed by the mom_id as string
                mom_task_details = self._momrpc.getProjectDetails(mom_id)

                if (not mom_task_details or
                        mom_key not in mom_task_details or
                        mom_task_details[mom_key]['object_status'] == 'opened'):
                    logger.info('task %s mom_id=%s otdb_id=%s was removed or set to status opened. removing task from rabd',
                                task['id'], mom_id, task['otdb_id'])
                    self._radbrpc.deleteSpecification(task['specification_id'])
        except Exception as e:
            logger.error("Error while checking approved tasks: %s", e)

    def _check_loop(self):
        """Thread body: run the pipeline checks every PIPELINE_CHECK_INTERVAL seconds until stopped."""
        # NOTE(review): checkApprovedTasks() is not part of this loop. It deletes
        # specifications, so it is deliberately not wired in here - confirm
        # whether that omission is intended.
        while self._running:
            self.checkRunningPipelines()
            self.checkScheduledAndQueuedPipelines()

            # sleep in 1 second steps so that stop() is noticed promptly
            for _ in range(PIPELINE_CHECK_INTERVAL):
                sleep(1)

                if not self._running:
                    break