Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
webservice.py 45.77 KiB
#!/usr/bin/python

# Copyright (C) 2012-2015  ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.

# $Id$

'''ResourceAssignmentEditor webservice serves a interactive html5 website for
viewing and editing lofar resources.'''

import sys
import os
import time
from optparse import OptionParser
from threading import Condition, Lock, current_thread, Thread
import _strptime
from datetime import datetime
from json import loads as json_loads
import time
import logging
import subprocess
from dateutil import parser, tz
from flask import Flask
from flask import render_template
from flask import request
from flask import abort
from flask import url_for
from lofar.common.flask_utils import gzipped
from lofar.messaging.RPC import RPCException
from lofar.sas.resourceassignment.resourceassignmenteditor.fakedata import *
from lofar.sas.resourceassignment.resourceassignmenteditor.changeshandler import ChangesHandler, CHANGE_DELETE_TYPE
from lofar.sas.datamanagement.common.config import DEFAULT_DM_NOTIFICATION_BUSNAME, DEFAULT_DM_NOTIFICATION_SUBJECTS
from lofar.sas.resourceassignment.database.config import DEFAULT_NOTIFICATION_BUSNAME as DEFAULT_RADB_CHANGES_BUSNAME
from lofar.sas.resourceassignment.database.config import DEFAULT_NOTIFICATION_SUBJECTS as DEFAULT_RADB_CHANGES_SUBJECTS
from lofar.sas.otdb.config import DEFAULT_OTDB_NOTIFICATION_BUSNAME, DEFAULT_OTDB_NOTIFICATION_SUBJECT
from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as DEFAULT_RADB_BUSNAME
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as DEFAULT_RADB_SERVICENAME
from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC
from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME
from lofar.mom.momqueryservice.momrpc import MoMRPC
from lofar.mom.momqueryservice.config import DEFAULT_MOM_BUSNAME, DEFAULT_MOM_SERVICENAME
from lofar.sas.resourceassignment.resourceassignmenteditor.mom import updateTaskMomDetails
from lofar.sas.resourceassignment.resourceassignmenteditor.storage import updateTaskStorageDetails
from lofar.sas.datamanagement.cleanup.rpc import CleanupRPC
from lofar.sas.datamanagement.cleanup.config import DEFAULT_BUSNAME as DEFAULT_CLEANUP_BUSNAME
from lofar.sas.datamanagement.cleanup.config import DEFAULT_SERVICENAME as DEFAULT_CLEANUP_SERVICENAME
from lofar.sas.datamanagement.storagequery.rpc import StorageQueryRPC
from lofar.sas.datamanagement.storagequery.config import DEFAULT_BUSNAME as DEFAULT_STORAGEQUERY_BUSNAME
from lofar.sas.datamanagement.storagequery.config import DEFAULT_SERVICENAME as DEFAULT_STORAGEQUERY_SERVICENAME
from lofar.sas.otdb.otdbrpc import OTDBRPC
from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SERVICENAME
from lofar.lta.ingest.common.config import DEFAULT_INGEST_NOTIFICATION_BUSNAME
from lofar.lta.ingest.common.config import DEFAULT_INGEST_NOTIFICATION_SUBJECTS
from lofar.common import isProductionEnvironment, isTestEnvironment
from lofar.common.util import humanreadablesize
from lofar.common import dbcredentials
from lofar.sas.resourceassignment.database.radb import RADatabase

logger = logging.getLogger(__name__)

def asDatetime(isoString):
    if isoString[-1] == 'Z':
        isoString = isoString[:-1]
    if isoString[-4] == '.':
        isoString += '000'
    return datetime.strptime(isoString, '%Y-%m-%dT%H:%M:%S.%f')

def asIsoFormat(timestamp):
    return datetime.strftime(timestamp, '%Y-%m-%dT%H:%M:%S.%fZ')

__root_path = os.path.dirname(os.path.realpath(__file__))

'''The flask webservice app'''
app = Flask('Scheduler',
            instance_path=__root_path,
            template_folder=os.path.join(__root_path, 'templates'),
            static_folder=os.path.join(__root_path, 'static'),
            instance_relative_config=True)

# Load the default configuration
app.config.from_object('lofar.sas.resourceassignment.resourceassignmenteditor.config.default')


try:
    import ujson

    def convertDictDatetimeValuesToString(obj):
        '''recursively convert all string values in the dict to buffer'''
        if isinstance(obj, list):
            return [convertDictDatetimeValuesToString(x) if (isinstance(x, dict) or isinstance(x, list)) else x for x in obj]

        return dict( (k, convertDictDatetimeValuesToString(v) if (isinstance(v, dict) or isinstance(v, list)) else asIsoFormat(v) if isinstance(v, datetime) else v) for k,v in obj.items())

    def jsonify(obj):
        '''faster implementation of flask.json.jsonify using ultrajson and the above datetime->string convertor'''
        json_str = ujson.dumps(dict(convertDictDatetimeValuesToString(obj)))

        return app.response_class(json_str, mimetype='application/json')
except:
    from flask.json import jsonify
    from flask.json import JSONEncoder

    class CustomJSONEncoder(JSONEncoder):
        def default(self, obj):
            try:
                if isinstance(obj, datetime):
                    return asIsoFormat(obj)
                iterable = iter(obj)
            except TypeError:
                pass
            else:
                return list(iterable)
            return JSONEncoder.default(self, obj)

    app.json_encoder = CustomJSONEncoder


rarpc = None
momrpc = None
otdbrpc = None
curpc = None
sqrpc = None
momqueryrpc = None
changeshandler = None

_radb_pool = {}
_radb_pool_lock = Lock()
_radb_dbcreds = None

