#!/usr/bin/python

# Copyright (C) 2012-2015  ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.

# $Id$

'''ResourceAssignmentEditor webservice serves an interactive html5 website for
viewing and editing lofar resources.'''

import sys
import os
import json
import time
from optparse import OptionParser
from threading import Condition
from datetime import datetime
import logging
import subprocess
from dateutil import parser, tz
from flask import Flask
from flask import render_template
from flask import request
from flask import abort
from flask import url_for
from flask.json import jsonify
from flask.json import JSONEncoder

from lofar.sas.resourceassignment.resourceassignmenteditor.utils import gzipped
from lofar.sas.resourceassignment.resourceassignmenteditor.fakedata import *
from lofar.sas.resourceassignment.resourceassignmenteditor.changeshandler import ChangesHandler, CHANGE_DELETE_TYPE
from lofar.sas.datamanagement.common.config import DEFAULT_DM_NOTIFICATION_BUSNAME, DEFAULT_DM_NOTIFICATION_SUBJECTS
from lofar.sas.resourceassignment.database.config import DEFAULT_NOTIFICATION_BUSNAME as DEFAULT_RADB_CHANGES_BUSNAME
from lofar.sas.resourceassignment.database.config import DEFAULT_NOTIFICATION_SUBJECTS as DEFAULT_RADB_CHANGES_SUBJECTS
from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as DEFAULT_RADB_BUSNAME
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as DEFAULT_RADB_SERVICENAME
from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC
from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME
from lofar.mom.momqueryservice.momrpc import MoMRPC
from lofar.mom.momqueryservice.config import DEFAULT_MOM_BUSNAME, DEFAULT_MOM_SERVICENAME
from lofar.sas.resourceassignment.resourceassignmenteditor.mom import updateTaskMomDetails
from lofar.sas.resourceassignment.resourceassignmenteditor.storage import updateTaskStorageDetails
from lofar.sas.datamanagement.cleanup.rpc import CleanupRPC
from lofar.sas.datamanagement.cleanup.config import DEFAULT_BUSNAME as DEFAULT_CLEANUP_BUSNAME
from lofar.sas.datamanagement.cleanup.config import DEFAULT_SERVICENAME as DEFAULT_CLEANUP_SERVICENAME
from lofar.sas.datamanagement.storagequery.rpc import StorageQueryRPC
from lofar.sas.datamanagement.storagequery.config import DEFAULT_BUSNAME as DEFAULT_STORAGEQUERY_BUSNAME
from lofar.sas.datamanagement.storagequery.config import DEFAULT_SERVICENAME as DEFAULT_STORAGEQUERY_SERVICENAME
from lofar.sas.otdb.otdbrpc import OTDBRPC
from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SERVICENAME
from lofar.common import isProductionEnvironment, isTestEnvironment
from lofar.common.util import humanreadablesize

logger = logging.getLogger(__name__)

# Text types accepted for timestamps in urls: `basestring` only exists on
# Python 2, so fall back to `str` to keep this module importable on Python 3.
try:
    _STRING_TYPES = (basestring,)
except NameError:
    _STRING_TYPES = (str,)


def asDatetime(isoString):
    '''Parse an iso-like timestamp string into a naive datetime.

    Accepts 'YYYY-MM-DDTHH:MM:SS' with optional fractional seconds
    (1 to 6 digits) and an optional trailing 'Z'.

    Raises ValueError when the string does not match that format.
    '''
    if isoString.endswith('Z'):
        isoString = isoString[:-1]

    # strptime's %f already accepts 1-6 digits, so no manual zero padding is
    # needed; only the presence of a fraction decides which format applies.
    if '.' in isoString:
        return datetime.strptime(isoString, '%Y-%m-%dT%H:%M:%S.%f')
    return datetime.strptime(isoString, '%Y-%m-%dT%H:%M:%S')


def asIsoFormat(timestamp):
    '''Format a datetime as 'YYYY-MM-DDTHH:MM:SS.ffffffZ'.'''
    return timestamp.strftime('%Y-%m-%dT%H:%M:%S.%fZ')


class CustomJSONEncoder(JSONEncoder):
    '''JSONEncoder which also encodes datetimes (as iso strings) and arbitrary
    iterables (as lists).'''

    def default(self, obj):
        if isinstance(obj, datetime):
            return asIsoFormat(obj)

        try:
            iterable = iter(obj)
        except TypeError:
            # not iterable: let the base class raise its usual TypeError
            return JSONEncoder.default(self, obj)
        return list(iterable)


__root_path = os.path.dirname(os.path.realpath(__file__))

# The flask webservice app
app = Flask('Scheduler',
            instance_path=__root_path,
            template_folder=os.path.join(__root_path, 'templates'),
            static_folder=os.path.join(__root_path, 'static'),
            instance_relative_config=True)

# Load the default configuration
app.config.from_object('lofar.sas.resourceassignment.resourceassignmenteditor.config.default')
app.json_encoder = CustomJSONEncoder

