-
Jorrit Schaap authoredJorrit Schaap authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
webservice.py 45.77 KiB
#!/usr/bin/python
# Copyright (C) 2012-2015 ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
# $Id$
'''ResourceAssignmentEditor webservice serves a interactive html5 website for
viewing and editing lofar resources.'''
import sys
import os
import time
from optparse import OptionParser
from threading import Condition, Lock, current_thread, Thread
import _strptime
from datetime import datetime
from json import loads as json_loads
import time
import logging
import subprocess
from dateutil import parser, tz
from flask import Flask
from flask import render_template
from flask import request
from flask import abort
from flask import url_for
from lofar.common.flask_utils import gzipped
from lofar.messaging.RPC import RPCException
from lofar.sas.resourceassignment.resourceassignmenteditor.fakedata import *
from lofar.sas.resourceassignment.resourceassignmenteditor.changeshandler import ChangesHandler, CHANGE_DELETE_TYPE
from lofar.sas.datamanagement.common.config import DEFAULT_DM_NOTIFICATION_BUSNAME, DEFAULT_DM_NOTIFICATION_SUBJECTS
from lofar.sas.resourceassignment.database.config import DEFAULT_NOTIFICATION_BUSNAME as DEFAULT_RADB_CHANGES_BUSNAME
from lofar.sas.resourceassignment.database.config import DEFAULT_NOTIFICATION_SUBJECTS as DEFAULT_RADB_CHANGES_SUBJECTS
from lofar.sas.otdb.config import DEFAULT_OTDB_NOTIFICATION_BUSNAME, DEFAULT_OTDB_NOTIFICATION_SUBJECT
from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as DEFAULT_RADB_BUSNAME
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as DEFAULT_RADB_SERVICENAME
from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC
from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME
from lofar.mom.momqueryservice.momrpc import MoMRPC
from lofar.mom.momqueryservice.config import DEFAULT_MOM_BUSNAME, DEFAULT_MOM_SERVICENAME
from lofar.sas.resourceassignment.resourceassignmenteditor.mom import updateTaskMomDetails
from lofar.sas.resourceassignment.resourceassignmenteditor.storage import updateTaskStorageDetails
from lofar.sas.datamanagement.cleanup.rpc import CleanupRPC
from lofar.sas.datamanagement.cleanup.config import DEFAULT_BUSNAME as DEFAULT_CLEANUP_BUSNAME
from lofar.sas.datamanagement.cleanup.config import DEFAULT_SERVICENAME as DEFAULT_CLEANUP_SERVICENAME
from lofar.sas.datamanagement.storagequery.rpc import StorageQueryRPC
from lofar.sas.datamanagement.storagequery.config import DEFAULT_BUSNAME as DEFAULT_STORAGEQUERY_BUSNAME
from lofar.sas.datamanagement.storagequery.config import DEFAULT_SERVICENAME as DEFAULT_STORAGEQUERY_SERVICENAME
from lofar.sas.otdb.otdbrpc import OTDBRPC
from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SERVICENAME
from lofar.lta.ingest.common.config import DEFAULT_INGEST_NOTIFICATION_BUSNAME
from lofar.lta.ingest.common.config import DEFAULT_INGEST_NOTIFICATION_SUBJECTS
from lofar.common import isProductionEnvironment, isTestEnvironment
from lofar.common.util import humanreadablesize
from lofar.common import dbcredentials
from lofar.sas.resourceassignment.database.radb import RADatabase
logger = logging.getLogger(__name__)
def asDatetime(isoString):
if isoString[-1] == 'Z':
isoString = isoString[:-1]
if isoString[-4] == '.':
isoString += '000'
return datetime.strptime(isoString, '%Y-%m-%dT%H:%M:%S.%f')
def asIsoFormat(timestamp):
return datetime.strftime(timestamp, '%Y-%m-%dT%H:%M:%S.%fZ')
__root_path = os.path.dirname(os.path.realpath(__file__))
'''The flask webservice app'''
app = Flask('Scheduler',
instance_path=__root_path,
template_folder=os.path.join(__root_path, 'templates'),
static_folder=os.path.join(__root_path, 'static'),
instance_relative_config=True)
# Load the default configuration
app.config.from_object('lofar.sas.resourceassignment.resourceassignmenteditor.config.default')
try:
import ujson
def convertDictDatetimeValuesToString(obj):
'''recursively convert all string values in the dict to buffer'''
if isinstance(obj, list):
return [convertDictDatetimeValuesToString(x) if (isinstance(x, dict) or isinstance(x, list)) else x for x in obj]
return dict( (k, convertDictDatetimeValuesToString(v) if (isinstance(v, dict) or isinstance(v, list)) else asIsoFormat(v) if isinstance(v, datetime) else v) for k,v in obj.items())
def jsonify(obj):
'''faster implementation of flask.json.jsonify using ultrajson and the above datetime->string convertor'''
json_str = ujson.dumps(dict(convertDictDatetimeValuesToString(obj)))
return app.response_class(json_str, mimetype='application/json')
except:
from flask.json import jsonify
from flask.json import JSONEncoder
class CustomJSONEncoder(JSONEncoder):
def default(self, obj):
try:
if isinstance(obj, datetime):
return asIsoFormat(obj)
iterable = iter(obj)
except TypeError:
pass
else:
return list(iterable)
return JSONEncoder.default(self, obj)
app.json_encoder = CustomJSONEncoder
rarpc = None
momrpc = None
otdbrpc = None
curpc = None
sqrpc = None
momqueryrpc = None
changeshandler = None
_radb_pool = {}
_radb_pool_lock = Lock()
_radb_dbcreds = None
def radb():
global _radb_pool, _radb_pool_lock
if _radb_dbcreds:
with _radb_pool_lock:
thread = current_thread()
tid = thread.ident
now = datetime.utcnow()
if tid not in _radb_pool:
logger.info('creating radb connection for thread %s', tid)
_radb_pool[tid] = { 'connection': RADatabase(dbcreds=_radb_dbcreds, log_queries=True),
'last_used': now }
thread_conn_obj = _radb_pool[tid]
thread_conn_obj['last_used'] = now
threshold = timedelta(minutes=5)
obsolete_connections_tids = [tid for tid,tco in _radb_pool.items() if now - tco['last_used'] > threshold]
for tid in obsolete_connections_tids:
logger.info('deleting radb connection for thread %s', tid)
del _radb_pool[tid]
return thread_conn_obj['connection']
return rarpc
@app.route('/')
@app.route('/index.htm')
@app.route('/index.html')
@gzipped
def index():
'''Serves the ResourceAssignmentEditor's index page'''
return render_template('index.html', title='Scheduler')
@app.route('/projects')
@app.route('/projects.htm')
@app.route('/projects.html')
@gzipped
def projects():
return render_template('projects.html', title='Projects')
@app.route('/rest/config')
@gzipped
def config():
config = {'mom_base_url':'',
'lta_base_url':'',
'inspection_plots_base_url':'https://proxy.lofar.eu/inspect/HTML/',
'sky_view_base_url':'http://dop344.astron.nl:5000/uvis/id'}
if isProductionEnvironment():
config['mom_base_url'] = 'https://lofar.astron.nl/mom3'
config['lta_base_url'] = 'http://lofar.target.rug.nl/'
elif isTestEnvironment():
config['mom_base_url'] = 'http://lofartest.control.lofar:8080/mom3'
config['lta_base_url'] = 'http://lofar-test.target.rug.nl/'
return jsonify({'config': config})
@app.route('/rest/resources')
@gzipped
def resources():
result = radb().getResources(include_availability=True)
return jsonify({'resources': result})
@app.route('/rest/resources/<int:resource_id>')
@gzipped
def resource(resource_id):
result = radb().getResources(resource_ids=[resource_id], include_availability=True)
if result:
return jsonify(result[0])
return jsonify({})
@app.route('/rest/resources/<int:resource_id>/resourceclaims')
@gzipped
def resourceclaimsForResource(resource_id):
return resourceclaimsForResourceFromUntil(resource_id, None, None)
@app.route('/rest/resources/<int:resource_id>/resourceclaims/<string:fromTimestamp>')
@gzipped
def resourceclaimsForResourceFrom(resource_id, fromTimestamp=None):
return resourceclaimsForResourceFromUntil(resource_id, fromTimestamp, None)
@app.route('/rest/resources/<int:resource_id>/resourceclaims/<string:fromTimestamp>/<string:untilTimestamp>')
@gzipped
def resourceclaimsForResourceFromUntil(resource_id, fromTimestamp=None, untilTimestamp=None):
if fromTimestamp and isinstance(fromTimestamp, basestring):
fromTimestamp = asDatetime(fromTimestamp)
if untilTimestamp and isinstance(untilTimestamp, basestring):
untilTimestamp = asDatetime(untilTimestamp)
claims = radb().getResourceClaims(lower_bound=fromTimestamp,
upper_bound=untilTimestamp,
resource_ids=[resource_id],
extended=False,
include_properties=True)
return jsonify({'resourceclaims': claims})
@app.route('/rest/resourcegroups')
@gzipped
def resourcegroups():
result = radb().getResourceGroups()
return jsonify({'resourcegroups': result})
@app.route('/rest/resourcegroupmemberships')
@gzipped
def resourcegroupsmemberships():
result = radb().getResourceGroupMemberships()
return jsonify({'resourcegroupmemberships': result})
@app.route('/rest/resourceclaims')
def resourceclaims():
return resourceclaimsFromUntil(None, None)
@app.route('/rest/resourceclaims/<string:fromTimestamp>')
def resourceclaimsFrom(fromTimestamp=None):
return resourceclaimsFromUntil(fromTimestamp, None)
@app.route('/rest/resourceclaims/<string:fromTimestamp>/<string:untilTimestamp>')
@gzipped
def resourceclaimsFromUntil(fromTimestamp=None, untilTimestamp=None):
if fromTimestamp and isinstance(fromTimestamp, basestring):
fromTimestamp = asDatetime(fromTimestamp)
if untilTimestamp and isinstance(untilTimestamp, basestring):
untilTimestamp = asDatetime(untilTimestamp)
claims = radb().getResourceClaims(lower_bound=fromTimestamp, upper_bound=untilTimestamp, include_properties=True)
return jsonify({'resourceclaims': claims})
@app.route('/rest/resourceusages')
@gzipped
def resourceUsages():
return resourceUsagesFromUntil(None, None)
@app.route('/rest/resourceusages/<string:fromTimestamp>/<string:untilTimestamp>')
@gzipped
def resourceUsagesFromUntil(fromTimestamp=None, untilTimestamp=None):
if fromTimestamp and isinstance(fromTimestamp, basestring):
fromTimestamp = asDatetime(fromTimestamp)
if untilTimestamp and isinstance(untilTimestamp, basestring):
untilTimestamp = asDatetime(untilTimestamp)
result = radb().getResourceUsages(lower_bound=fromTimestamp, upper_bound=untilTimestamp)
return jsonify({'resourceusages': result})
@app.route('/rest/resources/<int:resource_id>/usages', methods=['GET'])
@app.route('/rest/resourceusages/<int:resource_id>', methods=['GET'])
@gzipped
def resourceUsagesForResource(resource_id):
return resourceUsagesForResourceFromUntil(resource_id, None, None)
@app.route('/rest/resources/<int:resource_id>/usages/<string:fromTimestamp>/<string:untilTimestamp>', methods=['GET'])
@app.route('/rest/resourceusages/<int:resource_id>/<string:fromTimestamp>/<string:untilTimestamp>', methods=['GET'])
@gzipped
def resourceUsagesForResourceFromUntil(resource_id, fromTimestamp=None, untilTimestamp=None):
if fromTimestamp and isinstance(fromTimestamp, basestring):
fromTimestamp = asDatetime(fromTimestamp)
if untilTimestamp and isinstance(untilTimestamp, basestring):
untilTimestamp = asDatetime(untilTimestamp)
result = radb().getResourceUsages(resource_ids=[resource_id], lower_bound=fromTimestamp, upper_bound=untilTimestamp)
return jsonify({'resourceusages': result})
@app.route('/rest/tasks/<int:task_id>/resourceusages', methods=['GET'])
@gzipped
def resourceUsagesForTask(task_id):
result = radb().getResourceUsages(task_ids=[task_id])
return jsonify({'resourceusages': result})
@app.route('/rest/tasks/<int:task_id>/resourceclaims', methods=['GET'])
@gzipped
def resourceClaimsForTask(task_id):
result = radb().getResourceClaims(task_ids=[task_id], extended=True, include_properties=True)
return jsonify({'resourceclaims': result})
@app.route('/rest/tasks')
def getTasks():
return getTasksFromUntil(None, None)
@app.route('/rest/tasks/<string:fromTimestamp>')
def getTasksFrom(fromTimestamp):
return getTasksFromUntil(fromTimestamp, None)
@app.route('/rest/tasks/<string:fromTimestamp>/<string:untilTimestamp>')
@gzipped
def getTasksFromUntil(fromTimestamp=None, untilTimestamp=None):
if fromTimestamp and isinstance(fromTimestamp, basestring):
fromTimestamp = asDatetime(fromTimestamp)
if untilTimestamp and isinstance(untilTimestamp, basestring):
untilTimestamp = asDatetime(untilTimestamp)
tasks = radb().getTasks(fromTimestamp, untilTimestamp)
updateTaskDetails(tasks)
return jsonify({'tasks': tasks})
def updateTaskDetails(tasks):
#update the mom details and the storage details in parallel
t1 = Thread(target=updateTaskMomDetails, args=(tasks, momqueryrpc))
t2 = Thread(target=updateTaskStorageDetails, args=(tasks, sqrpc, curpc))
t1.daemon = True
t2.daemon = True
t1.start()
t2.start()
#wait for mom details thread to finish
t1.join()
#task details (such as name/description) from MoM are done
#get extra details on reserved resources for reservations (while the storage details still run in t2)
reservationTasks = [t for t in tasks if t['type'] == 'reservation']
if reservationTasks:
reservationClaims = radb().getResourceClaims(task_ids=[t['id'] for t in reservationTasks], extended=True, include_properties=False)
task2claims = {}
for claim in reservationClaims:
if claim['task_id'] not in task2claims:
task2claims[claim['task_id']] = []
task2claims[claim['task_id']].append(claim)
for task in reservationTasks:
claims = task2claims.get(task['id'], [])
task['name'] = ', '.join(c['resource_name'] for c in claims)
task['description'] = 'Reservation on ' + task['name']
#wait for storage details thread to finish
t2.join()
@app.route('/rest/tasks/<int:task_id>', methods=['GET'])
@gzipped
def getTask(task_id):
try:
task = radb().getTask(task_id)
if not task:
abort(404)
task['name'] = 'Task %d' % task['id']
updateTaskDetails([task])
return jsonify({'task': task})
except Exception as e:
abort(404)
return jsonify({'task': None})
@app.route('/rest/tasks/otdb/<int:otdb_id>', methods=['GET'])
@gzipped
def getTaskByOTDBId(otdb_id):
try:
task = radb().getTask(otdb_id=otdb_id)
if not task:
abort(404)
task['name'] = 'Task %d' % task['id']
updateTaskDetails([task])
return jsonify({'task': task})
except Exception as e:
abort(404)
return jsonify({'task': None})
@app.route('/rest/tasks/mom/<int:mom_id>', methods=['GET'])
@gzipped
def getTaskByMoMId(mom_id):
try:
task = radb().getTask(mom_id=mom_id)
if not task:
abort(404)
task['name'] = 'Task %d' % task['id']
updateTaskDetails([task])
return jsonify({'task': task})
except Exception as e:
abort(404)
return jsonify({'task': None})
@app.route('/rest/tasks/mom/group/<int:mom_group_id>', methods=['GET'])
@gzipped
def getTasksByMoMGroupId(mom_group_id):
try:
mom_ids = momqueryrpc.getTaskIdsInGroup(mom_group_id)[str(mom_group_id)]
tasks = radb().getTasks(mom_ids=mom_ids)
updateTaskDetails(tasks)
return jsonify({'tasks': tasks})
except Exception as e:
abort(404)
@app.route('/rest/tasks/mom/parentgroup/<int:mom_parent_group_id>', methods=['GET'])
@gzipped
def getTasksByMoMParentGroupId(mom_parent_group_id):
try:
mom_ids = momqueryrpc.getTaskIdsInParentGroup(mom_parent_group_id)[str(mom_parent_group_id)]
tasks = radb().getTasks(mom_ids=mom_ids)
updateTaskDetails(tasks)
return jsonify({'tasks': tasks})
except Exception as e:
abort(404)
@app.route('/rest/tasks/<int:task_id>', methods=['PUT'])
def putTask(task_id):
if 'Content-Type' in request.headers and \
request.headers['Content-Type'].startswith('application/json'):
try:
updatedTask = json_loads(request.data)
if task_id != int(updatedTask['id']):
abort(404, 'task_id in url is not equal to id in request.data')
#check if task is known
task = radb().getTask(task_id)
if not task:
abort(404, "unknown task %s" % str(updatedTask))
# first handle start- endtimes...
if 'starttime' in updatedTask or 'endtime' in updatedTask:
logger.info('starttime or endtime in updatedTask: %s', updatedTask)
if isProductionEnvironment():
abort(403, 'Editing of %s of tasks by users is not yet approved' % (time,))
#update dict for otdb spec
spec_update = {}
for timeprop in ['starttime', 'endtime']:
if timeprop in updatedTask:
try:
updatedTask[timeprop] = asDatetime(updatedTask[timeprop])
except ValueError:
abort(400, 'timestamp not in iso format: ' + updatedTask[timeprop])
otdb_key = 'LOFAR.ObsSW.Observation.' + ('startTime' if timeprop == 'starttime' else 'stopTime')
spec_update[otdb_key] = updatedTask[timeprop].strftime('%Y-%m-%d %H:%M:%S')
#update timestamps in both otdb and radb
otdbrpc.taskSetSpecification(task['otdb_id'], spec_update)
# update the task's (and its claims) start/endtime
# do not update the tasks status directly via the radb. See few lines below. task status is routed via otdb (and then ends up in radb automatically)
# it might be that editing the start/end time results in a (rabd)task status update (for example to 'conflict' due to conflicting claims)
# that's ok, since we'll update the status to the requested status later via otdb (see few lines below)
radb().updateTaskAndResourceClaims(task_id,
starttime=updatedTask.get('starttime'),
endtime=updatedTask.get('endtime'))
# ...then, handle status update which might trigger resource assignment,
# for which the above updated times are needed
if 'status' in updatedTask:
try:
#update status in otdb only
#the status change will propagate automatically into radb via other services (by design)
otdbrpc.taskSetStatus(task['otdb_id'], updatedTask['status'])
#we expect the status in otdb/radb to eventually become what we asked for...
expected_statuses = set([updatedTask['status']])
#except for the prescheduled status, because then the resource assigner tries
#to schedule the task, and it will end up in either 'scheduled', 'conflict', 'error' state.
if updatedTask['status'] == 'prescheduled':
expected_statuses = set(['scheduled', 'conflict', 'error'])
#block until radb and mom task status are equal to the expected_statuses (with timeout)
start_wait = datetime.utcnow()
while True:
task = radb().getTask(otdb_id=task['otdb_id'])
otdb_status = otdbrpc.taskGetStatus(task['otdb_id'])
logger.info('waiting for otdb/radb task status to be in [%s].... otdb:%s radb:%s',
', '.join(expected_statuses), otdb_status, task['status'])
if (task['status'] in expected_statuses and
otdb_status in expected_statuses):
break
if datetime.utcnow() - start_wait > timedelta(seconds=10):
break
time.sleep(0.2)
except RPCException as e:
if 'does not exist' in e.message:
# task does not exist (anymore) in otdb
#so remove it from radb as well (with cascading deletes on specification)
logger.warn('task with otdb_id %s does not exist anymore in OTDB. removing task radb_id %s from radb', task['otdb_id'], task['id'])
radb().deleteSpecification(task['specification_id'])
if 'data_pinned' in updatedTask:
task = radb().getTask(task_id)
if not task:
abort(404, "unknown task %s" % str(updatedTask))
curpc.setTaskDataPinned(task['otdb_id'], updatedTask['data_pinned'])
return "", 204
except Exception as e:
logger.error(e)
abort(404, str(e))
abort(406)
@app.route('/rest/tasks/<int:task_id>/cleanup', methods=['DELETE'])
def cleanupTaskData(task_id):
try:
delete_params = {}
if 'Content-Type' in request.headers and (request.headers['Content-Type'].startswith('application/json') or request.headers['Content-Type'].startswith('text/plain')):
delete_params = json_loads(request.data)
task = radb().getTask(task_id)
if not task:
abort(404, 'No such task (id=%s)' % task_id)
logger.info("cleanup task data id=%s otdb_id=%s delete_params=%s", task_id, task['otdb_id'], delete_params)
result = curpc.removeTaskData(task['otdb_id'],
delete_is=delete_params.get('delete_is', True),
delete_cs=delete_params.get('delete_cs', True),
delete_uv=delete_params.get('delete_uv', True),
delete_im=delete_params.get('delete_im', True),
delete_img=delete_params.get('delete_img', True),
delete_pulp=delete_params.get('delete_pulp', True),
delete_scratch=delete_params.get('delete_scratch', True),
force=delete_params.get('force_delete', False))
logger.info(result)
return jsonify(result)
except Exception as e:
abort(500)
@app.route('/rest/tasks/<int:task_id>/datapath', methods=['GET'])
@gzipped
def getTaskDataPath(task_id):
try:
task = radb().getTask(task_id)
if not task:
abort(404, 'No such task (id=%s)' % task_id)
result = sqrpc.getPathForOTDBId(task['otdb_id'])
except Exception as e:
abort(500, str(e))
if result['found']:
return jsonify({'datapath': result['path']})
abort(404, result['message'] if result and 'message' in result else '')
@app.route('/rest/tasks/otdb/<int:otdb_id>/diskusage', methods=['GET'])
@gzipped
def getTaskDiskUsageByOTDBId(otdb_id):
try:
result = sqrpc.getDiskUsageForTaskAndSubDirectories(otdb_id=otdb_id, force_update=request.args.get('force')=='true')
except Exception as e:
abort(500, str(e))
if result['found']:
return jsonify(result)
abort(404, result['message'] if result and 'message' in result else '')
@app.route('/rest/tasks/<int:task_id>/diskusage', methods=['GET'])
@gzipped
def getTaskDiskUsage(task_id):
try:
result = sqrpc.getDiskUsageForTaskAndSubDirectories(radb_id=task_id, force_update=request.args.get('force')=='true')
except Exception as e:
abort(500, str(e))
if result['found']:
return jsonify(result)
abort(404, result['message'] if result and 'message' in result else '')
@app.route('/rest/tasks/<int:task_id>/copy', methods=['PUT'])
def copyTask(task_id):
if isProductionEnvironment():
abort(403, 'Copying of tasks is by users is not yet approved')
try:
task = radb().getTask(task_id)
if not task:
logger.error('Could not find task %s' % task_id)
abort(404, 'Could not find task %s' % task_id)
mom2id = task['mom_id']
new_task_mom2id = momrpc.copyTask(mom2id=mom2id)
return jsonify({'copied':True, 'new_task_mom2id':new_task_mom2id, 'old_task_mom2id':mom2id})
except Exception as e:
logger.error(e)
abort(404, str(e))
return jsonify({'copied':False})
@app.route('/rest/tasks/<int:task_id>/parset', methods=['GET'])
@gzipped
def getParset(task_id):
try:
task = radb().getTask(task_id)
if not task:
abort(404)
return getParsetByOTDBId(task['otdb_id'])
except Exception as e:
abort(404)
abort(404)
@app.route('/rest/tasks/otdb/<int:otdb_id>/parset', methods=['GET'])
@gzipped
def getParsetByOTDBId(otdb_id):
try:
logger.info('getParsetByOTDBId(%s)', otdb_id)
parset = otdbrpc.taskGetSpecification(otdb_id=otdb_id)['specification']
return '\n'.join(['%s=%s' % (k,parset[k]) for k in sorted(parset.keys())]), 200, {'Content-Type': 'text/plain; charset=utf-8'}
except Exception as e:
abort(404)
abort(404)
@app.route('/rest/tasks/<int:task_id>/resourceclaims')
@gzipped
def taskResourceClaims(task_id):
return jsonify({'taskResourceClaims': radb().getResourceClaims(task_ids=[task_id], include_properties=True)})
@app.route('/rest/tasktypes')
@gzipped
def tasktypes():
result = radb().getTaskTypes()
result = sorted(result, key=lambda q: q['id'])
return jsonify({'tasktypes': result})
@app.route('/rest/taskstatustypes')
@gzipped
def getTaskStatusTypes():
result = radb().getTaskStatuses()
result = sorted(result, key=lambda q: q['id'])
return jsonify({'taskstatustypes': result})
@app.route('/rest/resourcetypes')
@gzipped
def resourcetypes():
result = radb().getResourceTypes()
result = sorted(result, key=lambda q: q['id'])
return jsonify({'resourcetypes': result})
@app.route('/rest/resourceclaimpropertytypes')
@gzipped
def resourceclaimpropertytypes():
result = radb().getResourceClaimPropertyTypes()
result = sorted(result, key=lambda q: q['id'])
return jsonify({'resourceclaimpropertytypes': result})
@app.route('/rest/projects')
@gzipped
def getProjects():
projects = []
try:
projects = momqueryrpc.getProjects()
projects = [x for x in projects if x['status_id'] in [1, 7]]
for project in projects:
project['mom_id'] = project.pop('mom2id')
except Exception as e:
logger.error(e)
projects.append({'name':'<unknown>', 'mom_id':-99, 'description': 'Container project for tasks for which we could not find a MoM project'})
projects.append({'name':'OTDB Only', 'mom_id':-98, 'description': 'Container project for tasks which exists only in OTDB'})
projects.append({'name':'Reservations', 'mom_id':-97, 'description': 'Container project for reservation tasks'})
return jsonify({'momprojects': projects})
@app.route('/rest/projects/<int:project_mom2id>')
@gzipped
def getProject(project_mom2id):
try:
projects = momqueryrpc.getProjects()
project = next(x for x in projects if x['mom2id'] == project_mom2id)
return jsonify({'momproject': project})
except StopIteration as e:
logger.error(e)
abort(404, "No project with mom2id %s" % project_mom2id)
except Exception as e:
logger.error(e)
abort(404, str(e))
@app.route('/rest/projects/<int:project_mom2id>/tasks')
@gzipped
def getProjectTasks(project_mom2id):
return getProjectTasksFromUntil(project_mom2id, None, None)
@app.route('/rest/projects/<int:project_mom2id>/tasks/<string:fromTimestamp>/<string:untilTimestamp>')
@gzipped
def getProjectTasksFromUntil(project_mom2id, fromTimestamp=None, untilTimestamp=None):
try:
if fromTimestamp and isinstance(fromTimestamp, basestring):
fromTimestamp = asDatetime(fromTimestamp)
if untilTimestamp and isinstance(untilTimestamp, basestring):
untilTimestamp = asDatetime(untilTimestamp)
task_mom2ids = momqueryrpc.getProjectTaskIds(project_mom2id)['task_mom2ids']
tasks = radb().getTasks(mom_ids=task_mom2ids, lower_bound=fromTimestamp, upper_bound=untilTimestamp)
updateTaskDetails(tasks)
return jsonify({'tasks': tasks})
except Exception as e:
logger.error(e)
abort(404, str(e))
@app.route('/rest/projects/<int:project_mom2id>/taskstimewindow')
@gzipped
def getProjectTasksTimeWindow(project_mom2id):
try:
task_mom2ids = momqueryrpc.getProjectTaskIds(project_mom2id)['task_mom2ids']
timewindow = radb().getTasksTimeWindow(mom_ids=task_mom2ids)
return jsonify(timewindow)
except Exception as e:
logger.error(e)
abort(404, str(e))
@app.route('/rest/projects/<int:project_mom2id>/diskusage')
@gzipped
def getProjectDiskUsageById(project_mom2id):
try:
project = momqueryrpc.getProject(project_mom2id=project_mom2id)
return getProjectDiskUsageByName(project['name'])
except StopIteration as e:
logger.error(e)
abort(404, "No project with mom2id %s" % project_mom2id)
except Exception as e:
logger.error(e)
abort(404, str(e))
@app.route('/rest/projects/<string:project_name>/diskusage')
@gzipped
def getProjectDiskUsageByName(project_name):
try:
result = sqrpc.getDiskUsageForProjectDirAndSubDirectories(project_name=project_name, force_update=request.args.get('force')=='true')
return jsonify(result)
except Exception as e:
logger.error(e)
abort(404, str(e))
@app.route('/rest/projects/diskusage')
@gzipped
def getProjectsDiskUsage():
try:
result = sqrpc.getDiskUsageForProjectsDirAndSubDirectories(force_update=request.args.get('force')=='true')
return jsonify(result)
except Exception as e:
logger.error(e)
abort(404, str(e))
@app.route('/rest/momobjectdetails/<int:mom2id>')
@gzipped
def getMoMObjectDetails(mom2id):
details = momqueryrpc.getObjectDetails(mom2id)
details = details.values()[0] if details else None
if details:
details['project_mom_id'] = details.pop('project_mom2id')
details['object_mom_id'] = details.pop('object_mom2id')
return jsonify({'momobjectdetails': details})
@app.route('/rest/updates/<int:sinceChangeNumber>')
@gzipped
def getUpdateEventsSince(sinceChangeNumber):
changesSince = changeshandler.getChangesSince(sinceChangeNumber)
return jsonify({'changes': changesSince})
@app.route('/rest/mostRecentChangeNumber')
@gzipped
def getMostRecentChangeNumber():
mrcn = changeshandler.getMostRecentChangeNumber()
return jsonify({'mostRecentChangeNumber': mrcn})
@app.route('/rest/updates')
def getUpdateEvents():
return getUpdateEventsSince(-1L)
@app.route('/rest/logEvents')
@gzipped
def getMostRecentLogEvents():
return getLogEventsSince(datetime.utcnow() - timedelta(hours=6))
@app.route('/rest/logEvents/<string:fromTimestamp>')
@gzipped
def getLogEventsSince(fromTimestamp=None):
if not fromTimestamp:
fromTimestamp = datetime.utcnow() - timedelta(hours=6)
eventsSince = changeshandler.getEventsSince(fromTimestamp)
return jsonify({'logEvents': eventsSince})
@app.route('/rest/lofarTime')
@gzipped
def getLofarTime():
return jsonify({'lofarTime': asIsoFormat(datetime.utcnow())})
#ugly method to generate html tables for all tasks
@app.route('/tasks.html')
@gzipped
def getTasksHtml():
tasks = radb().getTasks()
if not tasks:
abort(404)
updateTaskDetails(tasks)
html = '<!DOCTYPE html><html><head><title>Tasks</title><style>table, th, td {border: 1px solid black; border-collapse: collapse; padding: 4px;}</style></head><body><table style="width:100%">\n'
props = sorted(tasks[0].keys())
html += '<tr>%s</tr>\n' % ''.join('<th>%s</th>' % prop for prop in props)
for task in tasks:
html += '<tr>'
for prop in props:
if prop in task:
if prop == 'id':
html += '<td><a href="/rest/tasks/%s.html">%s</a></td> ' % (task[prop], task[prop])
else:
html += '<td>%s</td> ' % task[prop]
html += '</tr>\n'
html += '</table></body></html>\n'
return html
#ugly method to generate html tables for the task and it's claims
@app.route('/tasks/<int:task_id>.html', methods=['GET'])
@gzipped
def getTaskHtml(task_id):
task = radb().getTask(task_id)
if not task:
abort(404, 'No such task %s' % task_id)
task['name'] = 'Task %d' % task['id']
updateTaskDetails([task])
html = '<!DOCTYPE html><html><head><title>Tasks</title><style>table, th, td {border: 1px solid black; border-collapse: collapse; padding: 4px;}</style></head><body><table style="">\n'
html += '<h1>Task %s</h1>' % task_id
html += '<p><a href="/tasks/%s/log.html">%s log</a></p> ' % (task['id'], task['type'])
html += '<p><a href="/rest/tasks/%s/parset">view %s parset</a></p> ' % (task['id'], task['type'])
props = sorted(task.keys())
html += '<tr><th>key</th><th>value</th></tr>\n'
for prop in props:
html += '<tr><td>%s</td>' % prop
if prop == 'id':
html += '<td><a href="/tasks/%s.html">%s</a></td> ' % (task[prop], task[prop])
elif prop == 'predecessor_ids' or prop == 'successor_ids':
ids = task[prop]
if ids:
html += '<td>%s</td> ' % ', '.join('<a href="/tasks/%s.html">%s</a>' % (id, id) for id in ids)
else:
html += '<td></td> '
else:
html += '<td>%s</td> ' % task[prop]
html += '</tr>'
html += '</table>\n<br>'
claims = radb().getResourceClaims(task_ids=[task_id], extended=True, include_properties=True)
if claims:
html += '<h1>Claims</h1>'
for claim in claims:
html += '<table>'
for claim_key,claim_value in claim.items():
if claim_key == 'properties':
html += '<tr><td>properties</td><td><table>'
if claim_value:
propnames = sorted(claim_value[0].keys())
html += '<tr>%s</tr>\n' % ''.join('<th>%s</th>' % propname for propname in propnames)
for prop in claim_value:
html += '<tr>%s</tr>\n' % ''.join('<td>%s</td>' % prop[propname] for propname in propnames)
html += '</table></td></tr>'
elif claim_key == 'saps':
html += '<tr><td>saps</td><td><table>'
saps = claim_value
if saps:
sap_keys = ['sap_nr', 'properties']
html += '<tr>%s</tr>\n' % ''.join('<th>%s</th>' % sap_key for sap_key in sap_keys)
for sap in saps:
html += '<tr>'
for sap_key in sap_keys:
if sap_key == 'properties':
html += '<td><table>'
sap_props = sap[sap_key]
if sap_props:
propnames = sorted(sap_props[0].keys())
html += '<tr>%s</tr>\n' % ''.join('<th>%s</th>' % propname for propname in propnames)
for prop in sap_props:
html += '<tr>%s</tr>\n' % ''.join('<td>%s</td>' % prop[propname] for propname in propnames)
html += '</table></td>'
else:
html += '<td>%s</td>' % (sap[sap_key])
html += '</tr>'
html += '</table></td></tr>'
else:
html += '<tr><td>%s</td><td>%s</td></tr>' % (claim_key,claim_value)
html += '</table>'
html += '<br>'
html += '</body></html>\n'
return html
@app.route('/rest/tasks/<int:task_id>/resourceclaims.html', methods=['GET'])
@gzipped
def resourceClaimsForTaskHtml(task_id):
claims = radb().getResourceClaims(task_ids=[task_id], extended=True, include_properties=True)
if not claims:
abort(404, 'No resource claims for task %s' % task_id)
html = '<!DOCTYPE html><html><head><title>Tasks</title><style>table, th, td {border: 1px solid black; border-collapse: collapse; padding: 4px;}</style></head><body><table style="">\n'
for claim in claims:
html += '<tr><td>%s</td>' % claim
html += '</table></body></html>\n'
return html
@app.route('/tasks/<int:task_id>/log.html', methods=['GET'])
@gzipped
def getTaskLogHtml(task_id):
task = radb().getTask(task_id)
cmd = []
if task['type'] == 'pipeline':
cmd = ['ssh', 'lofarsys@head01.cep4.control.lofar', 'cat /data/log/pipeline-%s-*.log' % task['otdb_id']]
else:
cmd = ['ssh', 'mcu001.control.lofar', 'cat /opt/lofar/var/log/mcu001\\:ObservationControl\\[0\\]\\{%s\\}.log*' % task['otdb_id']]
logger.info(' '.join(cmd))
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = proc.communicate()
if proc.returncode == 0:
return out, 200, {'Content-Type': 'text/plain; charset=utf-8'}
else:
return err, 500, {'Content-Type': 'text/plain; charset=utf-8'}
def main():
# make sure we run in UTC timezone
import os
os.environ['TZ'] = 'UTC'
# Check the invocation arguments
parser = OptionParser('%prog [options]',
description='run the resource assignment editor web service')
parser.add_option('--webserver_port', dest='webserver_port', type='int', default=7412, help='port number on which to host the webservice, default: %default')
parser.add_option('-q', '--broker', dest='broker', type='string', default=None, help='Address of the qpid broker, default: localhost')
parser.add_option('--radb_busname', dest='radb_busname', type='string', default=DEFAULT_RADB_BUSNAME, help='Name of the bus exchange on the qpid broker on which the radbservice listens, default: %default')
parser.add_option('--radb_servicename', dest='radb_servicename', type='string', default=DEFAULT_RADB_SERVICENAME, help='Name of the radbservice, default: %default')
parser.add_option('--radb_notification_busname', dest='radb_notification_busname', type='string', default=DEFAULT_RADB_CHANGES_BUSNAME, help='Name of the notification bus exchange on the qpid broker on which the radb notifications are published, default: %default')
parser.add_option('--radb_notification_subjects', dest='radb_notification_subjects', type='string', default=DEFAULT_RADB_CHANGES_SUBJECTS, help='Subject(s) to listen for on the radb notification bus exchange on the qpid broker, default: %default')
parser.add_option('--otdb_busname', dest='otdb_busname', type='string', default=DEFAULT_OTDB_SERVICE_BUSNAME, help='Name of the bus exchange on the qpid broker on which the otdbservice listens, default: %default')
parser.add_option('--otdb_servicename', dest='otdb_servicename', type='string', default=DEFAULT_OTDB_SERVICENAME, help='Name of the otdbservice, default: %default')
parser.add_option('--otdb_notification_busname', dest='otdb_notification_busname', type='string', default=DEFAULT_OTDB_NOTIFICATION_BUSNAME, help='Name of the notification bus exchange on the qpid broker on which the otdb notifications are published, default: %default')
parser.add_option('--otdb_notification_subject', dest='otdb_notification_subject', type='string', default=DEFAULT_OTDB_NOTIFICATION_SUBJECT, help='Subject to listen for on the otdb notification bus exchange on the qpid broker, default: %default')
parser.add_option('--mom_busname', dest='mom_busname', type='string', default=DEFAULT_MOM_BUSNAME, help='Name of the bus exchange on the qpid broker on which the momservice listens, default: %default')
parser.add_option('--mom_servicename', dest='mom_servicename', type='string', default=DEFAULT_MOM_SERVICENAME, help='Name of the momservice, default: %default')
parser.add_option('--mom_broker', dest='mom_broker', type='string', default=None, help='Address of the qpid broker for the mom service, default: localhost')
parser.add_option('--mom_query_busname', dest='mom_query_busname', type='string', default=DEFAULT_MOMQUERY_BUSNAME, help='Name of the bus exchange on the qpid broker on which the momqueryservice listens, default: %default')
parser.add_option('--mom_query_servicename', dest='mom_query_servicename', type='string', default=DEFAULT_MOMQUERY_SERVICENAME, help='Name of the momqueryservice, default: %default')
parser.add_option('--cleanup_busname', dest='cleanup_busname', type='string', default=DEFAULT_CLEANUP_BUSNAME, help='Name of the bus exchange on the qpid broker on which the cleanupservice listens, default: %default')
parser.add_option('--cleanup_servicename', dest='cleanup_servicename', type='string', default=DEFAULT_CLEANUP_SERVICENAME, help='Name of the cleanupservice, default: %default')
parser.add_option('--storagequery_busname', dest='storagequery_busname', type='string', default=DEFAULT_STORAGEQUERY_BUSNAME, help='Name of the bus exchange on the qpid broker on which the storagequeryservice listens, default: %default')
parser.add_option('--storagequery_servicename', dest='storagequery_servicename', type='string', default=DEFAULT_STORAGEQUERY_SERVICENAME, help='Name of the storagequeryservice, default: %default')
parser.add_option('--dm_notification_busname', dest='dm_notification_busname', type='string', default=DEFAULT_DM_NOTIFICATION_BUSNAME, help='Name of the notification bus exchange on the qpid broker on which the data management notifications are published, default: %default')
parser.add_option('--dm_notification_subjects', dest='dm_notification_subjects', type='string', default=DEFAULT_DM_NOTIFICATION_SUBJECTS, help='Subject(s) to listen for on the data management notification bus exchange on the qpid broker, default: %default')
parser.add_option('--ingest_notification_busname', dest='ingest_notification_busname', type='string', default=DEFAULT_INGEST_NOTIFICATION_BUSNAME, help='Name of the notification bus exchange on the qpid broker on which the ingest notifications are published, default: %default')
parser.add_option('--ingest_notification_subjects', dest='ingest_notification_subjects', type='string', default=DEFAULT_INGEST_NOTIFICATION_SUBJECTS, help='Subject(s) to listen for on the ingest notification bus exchange on the qpid broker, default: %default')
parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging')
parser.add_option_group(dbcredentials.options_group(parser))
parser.set_defaults(dbcredentials="RADB")
(options, args) = parser.parse_args()
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
level=logging.DEBUG if options.verbose else logging.INFO)
global _radb_dbcreds
_radb_dbcreds = dbcredentials.parse_options(options)
if _radb_dbcreds.database:
logger.info("Using dbcreds for direct RADB access: %s" % _radb_dbcreds.stringWithHiddenPassword())
else:
_radb_dbcreds = None
global rarpc
rarpc = RARPC(busname=options.radb_busname, servicename=options.radb_servicename, broker=options.broker)
global momrpc
momrpc = MoMRPC(busname=options.mom_busname, servicename=options.mom_servicename, timeout=10, broker=options.broker)
global otdbrpc
otdbrpc = OTDBRPC(busname=options.otdb_busname, servicename=options.otdb_servicename, broker=options.broker)
global curpc
curpc = CleanupRPC(busname=options.cleanup_busname, servicename=options.cleanup_servicename, broker=options.broker)
global sqrpc
sqrpc = StorageQueryRPC(busname=options.storagequery_busname, servicename=options.storagequery_servicename, broker=options.broker)
global momqueryrpc
momqueryrpc = MoMQueryRPC(busname=options.mom_query_busname, servicename=options.mom_query_servicename, timeout=10, broker=options.broker)
global changeshandler
changeshandler = ChangesHandler(radb_busname=options.radb_notification_busname, radb_subjects=options.radb_notification_subjects,
dm_busname=options.dm_notification_busname, dm_subjects=options.dm_notification_subjects,
otdb_busname=options.otdb_notification_busname, otdb_subject=options.otdb_notification_subject,
ingest_busname=options.ingest_notification_busname, ingest_subjects=options.ingest_notification_subjects,
broker=options.broker, momqueryrpc=momqueryrpc, radbrpc=rarpc, sqrpc=sqrpc)
with changeshandler, rarpc, otdbrpc, curpc, sqrpc, momrpc, momqueryrpc:
'''Start the webserver'''
app.run(debug=options.verbose, threaded=True, host='0.0.0.0', port=options.webserver_port)
if __name__ == '__main__':
main()