def radb():
    global _radb_pool, _radb_pool_lock

    if _radb_dbcreds:
        with _radb_pool_lock:
            thread = current_thread()
            tid = thread.ident
            now = datetime.utcnow()
            if tid not in _radb_pool:
                logger.info('creating radb connection for thread %s', tid)
                _radb_pool[tid] = { 'connection': RADatabase(dbcreds=_radb_dbcreds, log_queries=True),
                                    'last_used': now }

            thread_conn_obj = _radb_pool[tid]
            thread_conn_obj['last_used'] = now

            threshold = timedelta(minutes=5)
            obsolete_connections_tids = [tid for tid,tco in _radb_pool.items() if now - tco['last_used'] > threshold]

            for tid in obsolete_connections_tids:
                logger.info('deleting radb connection for thread %s', tid)
                del _radb_pool[tid]

            return thread_conn_obj['connection']

    return rarpc

@app.route('/')
@app.route('/index.htm')
@app.route('/index.html')
@gzipped
def index():
    '''Serves the ResourceAssignmentEditor's index page'''
    return render_template('index.html', title='Scheduler')

@app.route('/projects')
@app.route('/projects.htm')
@app.route('/projects.html')
@gzipped
def projects():
    return render_template('projects.html', title='Projects')

@app.route('/rest/config')
@gzipped
def config():
    config = {'mom_base_url':'',
              'lta_base_url':'',
              'inspection_plots_base_url':'https://proxy.lofar.eu/inspect/HTML/',
              'sky_view_base_url':'http://dop344.astron.nl:5000/uvis/id'}

    if isProductionEnvironment():
        config['mom_base_url'] = 'https://lofar.astron.nl/mom3'
        config['lta_base_url'] = 'http://lofar.target.rug.nl/'
    elif isTestEnvironment():
        config['mom_base_url'] = 'http://lofartest.control.lofar:8080/mom3'
        config['lta_base_url'] = 'http://lofar-test.target.rug.nl/'

    return jsonify({'config': config})

@app.route('/rest/resources')
@gzipped
def resources():
    result = radb().getResources(include_availability=True)
    return jsonify({'resources': result})

@app.route('/rest/resources/<int:resource_id>')
@gzipped
def resource(resource_id):
    result = radb().getResources(resource_ids=[resource_id], include_availability=True)
    if result:
        return jsonify(result[0])
    return jsonify({})

@app.route('/rest/resources/<int:resource_id>/resourceclaims')
@gzipped
def resourceclaimsForResource(resource_id):
    return resourceclaimsForResourceFromUntil(resource_id, None, None)

@app.route('/rest/resources/<int:resource_id>/resourceclaims/<string:fromTimestamp>')
@gzipped
def resourceclaimsForResourceFrom(resource_id, fromTimestamp=None):
    return resourceclaimsForResourceFromUntil(resource_id, fromTimestamp, None)

@app.route('/rest/resources/<int:resource_id>/resourceclaims/<string:fromTimestamp>/<string:untilTimestamp>')
@gzipped
def resourceclaimsForResourceFromUntil(resource_id, fromTimestamp=None, untilTimestamp=None):
    if fromTimestamp and isinstance(fromTimestamp, basestring):
        fromTimestamp = asDatetime(fromTimestamp)

    if untilTimestamp and isinstance(untilTimestamp, basestring):
        untilTimestamp = asDatetime(untilTimestamp)

    claims = radb().getResourceClaims(lower_bound=fromTimestamp,
                                      upper_bound=untilTimestamp,
                                      resource_ids=[resource_id],
                                      extended=False,
                                      include_properties=True)
    return jsonify({'resourceclaims': claims})

@app.route('/rest/resourcegroups')
@gzipped
def resourcegroups():
    result = radb().getResourceGroups()
    return jsonify({'resourcegroups': result})

@app.route('/rest/resourcegroupmemberships')
@gzipped
def resourcegroupsmemberships():
    result = radb().getResourceGroupMemberships()
    return jsonify({'resourcegroupmemberships': result})

@app.route('/rest/resourceclaims')
def resourceclaims():
    return resourceclaimsFromUntil(None, None)

@app.route('/rest/resourceclaims/<string:fromTimestamp>')
def resourceclaimsFrom(fromTimestamp=None):
    return resourceclaimsFromUntil(fromTimestamp, None)

@app.route('/rest/resourceclaims/<string:fromTimestamp>/<string:untilTimestamp>')
@gzipped
def resourceclaimsFromUntil(fromTimestamp=None, untilTimestamp=None):
    if fromTimestamp and isinstance(fromTimestamp, basestring):
        fromTimestamp = asDatetime(fromTimestamp)

    if untilTimestamp and isinstance(untilTimestamp, basestring):
        untilTimestamp = asDatetime(untilTimestamp)

    claims = radb().getResourceClaims(lower_bound=fromTimestamp, upper_bound=untilTimestamp, include_properties=True)
    return jsonify({'resourceclaims': claims})

@app.route('/rest/resourceusages')
@gzipped
def resourceUsages():
    return resourceUsagesFromUntil(None, None)

@app.route('/rest/resourceusages/<string:fromTimestamp>/<string:untilTimestamp>')
@gzipped
def resourceUsagesFromUntil(fromTimestamp=None, untilTimestamp=None):
    if fromTimestamp and isinstance(fromTimestamp, basestring):
        fromTimestamp = asDatetime(fromTimestamp)

    if untilTimestamp and isinstance(untilTimestamp, basestring):
        untilTimestamp = asDatetime(untilTimestamp)

    result = radb().getResourceUsages(lower_bound=fromTimestamp, upper_bound=untilTimestamp)
    return jsonify({'resourceusages': result})

@app.route('/rest/resources/<int:resource_id>/usages', methods=['GET'])
@app.route('/rest/resourceusages/<int:resource_id>', methods=['GET'])
@gzipped
def resourceUsagesForResource(resource_id):
    return resourceUsagesForResourceFromUntil(resource_id, None, None)