# rpc proxies and the changes handler; these stay None in this part of the
# module and are expected to be assigned before requests are served.
rarpc = None
momrpc = None
otdbrpc = None
curpc = None
sqrpc = None
momqueryrpc = None
changeshandler = None


def _asDatetimeOrAbort(timestamp):
    '''Convert a timestamp url argument to a datetime.

    Falsy values and non-string values are returned unchanged.
    Aborts the request with 400 when a string cannot be parsed.
    '''
    if timestamp and isinstance(timestamp, _STRING_TYPES):
        try:
            return asDatetime(timestamp)
        except ValueError:
            abort(400, 'timestamp not in iso format: %s' % timestamp)
    return timestamp


def _notFoundMessage(result):
    '''Return the 'message' of a storagequery result, or '' if there is none.'''
    return result['message'] if result and 'message' in result else ''


def _sortedById(items):
    '''Return the given dict-like items sorted on their 'id'.'''
    return sorted(items, key=lambda item: item['id'])


@app.route('/')
@app.route('/index.htm')
@app.route('/index.html')
@gzipped
def index():
    '''Serves the ResourceAssignmentEditor's index page'''
    return render_template('index.html', title='Scheduler')


@app.route('/rest/config')
@gzipped
def config():
    '''Serve the environment dependent base urls used by the web client.'''
    settings = {'mom_base_url': '',
                'lta_base_url': '',
                'inspection_plots_base_url': 'https://proxy.lofar.eu/inspect/HTML/'}

    if isProductionEnvironment():
        settings['mom_base_url'] = 'https://lofar.astron.nl/mom3'
        settings['lta_base_url'] = 'http://lofar.target.rug.nl/'
    elif isTestEnvironment():
        settings['mom_base_url'] = 'http://lofartest.control.lofar:8080/mom3'
        settings['lta_base_url'] = 'http://lofar-test.target.rug.nl/'

    return jsonify({'config': settings})


@app.route('/rest/resources')
@gzipped
def resources():
    '''Serve all resources, including their availability.'''
    return jsonify({'resources': rarpc.getResources(include_availability=True)})


@app.route('/rest/resourcegroups')
@gzipped
def resourcegroups():
    '''Serve all resource groups.'''
    return jsonify({'resourcegroups': rarpc.getResourceGroups()})


@app.route('/rest/resourcegroupmemberships')
@gzipped
def resourcegroupsmemberships():
    '''Serve the resource group memberships.'''
    return jsonify({'resourcegroupmemberships': rarpc.getResourceGroupMemberships()})


@app.route('/rest/resourceclaims')
def resourceclaims():
    '''Serve all resource claims (no time window).'''
    return resourceclaimsFromUntil(None, None)


@app.route('/rest/resourceclaims/<string:fromTimestamp>')
def resourceclaimsFrom(fromTimestamp=None):
    '''Serve the resource claims from the given timestamp onwards.'''
    return resourceclaimsFromUntil(fromTimestamp, None)


@app.route('/rest/resourceclaims/<string:fromTimestamp>/<string:untilTimestamp>')
@gzipped
def resourceclaimsFromUntil(fromTimestamp=None, untilTimestamp=None):
    '''Serve the resource claims (with properties) within the given time window.

    Responds with 400 when a timestamp is not in iso format.
    '''
    claims = rarpc.getResourceClaims(lower_bound=_asDatetimeOrAbort(fromTimestamp),
                                     upper_bound=_asDatetimeOrAbort(untilTimestamp),
                                     include_properties=True)
    return jsonify({'resourceclaims': claims})


@app.route('/rest/resourceusages')
@gzipped
def resourceUsages():
    '''Serve the usages of all resources.'''
    return jsonify({'resourceusages': rarpc.getResourceUsages()})


@app.route('/rest/resources/<int:resource_id>/usages', methods=['GET'])
@app.route('/rest/resourceusages/<int:resource_id>', methods=['GET'])
@gzipped
def resourceUsagesForResource(resource_id):
    '''Serve the usages of a single resource.'''
    return jsonify({'resourceusages': rarpc.getResourceUsages(resource_ids=[resource_id])})


@app.route('/rest/tasks/<int:task_id>/resourceusages', methods=['GET'])
@gzipped
def resourceUsagesForTask(task_id):
    '''Serve the resource usages of a single task.'''
    return jsonify({'resourceusages': rarpc.getResourceUsages(task_ids=[task_id])})


@app.route('/rest/tasks/<int:task_id>/resourceclaims', methods=['GET'])
@gzipped
def resourceClaimsForTask(task_id):
    '''Serve the extended resource claims (with properties) of a single task.'''
    claims = rarpc.getResourceClaims(task_ids=[task_id], extended=True, include_properties=True)
    return jsonify({'resourceclaims': claims})


@app.route('/rest/tasks')
def getTasks():
    '''Serve all tasks (no time window).'''
    return getTasksFromUntil(None, None)


@app.route('/rest/tasks/<string:fromTimestamp>')
def getTasksFrom(fromTimestamp):
    '''Serve the tasks from the given timestamp onwards.'''
    return getTasksFromUntil(fromTimestamp, None)


