#!/usr/bin/env python
#
# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite.  If not, see <http://www.gnu.org/licenses/>.
#

import qpid.messaging
import logging
from datetime import datetime, timedelta
from time import sleep
from threading import Thread

from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as DEFAULT_RADB_BUSNAME
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as DEFAULT_RADB_SERVICENAME

from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC
from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME

from lofar.sas.datamanagement.cleanup.rpc import CleanupRPC
from lofar.sas.datamanagement.cleanup.config import DEFAULT_BUSNAME as DEFAULT_CLEANUP_BUSNAME
from lofar.sas.datamanagement.cleanup.config import DEFAULT_SERVICENAME as DEFAULT_CLEANUP_SERVICENAME

from lofar.sas.resourceassignment.resourceassigner.config import PIPELINE_CHECK_INTERVAL

logger = logging.getLogger(__name__)


def movePipelineAfterItsPredecessors(task, radbrpc, min_start_timestamp=None):
    """Reschedule a CEP4 pipeline task so it starts after its predecessors.

    The task is shifted so that it starts at the latest predecessor endtime
    (or at min_start_timestamp, whichever is later). While two or more other
    scheduled/queued/active/completing CEP4 pipelines overlap the candidate
    time window, the task is pushed to one minute after the latest of those.

    :param task: radb task dict; the keys read here are 'type', 'cluster',
                 'id', 'otdb_id', 'status', 'predecessor_ids', 'starttime'
                 and 'endtime'. Falsy tasks, non-pipelines and tasks on
                 other clusters are ignored.
    :param radbrpc: opened radb rpc object providing getTasks, getTask and
                    updateTaskAndResourceClaims.
    :param min_start_timestamp: optional datetime; if given, the task is not
                                moved to start before this timestamp.
    :return: None. Any exception is logged and swallowed (best effort).
    """
    try:
        # only reschedule pipelines which run on cep4
        if not task or task['type'] != 'pipeline' or task.get('cluster') != 'CEP4':
            return

        logger.info("checking pipeline starttime radb_id=%s otdb_id=%s", task['id'], task['otdb_id'])

        predecessor_tasks = radbrpc.getTasks(task_ids=task['predecessor_ids'])

        predecessor_endtimes = [t['endtime'] for t in predecessor_tasks]
        if min_start_timestamp:
            predecessor_endtimes.append(min_start_timestamp)

        if not predecessor_endtimes:
            # no predecessors and no lower bound: there is nothing to line up
            # with, and max() of an empty list would raise ValueError
            return

        max_pred_endtime = max(predecessor_endtimes)

        needs_move = (task['starttime'] < max_pred_endtime or
                      (min_start_timestamp and task['starttime'] > min_start_timestamp))
        if not needs_move:
            return

        shift = max_pred_endtime - task['starttime']
        new_starttime = task['starttime'] + shift
        new_endtime = task['endtime'] + shift

        # move pipeline even further ahead in case there are more than 2 overlapping scheduled/queued pipelines
        while True:
            overlapping_pipelines = radbrpc.getTasks(lower_bound=new_starttime,
                                                     upper_bound=new_endtime,
                                                     task_type='pipeline',
                                                     task_status=['scheduled', 'queued', 'active', 'completing'],
                                                     cluster='CEP4')
            # exclude self
            overlapping_pipelines = [pl for pl in overlapping_pipelines if pl['id'] != task['id']]

            if len(overlapping_pipelines) < 2:
                break

            max_overlapping_pipeline_endtime = max(pl['endtime'] for pl in overlapping_pipelines)
            shift = max_overlapping_pipeline_endtime + timedelta(minutes=1) - task['starttime']
            new_starttime = task['starttime'] + shift
            new_endtime = task['endtime'] + shift

        if shift == timedelta(seconds=0):
            # already at the right spot, avoid a needless radb update
            return

        logger.info("Moving %s pipeline radb_id=%s otdb_id=%s by %s from '%s' to '%s'",
                    task['status'], task['id'], task['otdb_id'], shift, task['starttime'], new_starttime)
        radbrpc.updateTaskAndResourceClaims(task['id'], starttime=new_starttime, endtime=new_endtime)
        updated_task = radbrpc.getTask(task['id'])

        if updated_task['status'] != task['status']:
            logger.warning("Moving of pipeline radb_id=%s otdb_id=%s caused the status to change from %s to %s",
                           updated_task['id'], updated_task['otdb_id'], task['status'], updated_task['status'])
            # TODO: automatically resolve conflict status by moved pipeline in first free time slot.
    except Exception as e:
        logger.error("Error while checking pipeline starttime: %s", e)