@app.route('/rest/resources/<int:resource_id>/usages/<string:fromTimestamp>/<string:untilTimestamp>', methods=['GET'])
@app.route('/rest/resourceusages/<int:resource_id>/<string:fromTimestamp>/<string:untilTimestamp>', methods=['GET'])
@gzipped
def resourceUsagesForResourceFromUntil(resource_id, fromTimestamp=None, untilTimestamp=None):
    if fromTimestamp and isinstance(fromTimestamp, basestring):
        fromTimestamp = asDatetime(fromTimestamp)

    if untilTimestamp and isinstance(untilTimestamp, basestring):
        untilTimestamp = asDatetime(untilTimestamp)

    result = radb().getResourceUsages(resource_ids=[resource_id], lower_bound=fromTimestamp, upper_bound=untilTimestamp)
    return jsonify({'resourceusages': result})

@app.route('/rest/tasks/<int:task_id>/resourceusages', methods=['GET'])
@gzipped
def resourceUsagesForTask(task_id):
    result = radb().getResourceUsages(task_ids=[task_id])
    return jsonify({'resourceusages': result})

@app.route('/rest/tasks/<int:task_id>/resourceclaims', methods=['GET'])
@gzipped
def resourceClaimsForTask(task_id):
    result = radb().getResourceClaims(task_ids=[task_id], extended=True, include_properties=True)
    return jsonify({'resourceclaims': result})

@app.route('/rest/tasks')
def getTasks():
    return getTasksFromUntil(None, None)

@app.route('/rest/tasks/<string:fromTimestamp>')
def getTasksFrom(fromTimestamp):
    return getTasksFromUntil(fromTimestamp, None)

@app.route('/rest/tasks/<string:fromTimestamp>/<string:untilTimestamp>')
@gzipped
def getTasksFromUntil(fromTimestamp=None, untilTimestamp=None):
    if fromTimestamp and isinstance(fromTimestamp, basestring):
        fromTimestamp = asDatetime(fromTimestamp)

    if untilTimestamp and isinstance(untilTimestamp, basestring):
        untilTimestamp = asDatetime(untilTimestamp)

    tasks = radb().getTasks(fromTimestamp, untilTimestamp)
    updateTaskDetails(tasks)

    return jsonify({'tasks': tasks})

def updateTaskDetails(tasks):
    #update the mom details and the storage details in parallel
    t1 = Thread(target=updateTaskMomDetails, args=(tasks, momqueryrpc))
    t2 = Thread(target=updateTaskStorageDetails, args=(tasks, sqrpc, curpc))
    t1.daemon = True
    t2.daemon = True
    t1.start()
    t2.start()

    #wait for mom details thread to finish
    t1.join()

    #task details (such as name/description) from MoM are done
    #get extra details on reserved resources for reservations (while the storage details still run in t2)
    reservationTasks = [t for t in tasks if t['type'] == 'reservation']
    if reservationTasks:
        reservationClaims = radb().getResourceClaims(task_ids=[t['id'] for t in reservationTasks], extended=True, include_properties=False)
        task2claims = {}
        for claim in reservationClaims:
            if claim['task_id'] not in task2claims:
                task2claims[claim['task_id']] = []
            task2claims[claim['task_id']].append(claim)
        for task in reservationTasks:
            claims = task2claims.get(task['id'], [])
            task['name'] = ', '.join(c['resource_name'] for c in claims)
            task['description'] = 'Reservation on ' + task['name']

    #wait for storage details thread to finish
    t2.join()


@app.route('/rest/tasks/<int:task_id>', methods=['GET'])
@gzipped
def getTask(task_id):
    try:
        task = radb().getTask(task_id)

        if not task:
            abort(404)

        task['name'] = 'Task %d' % task['id']
        updateTaskDetails([task])
        return jsonify({'task': task})
    except Exception as e:
        abort(404)

    return jsonify({'task': None})

@app.route('/rest/tasks/otdb/<int:otdb_id>', methods=['GET'])
@gzipped
def getTaskByOTDBId(otdb_id):
    try:
        task = radb().getTask(otdb_id=otdb_id)

        if not task:
            abort(404)

        task['name'] = 'Task %d' % task['id']
        updateTaskDetails([task])
        return jsonify({'task': task})
    except Exception as e:
        abort(404)

    return jsonify({'task': None})

@app.route('/rest/tasks/mom/<int:mom_id>', methods=['GET'])
@gzipped
def getTaskByMoMId(mom_id):
    try:
        task = radb().getTask(mom_id=mom_id)

        if not task:
            abort(404)

        task['name'] = 'Task %d' % task['id']
        updateTaskDetails([task])
        return jsonify({'task': task})
    except Exception as e:
        abort(404)

    return jsonify({'task': None})

@app.route('/rest/tasks/mom/group/<int:mom_group_id>', methods=['GET'])
@gzipped
def getTasksByMoMGroupId(mom_group_id):
    try:
        mom_ids = momqueryrpc.getTaskIdsInGroup(mom_group_id)[str(mom_group_id)]
        tasks = radb().getTasks(mom_ids=mom_ids)
        updateTaskDetails(tasks)
        return jsonify({'tasks': tasks})
    except Exception as e:
        abort(404)

@app.route('/rest/tasks/mom/parentgroup/<int:mom_parent_group_id>', methods=['GET'])
@gzipped
def getTasksByMoMParentGroupId(mom_parent_group_id):
    try:
        mom_ids = momqueryrpc.getTaskIdsInParentGroup(mom_parent_group_id)[str(mom_parent_group_id)]
        tasks = radb().getTasks(mom_ids=mom_ids)
        updateTaskDetails(tasks)
        return jsonify({'tasks': tasks})
    except Exception as e:
        abort(404)