@app.route('/rest/tasks/<string:fromTimestamp>/<string:untilTimestamp>')
@gzipped
def getTasksFromUntil(fromTimestamp=None, untilTimestamp=None):
    '''Serve the tasks within the given time window, enriched with their mom
    and storage details.

    Responds with 400 when a timestamp is not in iso format.
    '''
    tasks = rarpc.getTasks(_asDatetimeOrAbort(fromTimestamp),
                           _asDatetimeOrAbort(untilTimestamp))

    updateTaskMomDetails(tasks, momqueryrpc)
    updateTaskStorageDetails(tasks, sqrpc)

    return jsonify({'tasks': tasks})


def _singleTaskResponse(*args, **kwargs):
    '''Fetch one task via rarpc.getTask(*args, **kwargs), enrich it with its
    mom and storage details and return it as json.

    Responds with 404 when there is no such task, or when fetching or
    enriching it fails (the failure is logged).
    '''
    try:
        task = rarpc.getTask(*args, **kwargs)
        if task:
            task['name'] = 'Task %d' % task['id']
            updateTaskMomDetails(task, momqueryrpc)
            updateTaskStorageDetails(task, sqrpc)
            return jsonify({'task': task})
    except Exception as e:
        # clients have always received a 404 here; keep that, but log the cause
        logger.error(e)

    abort(404)


@app.route('/rest/tasks/<int:task_id>', methods=['GET'])
@gzipped
def getTask(task_id):
    '''Serve the task with the given radb id, or 404.'''
    return _singleTaskResponse(task_id)


@app.route('/rest/tasks/otdb/<int:otdb_id>', methods=['GET'])
@gzipped
def getTaskByOTDBId(otdb_id):
    '''Serve the task with the given otdb id, or 404.'''
    return _singleTaskResponse(otdb_id=otdb_id)


@app.route('/rest/tasks/mom/<int:mom_id>', methods=['GET'])
@gzipped
def getTaskByMoMId(mom_id):
    '''Serve the task with the given mom id, or 404.'''
    return _singleTaskResponse(mom_id=mom_id)


@app.route('/rest/tasks/mom/group/<int:mom_group_id>', methods=['GET'])
@gzipped
def getTasksByMoMGroupId(mom_group_id):
    '''Serve the tasks in the given mom group; 404 on any failure.'''
    try:
        mom_ids = momqueryrpc.getTaskIdsInGroup(mom_group_id)[str(mom_group_id)]
        tasks = rarpc.getTasks(mom_ids=mom_ids)

        updateTaskMomDetails(tasks, momqueryrpc)
        updateTaskStorageDetails(tasks, sqrpc)

        return jsonify({'tasks': tasks})
    except Exception as e:
        logger.error(e)
        abort(404)


@app.route('/rest/tasks/mom/parentgroup/<int:mom_parent_group_id>', methods=['GET'])
@gzipped
def getTasksByMoMParentGroupId(mom_parent_group_id):
    '''Serve the tasks in the given mom parent group; 404 on any failure.'''
    try:
        mom_ids = momqueryrpc.getTaskIdsInParentGroup(mom_parent_group_id)[str(mom_parent_group_id)]
        tasks = rarpc.getTasks(mom_ids=mom_ids)

        updateTaskMomDetails(tasks, momqueryrpc)
        updateTaskStorageDetails(tasks, sqrpc)

        return jsonify({'tasks': tasks})
    except Exception as e:
        logger.error(e)
        abort(404)


@app.route('/rest/tasks/<int:task_id>', methods=['PUT'])
def putTask(task_id):
    '''Update a task from the json object in the request body.

    Only a 'status' change is applied (via otdb). 'starttime' and 'endtime'
    are validated, and refused with 403 in the production environment, but
    they are not applied: the radb update call below is disabled.

    Responds with 204 on success, 400 on a malformed body or timestamp,
    404 on an id mismatch / unknown task / missing key, 406 when the request
    is not json.
    '''
    if not request.headers.get('Content-Type', '').startswith('application/json'):
        abort(406)

    try:
        updatedTask = json.loads(request.data)
    except ValueError:
        abort(400, 'request body is not valid json')

    if not isinstance(updatedTask, dict):
        abort(400, 'request body is not a json object')

    try:
        if task_id != updatedTask['id']:
            abort(404)

        for key in ('starttime', 'endtime'):
            if key not in updatedTask:
                continue

            if isProductionEnvironment():
                abort(403, 'Editing of %s of tasks by users is not yet approved' % key)

            try:
                updatedTask[key] = asDatetime(updatedTask[key])
            except (ValueError, TypeError, AttributeError):
                # TypeError/AttributeError: the json value was not a string at all
                abort(400, 'timestamp not in iso format: %s' % (updatedTask[key],))

        logger.info('putTask: ' + str(updatedTask))

        if 'status' in updatedTask:
            task = rarpc.getTask(task_id)

            if not task:
                abort(404, "unknown task %s" % str(updatedTask))

            otdbrpc.taskSetStatus(task['otdb_id'], updatedTask['status'])

        #rarpc.updateTaskAndResourceClaims(task_id,
                                           #starttime=updatedTask.get('starttime', None),
                                           #endtime=updatedTask.get('endtime', None))

        return "", 204
    except KeyError:
        abort(404)