class ScheduleChecker():
    """Background checker which periodically keeps the radb schedule sane.

    Every PIPELINE_CHECK_INTERVAL seconds a daemon thread:
      - extends the endtime of active pipelines which ran past their endtime,
      - moves scheduled/queued CEP4 pipelines after their predecessors,
      - removes not-yet-run tasks from radb which are unknown in mom or went
        back to opened/described/suspended in mom.

    Can be used as a context manager: start() on enter, stop() on exit.
    """

    def __init__(self,
                 radb_busname=DEFAULT_RADB_BUSNAME,
                 radb_servicename=DEFAULT_RADB_SERVICENAME,
                 mom_busname=DEFAULT_MOMQUERY_BUSNAME,
                 mom_servicename=DEFAULT_MOMQUERY_SERVICENAME,
                 cleanup_busname=DEFAULT_CLEANUP_BUSNAME,
                 cleanup_servicename=DEFAULT_CLEANUP_SERVICENAME,
                 broker=None):
        """Create the (not yet opened) rpc's to the radb, mom query and cleanup services.

        :param radb_busname: bus name of the radb service
        :param radb_servicename: service name of the radb service
        :param mom_busname: bus name of the mom query service
        :param mom_servicename: service name of the mom query service
        :param cleanup_busname: bus name of the cleanup service
        :param cleanup_servicename: service name of the cleanup service
        :param broker: broker passed on to each rpc
        """
        self._thread = None
        self._running = False
        self._radbrpc = RARPC(servicename=radb_servicename, busname=radb_busname, broker=broker)
        self._momrpc = MoMQueryRPC(servicename=mom_servicename, busname=mom_busname, broker=broker)
        self._curpc = CleanupRPC(busname=cleanup_busname, servicename=cleanup_servicename, broker=broker)

    def __enter__(self):
        """Internal use only. (handles scope 'with')"""
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Internal use only. (handles scope 'with')"""
        self.stop()

    def start(self):
        """Open the rpc connections to the radb, mom query and cleanup services
        and start the background check thread."""
        self._radbrpc.open()
        self._momrpc.open()
        self._curpc.open()
        self._running = True
        self._thread = Thread(target=self._check_loop)
        self._thread.daemon = True
        self._thread.start()

    def stop(self):
        """Stop the background check thread and close the rpc connections to
        the radb, mom query and cleanup services."""
        # first ask the check loop to stop and wait for it, so the thread is
        # not using the rpc's anymore when we close them
        self._running = False
        if self._thread is not None:
            self._thread.join(60)

        self._radbrpc.close()
        self._momrpc.close()
        self._curpc.close()

    def checkRunningPipelines(self):
        """Extend the endtime of active pipelines which have passed their endtime.

        The new endtime is now + PIPELINE_CHECK_INTERVAL seconds, so a still
        running pipeline gets extended again at the next check.
        Any exception is logged and swallowed.
        """
        try:
            now = datetime.utcnow()

            active_pipelines = self._radbrpc.getTasks(task_status='active', task_type='pipeline')

            if active_pipelines:
                logger.info('checking endtime of running pipelines')

            for task in active_pipelines:
                if task['endtime'] <= now:
                    new_endtime = now + timedelta(seconds=PIPELINE_CHECK_INTERVAL)
                    logger.info("Extending endtime to %s for pipeline radb_id=%s otdb_id=%s",
                                new_endtime, task['id'], task['otdb_id'])
                    self._radbrpc.updateTaskAndResourceClaims(task['id'], endtime=new_endtime)
        except Exception as e:
            logger.error("Error while checking running pipelines: %s", e)

    def checkScheduledAndQueuedPipelines(self):
        """Move each scheduled/queued CEP4 pipeline after its predecessors.

        Pipelines are not allowed to start before now + PIPELINE_CHECK_INTERVAL
        seconds. They are processed in order of descending starttime.
        Any exception is logged and swallowed.
        """
        try:
            now = datetime.utcnow()
            min_start_timestamp = now + timedelta(seconds=PIPELINE_CHECK_INTERVAL)

            pipelines = self._radbrpc.getTasks(task_status=['scheduled', 'queued'],
                                               task_type='pipeline',
                                               cluster='CEP4')

            if pipelines:
                logger.info('checking starttime of %s scheduled/queued cep4 pipelines min_start_timestamp=%s',
                            len(pipelines), min_start_timestamp)
                pipelines.sort(key=lambda pl: pl['starttime'], reverse=True)

                for task in pipelines:
                    # moving pipelines might take a while
                    # so this task might have changed status to active
                    # in that case we don't want to move it
                    uptodate_task = self._radbrpc.getTask(task['id'])
                    # skip a falsy lookup result instead of aborting the
                    # checks for all remaining pipelines
                    if uptodate_task and uptodate_task['status'] in ['scheduled', 'queued']:
                        movePipelineAfterItsPredecessors(uptodate_task, self._radbrpc, min_start_timestamp)
        except Exception as e:
            logger.error("Error while checking scheduled pipelines: %s", e)

    def checkUnRunTasksForMoMOpenedStatus(self):
        """Remove not-yet-run tasks from radb which are out of sync with mom.

        A task with radb status approved/scheduled/prescheduled/queued is
        removed (by deleting its specification) when it is unknown in mom, or
        when its mom status is opened, described or suspended. For tasks which
        went back to opened/described, data on disk from a previous run is
        removed first via the cleanup service.
        Any exception is logged and swallowed.
        """
        try:
            logger.info('checking approved tasks for status in mom')

            unrun_tasks = self._radbrpc.getTasks(task_status=['approved', 'scheduled', 'prescheduled', 'queued'])
            mom_ids = [t['mom_id'] for t in unrun_tasks]
            mom_details = self._momrpc.getProjectDetails(mom_ids)

            for task in unrun_tasks:
                mom_id = int(task['mom_id'])
                known_in_mom = mom_id in mom_details
                # mom_status stays None for tasks unknown in mom; indexing
                # mom_details[mom_id] for those raised a KeyError which
                # aborted the check for all remaining tasks
                # NOTE(review): tasks missing from the mom response are removed
                # from radb - confirm getProjectDetails cannot return a partial
                # result on a transient failure.
                mom_status = mom_details[mom_id]['object_status'] if known_in_mom else None

                if known_in_mom and mom_status not in ['opened', 'described', 'suspended']:
                    continue

                logger.info('task %s mom_id=%s otdb_id=%s has radb_status=%s and mom_status=%s => removing task from radb',
                            task['id'], task['mom_id'], task['otdb_id'], task['status'], mom_status)

                if mom_status in ['opened', 'described']:
                    # auto delete data for tasks which went back to opened in mom (for pipeline restarts for example)
                    # The reason to delete it here is because otherwise the cleanupservice tries to get its info
                    # from an already deleted task in radb/otdb
                    path_result = self._curpc.getPathForOTDBId(task['otdb_id'])
                    if path_result['found']:
                        logger.info("removing data on disk from previous run for otdb_id %s", task['otdb_id'])
                        result = self._curpc.removeTaskData(task['otdb_id'])

                        if not result['deleted']:
                            logger.warning("could not remove all data on disk from previous run for otdb_id %s: %s",
                                           task['otdb_id'], result['message'])

                # delete the spec (and task/claims etc via cascading delete) from radb to get it in sync again with mom
                self._radbrpc.deleteSpecification(task['specification_id'])
        except Exception as e:
            logger.error("Error while checking unrun tasks for status in mom: %s", e)

    def _check_loop(self):
        """Thread body: run all checks, then sleep PIPELINE_CHECK_INTERVAL
        seconds in 1 second steps so stop() is noticed quickly."""
        while self._running:
            self.checkRunningPipelines()
            self.checkScheduledAndQueuedPipelines()
            self.checkUnRunTasksForMoMOpenedStatus()

            for _ in range(PIPELINE_CHECK_INTERVAL):
                sleep(1)
                if not self._running:
                    break