@app.route('/rest/tasks/<int:task_id>', methods=['PUT'])
def putTask(task_id):
    if 'Content-Type' in request.headers and \
            request.headers['Content-Type'].startswith('application/json'):
        try:
            updatedTask = json_loads(request.data)

            if task_id != int(updatedTask['id']):
                abort(404, 'task_id in url is not equal to id in request.data')

            #check if task is known
            task = radb().getTask(task_id)
            if not task:
                abort(404, "unknown task %s" % str(updatedTask))

            # first handle start- endtimes...
            if 'starttime' in updatedTask or 'endtime' in updatedTask:
                logger.info('starttime or endtime in updatedTask: %s', updatedTask)
                if isProductionEnvironment():
                    abort(403, 'Editing of %s of tasks by users is not yet approved' % (time,))

                #update dict for otdb spec
                spec_update = {}

                for timeprop in ['starttime', 'endtime']:
                    if timeprop in updatedTask:
                        try:
                            updatedTask[timeprop] = asDatetime(updatedTask[timeprop])
                        except ValueError:
                            abort(400, 'timestamp not in iso format: ' + updatedTask[timeprop])
                        otdb_key = 'LOFAR.ObsSW.Observation.' + ('startTime' if timeprop == 'starttime' else 'stopTime')
                        spec_update[otdb_key] = updatedTask[timeprop].strftime('%Y-%m-%d %H:%M:%S')

                #update timestamps in both otdb and radb
                otdbrpc.taskSetSpecification(task['otdb_id'], spec_update)

                # update the task's (and its claims) start/endtime
                # do not update the tasks status directly via the radb. See few lines below. task status is routed via otdb (and then ends up in radb automatically)
                # it might be that editing the start/end time results in a (rabd)task status update (for example to 'conflict' due to conflicting claims)
                # that's ok, since we'll update the status to the requested status later via otdb (see few lines below)
                radb().updateTaskAndResourceClaims(task_id,
                                                   starttime=updatedTask.get('starttime'),
                                                   endtime=updatedTask.get('endtime'))

            # ...then, handle status update which might trigger resource assignment,
            # for which the above updated times are needed
            if 'status' in updatedTask:
                try:
                    #update status in otdb only
                    #the status change will propagate automatically into radb via other services (by design)
                    otdbrpc.taskSetStatus(task['otdb_id'], updatedTask['status'])

                    #we expect the status in otdb/radb to eventually become what we asked for...
                    expected_statuses = set([updatedTask['status']])

                    #except for the prescheduled status, because then the resource assigner tries
                    #to schedule the task, and it will end up in either 'scheduled', 'conflict', 'error' state.
                    if updatedTask['status'] == 'prescheduled':
                        expected_statuses = set(['scheduled', 'conflict', 'error'])

                    #block until radb and mom task status are equal to the expected_statuses (with timeout)
                    start_wait = datetime.utcnow()

                    while True:
                        task = radb().getTask(otdb_id=task['otdb_id'])
                        otdb_status = otdbrpc.taskGetStatus(task['otdb_id'])

                        logger.info('waiting for otdb/radb task status to be in [%s].... otdb:%s radb:%s',
                                    ', '.join(expected_statuses), otdb_status, task['status'])

                        if (task['status'] in expected_statuses and
                            otdb_status in expected_statuses):
                            break

                        if datetime.utcnow() - start_wait > timedelta(seconds=10):
                            break

                        time.sleep(0.2)
                except RPCException as e:
                    if 'does not exist' in e.message:
                        # task does not exist (anymore) in otdb
                        #so remove it from radb as well (with cascading deletes on specification)
                        logger.warn('task with otdb_id %s does not exist anymore in OTDB. removing task radb_id %s from radb', task['otdb_id'], task['id'])
                        radb().deleteSpecification(task['specification_id'])

            if 'data_pinned' in updatedTask:
                task = radb().getTask(task_id)

                if not task:
                    abort(404, "unknown task %s" % str(updatedTask))

                curpc.setTaskDataPinned(task['otdb_id'], updatedTask['data_pinned'])

            return "", 204
        except Exception as e:
            logger.error(e)
            abort(404, str(e))
    abort(406)

@app.route('/rest/tasks/<int:task_id>/cleanup', methods=['DELETE'])
def cleanupTaskData(task_id):
    try:
        delete_params = {}

        if 'Content-Type' in request.headers and (request.headers['Content-Type'].startswith('application/json') or request.headers['Content-Type'].startswith('text/plain')):
            delete_params = json_loads(request.data)

        task = radb().getTask(task_id)

        if not task:
            abort(404, 'No such task (id=%s)' % task_id)

        logger.info("cleanup task data id=%s otdb_id=%s delete_params=%s", task_id, task['otdb_id'], delete_params)

        result = curpc.removeTaskData(task['otdb_id'],
                                      delete_is=delete_params.get('delete_is', True),
                                      delete_cs=delete_params.get('delete_cs', True),
                                      delete_uv=delete_params.get('delete_uv', True),
                                      delete_im=delete_params.get('delete_im', True),
                                      delete_img=delete_params.get('delete_img', True),
                                      delete_pulp=delete_params.get('delete_pulp', True),
                                      delete_scratch=delete_params.get('delete_scratch', True),
                                      force=delete_params.get('force_delete', False))
        logger.info(result)
        return jsonify(result)
    except Exception as e:
        abort(500)

@app.route('/rest/tasks/<int:task_id>/datapath', methods=['GET'])
@gzipped
def getTaskDataPath(task_id):
    try:
        task = radb().getTask(task_id)

        if not task:
            abort(404, 'No such task (id=%s)' % task_id)

        result = sqrpc.getPathForOTDBId(task['otdb_id'])
    except Exception as e:
        abort(500, str(e))

    if result['found']:
        return jsonify({'datapath': result['path']})
    abort(404, result['message'] if result and 'message' in result else '')