@app.route('/rest/tasks/<int:task_id>/cleanup', methods=['DELETE'])
def cleanupTaskData(task_id):
    '''Remove the data of the given task via the cleanup service.

    The optional json body may switch off individual delete_* flags; every
    flag which is not given defaults to True.

    Responds with 400 on a malformed body, 404 for an unknown task and 500
    when one of the services fails.
    '''
    delete_params = {}

    if request.headers.get('Content-Type', '').startswith(('application/json', 'text/plain')):
        try:
            delete_params = json.loads(request.data)
        except ValueError:
            abort(400, 'request body is not valid json')

        if not isinstance(delete_params, dict):
            abort(400, 'request body is not a json object')

    try:
        task = rarpc.getTask(task_id)
    except Exception as e:
        logger.error(e)
        abort(500)

    # checked outside the try blocks: abort() raises an exception itself, and
    # a broad except would turn this 404 into a 500
    if not task:
        abort(404, 'No such task (id=%s)' % task_id)

    try:
        logger.info("cleanup task data id=%s otdb_id=%s delete_params=%s", task_id, task['otdb_id'], delete_params)

        flags = ('delete_is', 'delete_cs', 'delete_uv', 'delete_im',
                 'delete_img', 'delete_pulp', 'delete_scratch')
        delete_flags = {flag: delete_params.get(flag, True) for flag in flags}

        result = curpc.removeTaskData(task['otdb_id'], **delete_flags)
        logger.info(result)
        return jsonify(result)
    except Exception as e:
        logger.error(e)
        abort(500)


@app.route('/rest/tasks/<int:task_id>/datapath', methods=['GET'])
@gzipped
def getTaskDataPath(task_id):
    '''Serve the data path of the given task.

    Responds with 404 for an unknown task or when no path was found, and
    with 500 when one of the services fails.
    '''
    try:
        task = rarpc.getTask(task_id)
    except Exception as e:
        abort(500, str(e))

    # outside the try block, so the 404 is not converted into a 500
    if not task:
        abort(404, 'No such task (id=%s)' % task_id)

    try:
        result = sqrpc.getPathForOTDBId(task['otdb_id'])
    except Exception as e:
        abort(500, str(e))

    if result and result.get('found'):
        return jsonify({'datapath': result['path']})
    abort(404, _notFoundMessage(result))


def _taskDiskUsageResponse(**task_selector):
    '''Serve the disk usage of the task (and its subdirectories) selected by
    the given keyword argument; 404 when not found, 500 on service failure.'''
    try:
        result = sqrpc.getDiskUsageForTaskAndSubDirectories(**task_selector)
    except Exception as e:
        abort(500, str(e))

    if result and result.get('found'):
        return jsonify(result)
    abort(404, _notFoundMessage(result))


@app.route('/rest/tasks/otdb/<int:otdb_id>/diskusage', methods=['GET'])
@gzipped
def getTaskDiskUsageByOTDBId(otdb_id):
    '''Serve the disk usage of the task with the given otdb id.'''
    return _taskDiskUsageResponse(otdb_id=otdb_id)


@app.route('/rest/tasks/<int:task_id>/diskusage', methods=['GET'])
@gzipped
def getTaskDiskUsage(task_id):
    '''Serve the disk usage of the task with the given radb id.'''
    return _taskDiskUsageResponse(radb_id=task_id)


@app.route('/rest/tasks/<int:task_id>/copy', methods=['PUT'])
def copyTask(task_id):
    '''Copy the given task in mom and serve the mom2id of the old and new task.

    Responds with 403 in the production environment and with 404 when the
    task is unknown or the copy fails.
    '''
    if isProductionEnvironment():
        abort(403, 'Copying of tasks by users is not yet approved')

    try:
        task = rarpc.getTask(task_id)
    except Exception as e:
        logger.error(e)
        abort(404, str(e))

    # outside the try blocks, so this 404 keeps its own message
    if not task:
        logger.error('Could not find task %s' % task_id)
        abort(404, 'Could not find task %s' % task_id)

    try:
        mom2id = task['mom_id']
        new_task_mom2id = momrpc.copyTask(mom2id=mom2id)
        return jsonify({'copied': True, 'new_task_mom2id': new_task_mom2id, 'old_task_mom2id': mom2id})
    except Exception as e:
        logger.error(e)
        abort(404, str(e))


# NOTE(review): this rule is identical to the one registered for
# resourceClaimsForTask above, so this view is presumably never selected.
# It also passes task_id=... where the other views pass task_ids=[...].
# Confirm against RARPC.getResourceClaims and consider removing this view.
@app.route('/rest/tasks/<int:task_id>/resourceclaims')
@gzipped
def taskResourceClaims(task_id):
    '''Serve the resource claims (with properties) of a single task.'''
    claims = rarpc.getResourceClaims(task_id=task_id, include_properties=True)
    return jsonify({'taskResourceClaims': claims})


@app.route('/rest/tasktypes')
@gzipped
def tasktypes():
    '''Serve the task types, sorted by id.'''
    return jsonify({'tasktypes': _sortedById(rarpc.getTaskTypes())})


@app.route('/rest/taskstatustypes')
@gzipped
def getTaskStatusTypes():
    '''Serve the task status types, sorted by id.'''
    return jsonify({'taskstatustypes': _sortedById(rarpc.getTaskStatuses())})


@app.route('/rest/resourcetypes')
@gzipped
def resourcetypes():
    '''Serve the resource types, sorted by id.'''
    return jsonify({'resourcetypes': _sortedById(rarpc.getResourceTypes())})


@app.route('/rest/resourceclaimpropertytypes')
@gzipped
def resourceclaimpropertytypes():
    '''Serve the resource claim property types, sorted by id.'''
    return jsonify({'resourceclaimpropertytypes': _sortedById(rarpc.getResourceClaimPropertyTypes())})


@app.route('/rest/projects')
@gzipped
def getProjects():
    '''Serve the mom projects with status_id 1 or 7, followed by two fixed
    container projects.

    A failing mom query is logged and does not fail the request; the two
    container projects are always appended.
    '''
    projects = []
    try:
        projects = [x for x in momqueryrpc.getProjects() if x['status_id'] in [1, 7]]
        for project in projects:
            project['mom_id'] = project.pop('mom2id')
    except Exception as e:
        logger.error(e)

    projects.append({'name': '<unknown>', 'mom_id': -99, 'description': 'Container project for tasks for which we could not find a MoM project'})
    projects.append({'name': 'OTDB Only', 'mom_id': -98, 'description': 'Container project for tasks which exists only in OTDB'})
    return jsonify({'momprojects': projects})


@app.route('/rest/projects/<int:project_mom2id>')
@gzipped
def getProject(project_mom2id):
    '''Serve the mom project with the given mom2id, or 404.'''
    try:
        projects = momqueryrpc.getProjects()
        project = next((x for x in projects if x['mom2id'] == project_mom2id), None)
    except Exception as e:
        logger.error(e)
        abort(404, str(e))

    if project is None:
        logger.error("No project with mom2id %s" % project_mom2id)
        abort(404, "No project with mom2id %s" % project_mom2id)

    return jsonify({'momproject': project})


@app.route('/rest/projects/<int:project_mom2id>/tasks')
@gzipped
def getProjectTasks(project_mom2id):
    '''Serve all tasks of the given project (no time window).'''
    return getProjectTasksFromUntil(project_mom2id, None, None)


@app.route('/rest/projects/<int:project_mom2id>/tasks/<string:fromTimestamp>/<string:untilTimestamp>')
@gzipped
def getProjectTasksFromUntil(project_mom2id, fromTimestamp=None, untilTimestamp=None):
    '''Serve the tasks of the given project within the given time window,
    enriched with their mom details.

    Responds with 400 when a timestamp is not in iso format and with 404
    when one of the services fails.
    '''
    # parsed before the try block, so a bad timestamp is reported as 400
    fromTimestamp = _asDatetimeOrAbort(fromTimestamp)
    untilTimestamp = _asDatetimeOrAbort(untilTimestamp)

    try:
        task_mom2ids = momqueryrpc.getProjectTaskIds(project_mom2id)['task_mom2ids']

        tasks = rarpc.getTasks(mom_ids=task_mom2ids, lower_bound=fromTimestamp, upper_bound=untilTimestamp)

        updateTaskMomDetails(tasks, momqueryrpc)
        #updateTaskStorageDetails(tasks, sqrpc)

        return jsonify({'tasks': tasks})
    except Exception as e:
        logger.error(e)
        abort(404, str(e))


@app.route('/rest/projects/<int:project_mom2id>/diskusage')
@gzipped
def getProjectDiskUsageById(project_mom2id):
    '''Serve the disk usage of the project with the given mom2id, or 404.'''
    try:
        project = momqueryrpc.getProject(project_mom2id=project_mom2id)
        project_name = project['name']
    except StopIteration as e:
        logger.error(e)
        abort(404, "No project with mom2id %s" % project_mom2id)
    except Exception as e:
        logger.error(e)
        abort(404, str(e))

    # delegated outside the try block, so an abort raised by the delegate is
    # not caught and re-wrapped here
    return getProjectDiskUsageByName(project_name)


@app.route('/rest/projects/<string:project_name>/diskusage')
@gzipped
def getProjectDiskUsageByName(project_name):
    '''Serve the disk usage of the project with the given name, or 404.'''
    try:
        result = sqrpc.getDiskUsageForProjectDirAndSubDirectories(project_name=project_name)
        return jsonify(result)
    except Exception as e:
        logger.error(e)
        abort(404, str(e))