@app.route('/rest/tasks/otdb/<int:otdb_id>/diskusage', methods=['GET'])
@gzipped
def getTaskDiskUsageByOTDBId(otdb_id):
    try:
        result = sqrpc.getDiskUsageForTaskAndSubDirectories(otdb_id=otdb_id, force_update=request.args.get('force')=='true')
    except Exception as e:
        abort(500, str(e))

    if result['found']:
        return jsonify(result)
    abort(404, result['message'] if result and 'message' in result else '')

@app.route('/rest/tasks/<int:task_id>/diskusage', methods=['GET'])
@gzipped
def getTaskDiskUsage(task_id):
    try:
        result = sqrpc.getDiskUsageForTaskAndSubDirectories(radb_id=task_id, force_update=request.args.get('force')=='true')
    except Exception as e:
        abort(500, str(e))

    if result['found']:
        return jsonify(result)
    abort(404, result['message'] if result and 'message' in result else '')

@app.route('/rest/tasks/<int:task_id>/copy', methods=['PUT'])
def copyTask(task_id):
    if isProductionEnvironment():
        abort(403, 'Copying of tasks is by users is not yet approved')

    try:
        task = radb().getTask(task_id)

        if not task:
            logger.error('Could not find task %s' % task_id)
            abort(404, 'Could not find task %s' % task_id)

        mom2id = task['mom_id']
        new_task_mom2id = momrpc.copyTask(mom2id=mom2id)
        return jsonify({'copied':True, 'new_task_mom2id':new_task_mom2id, 'old_task_mom2id':mom2id})
    except Exception as e:
        logger.error(e)
        abort(404, str(e))

        return jsonify({'copied':False})

@app.route('/rest/tasks/<int:task_id>/parset', methods=['GET'])
@gzipped
def getParset(task_id):
    try:
        task = radb().getTask(task_id)

        if not task:
            abort(404)

        return getParsetByOTDBId(task['otdb_id'])
    except Exception as e:
        abort(404)
    abort(404)

@app.route('/rest/tasks/otdb/<int:otdb_id>/parset', methods=['GET'])
@gzipped
def getParsetByOTDBId(otdb_id):
    try:
        logger.info('getParsetByOTDBId(%s)', otdb_id)
        parset = otdbrpc.taskGetSpecification(otdb_id=otdb_id)['specification']
        return '\n'.join(['%s=%s' % (k,parset[k]) for k in sorted(parset.keys())]), 200, {'Content-Type': 'text/plain; charset=utf-8'}
    except Exception as e:
        abort(404)
    abort(404)

@app.route('/rest/tasks/<int:task_id>/resourceclaims')
@gzipped
def taskResourceClaims(task_id):
    return jsonify({'taskResourceClaims': radb().getResourceClaims(task_ids=[task_id], include_properties=True)})

@app.route('/rest/tasktypes')
@gzipped
def tasktypes():
    result = radb().getTaskTypes()
    result = sorted(result, key=lambda q: q['id'])
    return jsonify({'tasktypes': result})

@app.route('/rest/taskstatustypes')
@gzipped
def getTaskStatusTypes():
    result = radb().getTaskStatuses()
    result = sorted(result, key=lambda q: q['id'])
    return jsonify({'taskstatustypes': result})

@app.route('/rest/resourcetypes')
@gzipped
def resourcetypes():
    result = radb().getResourceTypes()
    result = sorted(result, key=lambda q: q['id'])
    return jsonify({'resourcetypes': result})

@app.route('/rest/resourceclaimpropertytypes')
@gzipped
def resourceclaimpropertytypes():
    result = radb().getResourceClaimPropertyTypes()
    result = sorted(result, key=lambda q: q['id'])
    return jsonify({'resourceclaimpropertytypes': result})

@app.route('/rest/projects')
@gzipped
def getProjects():
    projects = []
    try:
        projects = momqueryrpc.getProjects()
        projects = [x for x in projects if x['status_id'] in [1, 7]]
        for project in projects:
            project['mom_id'] = project.pop('mom2id')
    except Exception as e:
        logger.error(e)
        projects.append({'name':'<unknown>', 'mom_id':-99, 'description': 'Container project for tasks for which we could not find a MoM project'})

    projects.append({'name':'OTDB Only', 'mom_id':-98, 'description': 'Container project for tasks which exists only in OTDB'})
    projects.append({'name':'Reservations', 'mom_id':-97, 'description': 'Container project for reservation tasks'})
    return jsonify({'momprojects': projects})

@app.route('/rest/projects/<int:project_mom2id>')
@gzipped
def getProject(project_mom2id):
    try:
        projects = momqueryrpc.getProjects()
        project = next(x for x in projects if x['mom2id'] == project_mom2id)
        return jsonify({'momproject': project})
    except StopIteration as e:
        logger.error(e)
        abort(404, "No project with mom2id %s" % project_mom2id)
    except Exception as e:
        logger.error(e)
        abort(404, str(e))

@app.route('/rest/projects/<int:project_mom2id>/tasks')
@gzipped
def getProjectTasks(project_mom2id):
    return getProjectTasksFromUntil(project_mom2id, None, None)