@app.route('/rest/projects/diskusage')
@gzipped
def getProjectsDiskUsage():
    '''Serve the disk usage of the projects directory and its subdirectories, or 404.'''
    try:
        result = sqrpc.getDiskUsageForProjectsDirAndSubDirectories()
        return jsonify(result)
    except Exception as e:
        logger.error(e)
        abort(404, str(e))


@app.route('/rest/momobjectdetails/<int:mom2id>')
@gzipped
def getMoMObjectDetails(mom2id):
    '''Serve the project details of the given mom object (None if unknown).'''
    details = momqueryrpc.getProjectDetails(mom2id)
    # take the first value; next(iter(...)) works for Python 2 lists as well
    # as for Python 3 dict views
    details = next(iter(details.values())) if details else None
    if details:
        details['project_mom_id'] = details.pop('project_mom2id')
        details['object_mom_id'] = details.pop('object_mom2id')

    return jsonify({'momobjectdetails': details})


@app.route('/rest/updates/<int:sinceChangeNumber>')
@gzipped
def getUpdateEventsSince(sinceChangeNumber):
    '''Serve the changes since the given change number.'''
    changesSince = changeshandler.getChangesSince(sinceChangeNumber)
    return jsonify({'changes': changesSince})


@app.route('/rest/mostRecentChangeNumber')
@gzipped
def getMostRecentChangeNumber():
    '''Serve the most recent change number.'''
    mrcn = changeshandler.getMostRecentChangeNumber()
    return jsonify({'mostRecentChangeNumber': mrcn})


@app.route('/rest/updates')
def getUpdateEvents():
    '''Serve the changes since change number -1.'''
    return getUpdateEventsSince(-1)


@app.route('/rest/lofarTime')
@gzipped
def getLofarTime():
    '''Serve the current utc time as iso string.'''
    return jsonify({'lofarTime': asIsoFormat(datetime.utcnow())})


#ugly method to generate html tables for all tasks
@app.route('/tasks.html')
@gzipped
def getTasksHtml():
    '''Serve a plain html table with one row per task and one column per task
    property (the properties of the first task define the columns).

    Responds with 404 when there are no tasks.
    '''
    tasks = rarpc.getTasks()
    if not tasks:
        abort(404)

    updateTaskMomDetails(tasks, momqueryrpc)
    updateTaskStorageDetails(tasks, sqrpc)

    props = sorted(tasks[0].keys())

    parts = ['<!DOCTYPE html><html><head><title>Tasks</title><style>table, th, td {border: 1px solid black; border-collapse: collapse; padding: 4px;}</style></head><body><table style="width:100%">\n',
             '<tr>%s</tr>\n' % ''.join('<th>%s</th>' % prop for prop in props)]

    for task in tasks:
        cells = []
        for prop in props:
            if prop not in task:
                # keep the columns aligned when a task lacks a property
                cells.append('<td></td> ')
            elif prop == 'id':
                # the per-task html page is registered at /tasks/<id>.html
                cells.append('<td><a href="/tasks/%s.html">%s</a></td> ' % (task[prop], task[prop]))
            else:
                # 1-tuple, so that a tuple value cannot be taken for format arguments
                cells.append('<td>%s</td> ' % (task[prop],))
        parts.append('<tr>%s</tr>\n' % ''.join(cells))

    parts.append('</table></body></html>\n')

    return ''.join(parts)

#ugly method to generate html tables for the task and its claims
# Shared document head of the html pages generated below.
_HTML_HEAD = '<!DOCTYPE html><html><head><title>Tasks</title><style>table, th, td {border: 1px solid black; border-collapse: collapse; padding: 4px;}</style></head><body>'


def _propertiesTableHtml(properties):
    '''Render a list of property dicts as an html table; the keys of the
    first property define the columns. Empty input gives an empty table.'''
    parts = ['<table>']
    if properties:
        propnames = sorted(properties[0].keys())
        parts.append('<tr>%s</tr>\n' % ''.join('<th>%s</th>' % propname for propname in propnames))
        for prop in properties:
            parts.append('<tr>%s</tr>\n' % ''.join('<td>%s</td>' % (prop.get(propname, ''),)
                                                   for propname in propnames))
    parts.append('</table>')
    return ''.join(parts)


def _sapsTableHtml(saps):
    '''Render a list of sap dicts ('sap_nr' and 'properties') as an html table.'''
    sap_keys = ['sap_nr', 'properties']
    parts = ['<table>']
    if saps:
        parts.append('<tr>%s</tr>\n' % ''.join('<th>%s</th>' % sap_key for sap_key in sap_keys))
        for sap in saps:
            parts.append('<tr><td>%s</td><td>%s</td></tr>' % (sap['sap_nr'],
                                                              _propertiesTableHtml(sap['properties'])))
    parts.append('</table>')
    return ''.join(parts)