@app.route('/rest/projects/<int:project_mom2id>/tasks/<string:fromTimestamp>/<string:untilTimestamp>')
@gzipped
def getProjectTasksFromUntil(project_mom2id, fromTimestamp=None, untilTimestamp=None):
    try:
        if fromTimestamp and isinstance(fromTimestamp, basestring):
            fromTimestamp = asDatetime(fromTimestamp)

        if untilTimestamp and isinstance(untilTimestamp, basestring):
            untilTimestamp = asDatetime(untilTimestamp)

        task_mom2ids = momqueryrpc.getProjectTaskIds(project_mom2id)['task_mom2ids']

        tasks = radb().getTasks(mom_ids=task_mom2ids, lower_bound=fromTimestamp, upper_bound=untilTimestamp)
        updateTaskDetails(tasks)
        return jsonify({'tasks': tasks})
    except Exception as e:
        logger.error(e)
        abort(404, str(e))

@app.route('/rest/projects/<int:project_mom2id>/taskstimewindow')
@gzipped
def getProjectTasksTimeWindow(project_mom2id):
    try:
        task_mom2ids = momqueryrpc.getProjectTaskIds(project_mom2id)['task_mom2ids']

        timewindow = radb().getTasksTimeWindow(mom_ids=task_mom2ids)

        return jsonify(timewindow)
    except Exception as e:
        logger.error(e)
        abort(404, str(e))

@app.route('/rest/projects/<int:project_mom2id>/diskusage')
@gzipped
def getProjectDiskUsageById(project_mom2id):
    try:
        project = momqueryrpc.getProject(project_mom2id=project_mom2id)
        return getProjectDiskUsageByName(project['name'])
    except StopIteration as e:
        logger.error(e)
        abort(404, "No project with mom2id %s" % project_mom2id)
    except Exception as e:
        logger.error(e)
        abort(404, str(e))

@app.route('/rest/projects/<string:project_name>/diskusage')
@gzipped
def getProjectDiskUsageByName(project_name):
    try:
        result = sqrpc.getDiskUsageForProjectDirAndSubDirectories(project_name=project_name, force_update=request.args.get('force')=='true')
        return jsonify(result)
    except Exception as e:
        logger.error(e)
        abort(404, str(e))

@app.route('/rest/projects/diskusage')
@gzipped
def getProjectsDiskUsage():
    try:
        result = sqrpc.getDiskUsageForProjectsDirAndSubDirectories(force_update=request.args.get('force')=='true')
        return jsonify(result)
    except Exception as e:
        logger.error(e)
        abort(404, str(e))

@app.route('/rest/momobjectdetails/<int:mom2id>')
@gzipped
def getMoMObjectDetails(mom2id):
    details = momqueryrpc.getObjectDetails(mom2id)
    details = details.values()[0] if details else None
    if details:
        details['project_mom_id'] = details.pop('project_mom2id')
        details['object_mom_id'] = details.pop('object_mom2id')

    return jsonify({'momobjectdetails': details})

@app.route('/rest/updates/<int:sinceChangeNumber>')
@gzipped
def getUpdateEventsSince(sinceChangeNumber):
    changesSince = changeshandler.getChangesSince(sinceChangeNumber)
    return jsonify({'changes': changesSince})

@app.route('/rest/mostRecentChangeNumber')
@gzipped
def getMostRecentChangeNumber():
    mrcn = changeshandler.getMostRecentChangeNumber()
    return jsonify({'mostRecentChangeNumber': mrcn})

@app.route('/rest/updates')
def getUpdateEvents():
    return getUpdateEventsSince(-1L)

@app.route('/rest/logEvents')
@gzipped
def getMostRecentLogEvents():
    return getLogEventsSince(datetime.utcnow() - timedelta(hours=6))

@app.route('/rest/logEvents/<string:fromTimestamp>')
@gzipped
def getLogEventsSince(fromTimestamp=None):
    if not fromTimestamp:
        fromTimestamp = datetime.utcnow() - timedelta(hours=6)
    eventsSince = changeshandler.getEventsSince(fromTimestamp)
    return jsonify({'logEvents': eventsSince})

@app.route('/rest/lofarTime')
@gzipped
def getLofarTime():
    return jsonify({'lofarTime': asIsoFormat(datetime.utcnow())})


#ugly method to generate html tables for all tasks
@app.route('/tasks.html')
@gzipped
def getTasksHtml():
    tasks = radb().getTasks()
    if not tasks:
        abort(404)

    updateTaskDetails(tasks)

    html = '<!DOCTYPE html><html><head><title>Tasks</title><style>table, th, td {border: 1px solid black; border-collapse: collapse; padding: 4px;}</style></head><body><table style="width:100%">\n'

    props = sorted(tasks[0].keys())
    html += '<tr>%s</tr>\n' % ''.join('<th>%s</th>' % prop for prop in props)

    for task in tasks:
        html += '<tr>'
        for prop in props:
            if prop in task:
                if prop == 'id':
                    html += '<td><a href="/rest/tasks/%s.html">%s</a></td> ' % (task[prop], task[prop])
                else:
                    html += '<td>%s</td> ' % task[prop]
        html += '</tr>\n'

    html += '</table></body></html>\n'

    return html

#ugly method to generate html tables for the task and it's claims
@app.route('/tasks/<int:task_id>.html', methods=['GET'])
@gzipped
def getTaskHtml(task_id):
    task = radb().getTask(task_id)

    if not task:
        abort(404, 'No such task %s' % task_id)

    task['name'] = 'Task %d' % task['id']
    updateTaskDetails([task])

    html = '<!DOCTYPE html><html><head><title>Tasks</title><style>table, th, td {border: 1px solid black; border-collapse: collapse; padding: 4px;}</style></head><body><table style="">\n'

    html += '<h1>Task %s</h1>' % task_id

    html += '<p><a href="/tasks/%s/log.html">%s log</a></p> ' % (task['id'], task['type'])

    html += '<p><a href="/rest/tasks/%s/parset">view %s parset</a></p> ' % (task['id'], task['type'])

    props = sorted(task.keys())
    html += '<tr><th>key</th><th>value</th></tr>\n'

    for prop in props:
        html += '<tr><td>%s</td>' % prop

        if prop == 'id':
            html += '<td><a href="/tasks/%s.html">%s</a></td> ' % (task[prop], task[prop])
        elif prop == 'predecessor_ids' or prop == 'successor_ids':
            ids = task[prop]
            if ids:
                html += '<td>%s</td> ' % ', '.join('<a href="/tasks/%s.html">%s</a>' % (id, id) for id in ids)
            else:
                html += '<td></td> '
        else:
            html += '<td>%s</td> ' % task[prop]

        html += '</tr>'

    html += '</table>\n<br>'

    claims = radb().getResourceClaims(task_ids=[task_id], extended=True, include_properties=True)

    if claims:
        html += '<h1>Claims</h1>'

        for claim in claims:
            html += '<table>'
            for claim_key,claim_value in claim.items():
                if claim_key == 'properties':
                    html += '<tr><td>properties</td><td><table>'
                    if claim_value:
                        propnames = sorted(claim_value[0].keys())
                        html += '<tr>%s</tr>\n' % ''.join('<th>%s</th>' % propname for propname in propnames)
                        for prop in claim_value:
                            html += '<tr>%s</tr>\n' % ''.join('<td>%s</td>' % prop[propname] for propname in propnames)
                    html += '</table></td></tr>'
                elif claim_key == 'saps':
                    html += '<tr><td>saps</td><td><table>'
                    saps = claim_value
                    if saps:
                        sap_keys = ['sap_nr', 'properties']
                        html += '<tr>%s</tr>\n' % ''.join('<th>%s</th>' % sap_key for sap_key in sap_keys)
                        for sap in saps:
                            html += '<tr>'
                            for sap_key in sap_keys:
                                if sap_key == 'properties':
                                    html += '<td><table>'
                                    sap_props = sap[sap_key]
                                    if sap_props:
                                        propnames = sorted(sap_props[0].keys())
                                        html += '<tr>%s</tr>\n' % ''.join('<th>%s</th>' % propname for propname in propnames)
                                        for prop in sap_props:
                                            html += '<tr>%s</tr>\n' % ''.join('<td>%s</td>' % prop[propname] for propname in propnames)
                                    html += '</table></td>'
                                else:
                                    html += '<td>%s</td>' % (sap[sap_key])
                            html += '</tr>'
                    html += '</table></td></tr>'
                else:
                    html += '<tr><td>%s</td><td>%s</td></tr>' % (claim_key,claim_value)
            html += '</table>'
            html += '<br>'

    html += '</body></html>\n'

    return html

@app.route('/rest/tasks/<int:task_id>/resourceclaims.html', methods=['GET'])
@gzipped
def resourceClaimsForTaskHtml(task_id):
    claims = radb().getResourceClaims(task_ids=[task_id], extended=True, include_properties=True)

    if not claims:
        abort(404, 'No resource claims for task %s' % task_id)

    html = '<!DOCTYPE html><html><head><title>Tasks</title><style>table, th, td {border: 1px solid black; border-collapse: collapse; padding: 4px;}</style></head><body><table style="">\n'

    for claim in claims:
        html += '<tr><td>%s</td>' % claim

    html += '</table></body></html>\n'

    return html

@app.route('/tasks/<int:task_id>/log.html', methods=['GET'])
@gzipped
def getTaskLogHtml(task_id):
    task = radb().getTask(task_id)

    cmd = []
    if task['type'] == 'pipeline':
        cmd = ['ssh', 'lofarsys@head01.cep4.control.lofar', 'cat /data/log/pipeline-%s-*.log' % task['otdb_id']]
    else:
        cmd = ['ssh', 'mcu001.control.lofar', 'cat /opt/lofar/var/log/mcu001\\:ObservationControl\\[0\\]\\{%s\\}.log*' % task['otdb_id']]

    logger.info(' '.join(cmd))

    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = proc.communicate()
    if proc.returncode == 0:
        return out, 200, {'Content-Type': 'text/plain; charset=utf-8'}
    else:
        return err, 500, {'Content-Type': 'text/plain; charset=utf-8'}