def _claimTableHtml(claim):
    '''Render one resource claim dict as a key/value html table followed by a
    line break; 'properties' and 'saps' are rendered as nested tables.'''
    parts = ['<table>']
    # sorted, so the rows come out in a stable order
    for claim_key, claim_value in sorted(claim.items()):
        if claim_key == 'properties':
            parts.append('<tr><td>properties</td><td>%s</td></tr>' % _propertiesTableHtml(claim_value))
        elif claim_key == 'saps':
            parts.append('<tr><td>saps</td><td>%s</td></tr>' % _sapsTableHtml(claim_value))
        else:
            parts.append('<tr><td>%s</td><td>%s</td></tr>' % (claim_key, claim_value))
    parts.append('</table>')
    parts.append('<br>')
    return ''.join(parts)


@app.route('/tasks/<int:task_id>.html', methods=['GET'])
@gzipped
def getTaskHtml(task_id):
    '''Serve a plain html page for one task: a key/value table of the task's
    properties, followed by one table per resource claim.

    Responds with 404 when there is no such task.
    '''
    task = rarpc.getTask(task_id)

    if not task:
        abort(404, 'No such task %s' % task_id)

    task['name'] = 'Task %d' % task['id']
    updateTaskMomDetails(task, momqueryrpc)
    updateTaskStorageDetails(task, sqrpc)

    # heading and log link go before the table: they are not valid inside <table>
    parts = [_HTML_HEAD,
             '<h1>Task %s</h1>' % task_id,
             '<p><a href="/tasks/%s/log.html">%s log</a></p> ' % (task['id'], task['type']),
             '<table style="">\n',
             '<tr><th>key</th><th>value</th></tr>\n']

    for prop in sorted(task.keys()):
        value = task[prop]

        if prop == 'id':
            cell = '<td><a href="/tasks/%s.html">%s</a></td> ' % (value, value)
        elif prop in ('predecessor_ids', 'successor_ids'):
            links = ', '.join('<a href="/tasks/%s.html">%s</a>' % (other_id, other_id)
                              for other_id in (value or []))
            cell = '<td>%s</td> ' % links
        else:
            # 1-tuple, so that a tuple value cannot be taken for format arguments
            cell = '<td>%s</td> ' % (value,)

        parts.append('<tr><td>%s</td>%s</tr>' % (prop, cell))

    parts.append('</table>\n<br>')

    claims = rarpc.getResourceClaims(task_ids=[task_id], extended=True, include_properties=True)

    if claims:
        parts.append('<h1>Claims</h1>')
        parts.extend(_claimTableHtml(claim) for claim in claims)

    parts.append('</body></html>\n')

    return ''.join(parts)


@app.route('/rest/tasks/<int:task_id>/resourceclaims.html', methods=['GET'])
@gzipped
def resourceClaimsForTaskHtml(task_id):
    '''Serve a plain html table with one row per resource claim of the task.

    Responds with 404 when the task has no resource claims.
    '''
    claims = rarpc.getResourceClaims(task_ids=[task_id], extended=True, include_properties=True)

    if not claims:
        abort(404, 'No resource claims for task %s' % task_id)

    parts = [_HTML_HEAD, '<table style="">\n']
    parts.extend('<tr><td>%s</td></tr>' % (claim,) for claim in claims)
    parts.append('</table></body></html>\n')

    return ''.join(parts)


@app.route('/tasks/<int:task_id>/log.html', methods=['GET'])
@gzipped
def getTaskLogHtml(task_id):
    '''Serve the log of the given task as plain text, obtained by running
    'cat' on the log file(s) over ssh.

    Responds with 404 when there is no such task, and with 500 plus the
    command's stderr when the command fails.
    '''
    task = rarpc.getTask(task_id)

    if not task:
        abort(404, 'No such task %s' % task_id)

    if task['type'] == 'pipeline':
        cmd = ['ssh', 'lofarsys@head01.cep4.control.lofar', 'cat /data/log/pipeline-%s-*.log' % task['otdb_id']]
    else:
        cmd = ['ssh', 'mcu001.control.lofar', 'cat /opt/lofar/var/log/MCU001\\:ObservationControl\\[0\\]\\{%s\\}.log*' % task['otdb_id']]

    logger.info(' '.join(cmd))

    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = proc.communicate()

    headers = {'Content-Type': 'text/plain; charset=utf-8'}
    if proc.returncode == 0:
        return out, 200, headers
    return err, 500, headers