def main():
    # make sure we run in UTC timezone
    import os
    os.environ['TZ'] = 'UTC'

    # Check the invocation arguments
    parser = OptionParser('%prog [options]',
                          description='run the resource assignment editor web service')
    parser.add_option('--webserver_port', dest='webserver_port', type='int', default=7412, help='port number on which to host the webservice, default: %default')
    parser.add_option('-q', '--broker', dest='broker', type='string', default=None, help='Address of the qpid broker, default: localhost')
    parser.add_option('--radb_busname', dest='radb_busname', type='string', default=DEFAULT_RADB_BUSNAME, help='Name of the bus exchange on the qpid broker on which the radbservice listens, default: %default')
    parser.add_option('--radb_servicename', dest='radb_servicename', type='string', default=DEFAULT_RADB_SERVICENAME, help='Name of the radbservice, default: %default')
    parser.add_option('--radb_notification_busname', dest='radb_notification_busname', type='string', default=DEFAULT_RADB_CHANGES_BUSNAME, help='Name of the notification bus exchange on the qpid broker on which the radb notifications are published, default: %default')
    parser.add_option('--radb_notification_subjects', dest='radb_notification_subjects', type='string', default=DEFAULT_RADB_CHANGES_SUBJECTS, help='Subject(s) to listen for on the radb notification bus exchange on the qpid broker, default: %default')
    parser.add_option('--otdb_busname', dest='otdb_busname', type='string', default=DEFAULT_OTDB_SERVICE_BUSNAME, help='Name of the bus exchange on the qpid broker on which the otdbservice listens, default: %default')
    parser.add_option('--otdb_servicename', dest='otdb_servicename', type='string', default=DEFAULT_OTDB_SERVICENAME, help='Name of the otdbservice, default: %default')
    parser.add_option('--otdb_notification_busname', dest='otdb_notification_busname', type='string', default=DEFAULT_OTDB_NOTIFICATION_BUSNAME, help='Name of the notification bus exchange on the qpid broker on which the otdb notifications are published, default: %default')
    parser.add_option('--otdb_notification_subject', dest='otdb_notification_subject', type='string', default=DEFAULT_OTDB_NOTIFICATION_SUBJECT, help='Subject to listen for on the otdb notification bus exchange on the qpid broker, default: %default')
    parser.add_option('--mom_busname', dest='mom_busname', type='string', default=DEFAULT_MOM_BUSNAME, help='Name of the bus exchange on the qpid broker on which the momservice listens, default: %default')
    parser.add_option('--mom_servicename', dest='mom_servicename', type='string', default=DEFAULT_MOM_SERVICENAME, help='Name of the momservice, default: %default')
    parser.add_option('--mom_broker', dest='mom_broker', type='string', default=None, help='Address of the qpid broker for the mom service, default: localhost')
    parser.add_option('--mom_query_busname', dest='mom_query_busname', type='string', default=DEFAULT_MOMQUERY_BUSNAME, help='Name of the bus exchange on the qpid broker on which the momqueryservice listens, default: %default')
    parser.add_option('--mom_query_servicename', dest='mom_query_servicename', type='string', default=DEFAULT_MOMQUERY_SERVICENAME, help='Name of the momqueryservice, default: %default')
    parser.add_option('--cleanup_busname', dest='cleanup_busname', type='string', default=DEFAULT_CLEANUP_BUSNAME, help='Name of the bus exchange on the qpid broker on which the cleanupservice listens, default: %default')
    parser.add_option('--cleanup_servicename', dest='cleanup_servicename', type='string', default=DEFAULT_CLEANUP_SERVICENAME, help='Name of the cleanupservice, default: %default')
    parser.add_option('--storagequery_busname', dest='storagequery_busname', type='string', default=DEFAULT_STORAGEQUERY_BUSNAME, help='Name of the bus exchange on the qpid broker on which the storagequeryservice listens, default: %default')
    parser.add_option('--storagequery_servicename', dest='storagequery_servicename', type='string', default=DEFAULT_STORAGEQUERY_SERVICENAME, help='Name of the storagequeryservice, default: %default')
    parser.add_option('--dm_notification_busname', dest='dm_notification_busname', type='string', default=DEFAULT_DM_NOTIFICATION_BUSNAME, help='Name of the notification bus exchange on the qpid broker on which the data management notifications are published, default: %default')
    parser.add_option('--dm_notification_subjects', dest='dm_notification_subjects', type='string', default=DEFAULT_DM_NOTIFICATION_SUBJECTS, help='Subject(s) to listen for on the data management notification bus exchange on the qpid broker, default: %default')
    parser.add_option('--ingest_notification_busname', dest='ingest_notification_busname', type='string', default=DEFAULT_INGEST_NOTIFICATION_BUSNAME, help='Name of the notification bus exchange on the qpid broker on which the ingest notifications are published, default: %default')
    parser.add_option('--ingest_notification_subjects', dest='ingest_notification_subjects', type='string', default=DEFAULT_INGEST_NOTIFICATION_SUBJECTS, help='Subject(s) to listen for on the ingest notification bus exchange on the qpid broker, default: %default')
    parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging')
    parser.add_option_group(dbcredentials.options_group(parser))
    parser.set_defaults(dbcredentials="RADB")
    (options, args) = parser.parse_args()

    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                        level=logging.DEBUG if options.verbose else logging.INFO)

    global _radb_dbcreds
    _radb_dbcreds = dbcredentials.parse_options(options)
    if _radb_dbcreds.database:
        logger.info("Using dbcreds for direct RADB access: %s" % _radb_dbcreds.stringWithHiddenPassword())
    else:
        _radb_dbcreds = None

    global rarpc
    rarpc = RARPC(busname=options.radb_busname, servicename=options.radb_servicename, broker=options.broker)
    global momrpc
    momrpc = MoMRPC(busname=options.mom_busname, servicename=options.mom_servicename, timeout=10, broker=options.broker)
    global otdbrpc
    otdbrpc = OTDBRPC(busname=options.otdb_busname, servicename=options.otdb_servicename, broker=options.broker)
    global curpc
    curpc = CleanupRPC(busname=options.cleanup_busname, servicename=options.cleanup_servicename, broker=options.broker)
    global sqrpc
    sqrpc = StorageQueryRPC(busname=options.storagequery_busname, servicename=options.storagequery_servicename, broker=options.broker)
    global momqueryrpc
    momqueryrpc = MoMQueryRPC(busname=options.mom_query_busname, servicename=options.mom_query_servicename, timeout=10, broker=options.broker)
    global changeshandler
    changeshandler = ChangesHandler(radb_busname=options.radb_notification_busname, radb_subjects=options.radb_notification_subjects,
                                    dm_busname=options.dm_notification_busname, dm_subjects=options.dm_notification_subjects,
                                    otdb_busname=options.otdb_notification_busname, otdb_subject=options.otdb_notification_subject,
                                    ingest_busname=options.ingest_notification_busname, ingest_subjects=options.ingest_notification_subjects,
                                    broker=options.broker, momqueryrpc=momqueryrpc, radbrpc=rarpc, sqrpc=sqrpc)

    with changeshandler, rarpc, otdbrpc, curpc, sqrpc, momrpc, momqueryrpc:
        '''Start the webserver'''
        app.run(debug=options.verbose, threaded=True, host='0.0.0.0', port=options.webserver_port)

if __name__ == '__main__':
    main()