def main():
    '''Parse the command line, connect the rpc proxies and the changes
    handler, and run the webserver until it is stopped.'''
    # make sure we run in UTC timezone
    import os
    import time
    os.environ['TZ'] = 'UTC'
    # the time module only picks up a changed TZ after tzset(); it is not
    # available on every platform
    if hasattr(time, 'tzset'):
        time.tzset()

    # Check the invocation arguments
    # (named option_parser, so the module level dateutil `parser` is not shadowed)
    option_parser = OptionParser('%prog [options]',
                                 description='run the resource assignment editor web service')
    option_parser.add_option('-p', '--port', dest='port', type='int', default=5000, help='port number on which to host the webservice, default: %default')
    option_parser.add_option('-q', '--broker', dest='broker', type='string', default=None, help='Address of the qpid broker, default: localhost')

    # all remaining long options are plain string options: (name, default, help)
    string_options = [
        ('radb_busname', DEFAULT_RADB_BUSNAME, 'Name of the bus exchange on the qpid broker on which the radbservice listens, default: %default'),
        ('radb_servicename', DEFAULT_RADB_SERVICENAME, 'Name of the radbservice, default: %default'),
        ('radb_notification_busname', DEFAULT_RADB_CHANGES_BUSNAME, 'Name of the notification bus exchange on the qpid broker on which the radb notifications are published, default: %default'),
        ('radb_notification_subjects', DEFAULT_RADB_CHANGES_SUBJECTS, 'Subject(s) to listen for on the radb notification bus exchange on the qpid broker, default: %default'),
        ('otdb_busname', DEFAULT_OTDB_SERVICE_BUSNAME, 'Name of the bus exchange on the qpid broker on which the otdbservice listens, default: %default'),
        ('otdb_servicename', DEFAULT_OTDB_SERVICENAME, 'Name of the otdbservice, default: %default'),
        ('mom_busname', DEFAULT_MOM_BUSNAME, 'Name of the bus exchange on the qpid broker on which the momservice listens, default: %default'),
        ('mom_servicename', DEFAULT_MOM_SERVICENAME, 'Name of the momservice, default: %default'),
        ('mom_broker', None, 'Address of the qpid broker for the mom service, default: localhost'),
        ('mom_query_busname', DEFAULT_MOMQUERY_BUSNAME, 'Name of the bus exchange on the qpid broker on which the momqueryservice listens, default: %default'),
        ('mom_query_servicename', DEFAULT_MOMQUERY_SERVICENAME, 'Name of the momqueryservice, default: %default'),
        ('cleanup_busname', DEFAULT_CLEANUP_BUSNAME, 'Name of the bus exchange on the qpid broker on which the cleanupservice listens, default: %default'),
        ('cleanup_servicename', DEFAULT_CLEANUP_SERVICENAME, 'Name of the cleanupservice, default: %default'),
        ('storagequery_busname', DEFAULT_STORAGEQUERY_BUSNAME, 'Name of the bus exchange on the qpid broker on which the storagequeryservice listens, default: %default'),
        ('storagequery_servicename', DEFAULT_STORAGEQUERY_SERVICENAME, 'Name of the storagequeryservice, default: %default'),
        ('dm_notification_busname', DEFAULT_DM_NOTIFICATION_BUSNAME, 'Name of the notification bus exchange on the qpid broker on which the data management notifications are published, default: %default'),
        ('dm_notification_subjects', DEFAULT_DM_NOTIFICATION_SUBJECTS, 'Subject(s) to listen for on the data management notification bus exchange on the qpid broker, default: %default'),
    ]
    for name, default, help_text in string_options:
        option_parser.add_option('--' + name, dest=name, type='string', default=default, help=help_text)

    option_parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging')
    (options, args) = option_parser.parse_args()

    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                        level=logging.DEBUG if options.verbose else logging.INFO)

    global rarpc, momrpc, otdbrpc, curpc, sqrpc, momqueryrpc, changeshandler

    rarpc = RARPC(busname=options.radb_busname, servicename=options.radb_servicename, broker=options.broker)
    # --mom_broker used to be parsed but ignored; honour it, and fall back to
    # the general broker when it is not given (the previous behaviour)
    momrpc = MoMRPC(busname=options.mom_busname, servicename=options.mom_servicename, timeout=10,
                    broker=options.mom_broker or options.broker)
    otdbrpc = OTDBRPC(busname=options.otdb_busname, servicename=options.otdb_servicename, broker=options.broker)
    curpc = CleanupRPC(busname=options.cleanup_busname, servicename=options.cleanup_servicename, broker=options.broker)
    sqrpc = StorageQueryRPC(busname=options.storagequery_busname, servicename=options.storagequery_servicename, broker=options.broker)
    momqueryrpc = MoMQueryRPC(busname=options.mom_query_busname, servicename=options.mom_query_servicename, timeout=10, broker=options.broker)
    changeshandler = ChangesHandler(radb_busname=options.radb_notification_busname, radb_subjects=options.radb_notification_subjects,
                                    dm_busname=options.dm_notification_busname, dm_subjects=options.dm_notification_subjects,
                                    broker=options.broker, momqueryrpc=momqueryrpc, radbrpc=rarpc)

    with changeshandler, rarpc, otdbrpc, curpc, sqrpc, momrpc, momqueryrpc:
        # Start the webserver
        # NOTE(review): with --verbose this runs flask in debug mode while
        # listening on all interfaces (host='0.0.0.0'); flask's debug mode is
        # not meant to be reachable by untrusted clients -- confirm that
        # --verbose is only used on trusted networks.
        app.run(debug=options.verbose, threaded=True, host='0.0.0.0', port=options.port)


if __name__ == '__main__':
    main()