Skip to content
Snippets Groups Projects
Commit 399e6554 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

TMSS-2565: removed tmss_ra_adapter now that MoM/OTDB is not being used for observing anymore :)

parent 9f4bd234
No related branches found
No related tags found
No related merge requests found
# - Create for each LOFAR package a variable containing the absolute path to
# its source directory.
# its source directory.
#
# Generated by gen_LofarPackageList_cmake.sh at ma 5 dec 2022 15:15:08 CET
#
......@@ -203,7 +203,6 @@ if(NOT DEFINED LOFAR_PACKAGE_LIST_INCLUDED)
set(TMSSLTAAdapter_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TMSS/backend/services/lta_adapter)
set(TMSSPostgresListenerService_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TMSS/backend/services/postgres_listener)
set(TMSSPreCalculationsService_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TMSS/backend/services/precalculations)
set(TMSSRAAdapter_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TMSS/backend/services/ra_adapter)
set(TMSSSchedulingService_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TMSS/backend/services/scheduling)
set(TMSSServices_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TMSS/backend/services)
set(TMSSSlackWebhookService_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TMSS/backend/services/slack_webhook)
......
......@@ -8,7 +8,7 @@ FROM ci_base_ubuntu:$BASE_VERSION
RUN echo "Installing packages for TMSS..." && \
apt-get install -y liblog4cplus-2.0.5 liblog4cplus-dev libboost-all-dev libreadline-dev libreadline8 binutils gettext libldap-dev git default-jdk python3-twisted graphviz libaio1 libaio-dev unzip postgresql-client postgresql-server-dev-all postgresql-all libsasl2-dev
# Oracle decided in all its wisdom to not make use of rpm/deb
# So, we're forced to download the Oracle client packages, and configure the paths
# See: https://help.ubuntu.com/community/Oracle%20Instant%20Client
......@@ -22,7 +22,6 @@ COPY SAS/TMSS/upper_constraints.txt tmss_constraints.txt
COPY SAS/TMSS/backend/services/lobster/requirements.txt tmss_lobster.txt
COPY SAS/TMSS/backend/services/ingest_tmss_adapter/requirements.txt tmss_ingest_tmss_adapter.txt
COPY SAS/TMSS/backend/services/ra_adapter/requirements.txt tmss_ra_adapter.txt
COPY SAS/TMSS/backend/services/scheduling/requirements.txt tmss_scheduling.txt
COPY SAS/TMSS/backend/services/slack_webhook/requirements.txt tmss_slack_webhook.txt
COPY SAS/TMSS/backend/services/websocket/requirements.txt tmss_websocket.txt
......@@ -36,7 +35,7 @@ RUN pip3 install astroplan cachetools comet coverage cx_Oracle cython django-aut
python-ldap-test python-qpid-proton pyxb==1.2.5 scipy SimpleWebSocketServer \
swagger-spec-validator testing.postgresql ujson websocket_client xmljson \
-r tmss_lobster.txt -r tmss_ingest_tmss_adapter.txt -r tmss_scheduling.txt \
-r tmss_ra_adapter.txt -r tmss_slack_webhook.txt -r tmss_websocket.txt \
-r tmss_slack_webhook.txt -r tmss_websocket.txt \
-c tmss_constraints.txt
# Note: nodejs now comes with npm, do not install the npm package separately, since that will be taken from the ubuntu repo and is conflicting.
......
......@@ -6,7 +6,6 @@ lofar_add_package(TMSSLobster lobster)
lofar_add_package(TMSSLTAAdapter lta_adapter)
lofar_add_package(TMSSPostgresListenerService postgres_listener)
lofar_add_package(TMSSPreCalculationsService precalculations)
lofar_add_package(TMSSRAAdapter ra_adapter)
lofar_add_package(TMSSSchedulingService scheduling)
lofar_add_package(TMSSSlackWebhookService slack_webhook)
lofar_add_package(TMSSWebSocketService websocket)
......
lofar_package(TMSSRAAdapter 0.1 DEPENDS TMSSClient RATaskSpecifiedService)
add_subdirectory(lib)
add_subdirectory(bin)
lofar_add_bin_scripts(tmss_ra_adapter)
#!/usr/bin/python3
# Copyright (C) 2012-2015 ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
from lofar.sas.tmss.services.ra_adapter import main
if __name__ == "__main__":
main()
lofar_find_package(PythonInterp 3.4 REQUIRED)
include(PythonInstall)
set(_py_files
ra_adapter.py
)
python_install(${_py_files}
DESTINATION lofar/sas/tmss/services
)
#!/usr/bin/env python3
# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
import logging
import os
from optparse import OptionParser, OptionGroup
logger = logging.getLogger(__name__)
import requests
from dateutil import parser
from datetime import timedelta
import json
from lofar.common import isProductionEnvironment, isTestEnvironment
from lofar.common.util import waitForInterrupt
from lofar.parameterset import parameterset
from lofar.sas.tmss.client.tmssbuslistener import *
from lofar.sas.tmss.client.tmss_http_rest_client import TMSSsession
from lofar.sas.resourceassignment.resourceassigner.rabuslistener import RABusListener, RAEventMessageHandler
from lofar.sas.resourceassignment.resourceassigner.rarpc import RARPC
class TMSSEventMessageHandlerForRASynchronization(TMSSEventMessageHandler):
'''Handle incoming TMSS subtask events, and propagate them into the RA as a reservation upon scheduling/unscheduling an observation subtask'''
def __init__(self, rest_client_creds_id: str="TMSSClient"):
super().__init__(log_event_messages=False)
self._tmss_client = TMSSsession.create_from_dbcreds_for_ldap(rest_client_creds_id)
def start_handling(self):
self._tmss_client.open()
super().start_handling()
def stop_handling(self):
super().stop_handling()
self._tmss_client.close()
def onSubTaskStatusChanged(self, id: int, status:str):
if status in ('scheduled', 'unscheduling'):
subtask = self._tmss_client.get_subtask(id)
if subtask['subtask_type'] == 'observation':
logger.info('Creating RA-reservation-specification for TMSS observation subtask: id=%s status=%s', id, status)
# Lots of magic to create an RA task specification for a reservation.
# The quick and dirty hack here is that we take the observation parset from TMSS
# and just modify the processType/task_type into an RA reservation.
# then just sumbit it to RA, and let it do the reservation-task creation/scheduling/unscheduling,
# and making the claims on the used stations.
parset_txt = self._tmss_client.get_subtask_parset(id)
parset = parameterset.fromString(parset_txt)
parset_dict = parset.dict()
parset_dict['Observation.processType'] = 'RESERVATION' # for RA/OTDB this is a reservation
parset_dict['Observation.processSubtype'] = 'project' # for RA/OTDB this is a reservation of the type project to denote that it is projects/observing relation (instead of maintenance)
ra_spec = {'tmss_id': id,
'task_type': 'reservation', # for RA/OTDB this is a reservation
'task_subtype': 'project', # for RA/OTDB this is a reservation of the type project to denote that it is projects/observing relation (instead of maintenance)
'status': 'prescheduled' if status=='scheduled' else 'approved',
'starttime': parser.parse(subtask['scheduled_start_time'], ignoretz=True),
'endtime': parser.parse(subtask['scheduled_stop_time'], ignoretz=True),
'cluster': subtask['cluster_name'],
'station_requirements': [],
'specification': parset_dict}
with RARPC.create() as rarpc:
rarpc.do_assignment(ra_spec)
class RAEventMessageHandlerForTMSSSynchronization(RAEventMessageHandler):
'''Handle incoming RA task events, and propagate them into TMSS as a reservation upon scheduling/unscheduling an observation task'''
def __init__(self, rest_client_creds_id: str="TMSSClient"):
super().__init__()
self._tmss_client = TMSSsession.create_from_dbcreds_for_ldap(rest_client_creds_id)
def start_handling(self):
self._tmss_client.open()
super().start_handling()
def stop_handling(self):
super().stop_handling()
self._tmss_client.close()
def onTaskApproved(self, task_ids):
self.onTaskApprovedOrScheduled(task_ids, 'approved')
def onTaskScheduled(self, task_ids):
self.onTaskApprovedOrScheduled(task_ids, 'scheduled')
def onTaskApprovedOrScheduled(self, task_ids, status: str):
otdb_id = task_ids.get('otdb_id')
tmss_id = task_ids.get('tmss_id')
if otdb_id is not None and tmss_id is None:
if isProductionEnvironment():
ra_url = 'http://scu001.control.lofar:7412'
elif isTestEnvironment():
ra_url = 'http://scu199.control.lofar:7412'
else:
ra_url = 'http://0.0.0.0:7412'
ra_otdb_task_url = ra_url + '/rest/tasks/otdb/' + str(otdb_id)
ra_task = json.loads(requests.get(ra_otdb_task_url).text).get('task')
if ra_task.get('type') in ('observation', 'reservation'):
reservation_name = "OTDB %s %s %s" % (ra_task['project_name'], ra_task.get('type'), ra_task['otdb_id'])
if status in ('approved', 'prescheduled', 'scheduled'):
# RA status 'approved' means that it is either new, or unscheduled.
# RA status 'prescheduled'/'scheduled' means that it gets scheduled.
# In both cases, check if we know any reservation for this otdb id.
# If so, then the RA task was unscheduled, and we delete the corresponding TMSS reservation(s)
# get exisiting reservations with this specific reservation name as key
# this is hacky and not guaranteed unique, but hey, it's a temporary quick solution while both MoM/OTDB and TMSS are in use.
# the name as key is unique enough until proven otherwise.
existing_reservations = self._tmss_client.get_path_as_json_object('/reservation', {'name': reservation_name})
for reservation in existing_reservations:
logger.info('Deleting TMSS-reservation id=%s \'%s\' for unscheduled RA observation otdb_id=%s', reservation['id'], reservation_name, otdb_id)
self._tmss_client.session.delete(url=reservation['url'])
if status == 'scheduled':
logger.info('Creating TMSS-reservation \'%s\' for RA %s otdb_id=%s', reservation_name, ra_task.get('type'), otdb_id)
ra_claims_url = ra_url + '/rest/tasks/' + str(ra_task['id']) + '/resourceclaims'
ra_claims = json.loads(requests.get(ra_claims_url).text).get('resourceclaims')
station_claims = [c for c in ra_claims if c['resource_type_name']=='rcu']
stations = [c['resource_name'].rstrip('rcu') for c in station_claims]
# create a TMSS reservation
# use the default reservation template, and fill in some details
specifications_doc = self._tmss_client.get_path_as_json_object('/reservation_template/1/default')
specifications_doc['activity']['subject'] = 'system'
specifications_doc['activity']['type'] = 'other'
specifications_doc['activity']['description'] = "OTDB observation %s %s" % (ra_task['project_name'], ra_task['otdb_id'])
specifications_doc['schedulability']['dynamic'] = False
specifications_doc['schedulability']['fixed_time'] = False
specifications_doc['resources']['stations'] = stations
reservation = {
"name": "OTDB %s observation %s" % (ra_task['project_name'], ra_task['otdb_id']),
"description": "A reservation while OTDB observation %s %s %s is observing" % (ra_task['project_name'], ra_task['otdb_id'], ra_task['description']),
"specifications_doc": specifications_doc,
"specifications_template": self._tmss_client.get_full_url_for_path('/reservation_template/1'),
"start_time": (parser.parse(ra_task['starttime'], ignoretz=True) - timedelta(minutes=1)).isoformat(), # add one minute so that TMSS stops early enough before MoM starts
"stop_time": (parser.parse(ra_task['endtime'], ignoretz=True) + timedelta(minutes=1)).isoformat() # add one minute so that TMSS does not start something else too soon after MoM finishes
}
self._tmss_client.post_to_path_and_get_result_as_json_object('/reservation', json_data=reservation)
def create_tmss_buslistener(exchange: str=DEFAULT_BUSNAME, broker: str=DEFAULT_BROKER, rest_client_creds_id: str="TMSSClient"):
return TMSSBusListener(handler_type=TMSSEventMessageHandlerForRASynchronization,
handler_kwargs={'rest_client_creds_id': rest_client_creds_id},
exchange=exchange, broker=broker)
def create_ra_buslistener(exchange: str=DEFAULT_BUSNAME, broker: str=DEFAULT_BROKER, rest_client_creds_id: str="TMSSClient"):
return RABusListener(handler_type=RAEventMessageHandlerForTMSSSynchronization,
handler_kwargs={'rest_client_creds_id': rest_client_creds_id},
exchange=exchange, broker=broker)
def main():
# make sure we run in UTC timezone
os.environ['TZ'] = 'UTC'
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
# Check the invocation arguments
parser = OptionParser('%prog [options]',
description='run the tmss_ra_adapter which continuously synchronizes TMSS and the ResourceAssigner. Each scheduled task in the one creates a reservation in the other.')
group = OptionGroup(parser, 'Messaging options')
group.add_option('-b', '--broker', dest='broker', type='string', default=DEFAULT_BROKER,
help='Address of the message broker, default: %default')
group.add_option('-e', "--exchange", dest="exchange", type="string", default=DEFAULT_BUSNAME,
help="exchange where the RA & TMSS event messages are published. [default: %default]")
parser.add_option_group(group)
group = OptionGroup(parser, 'TMSS options')
parser.add_option_group(group)
group.add_option('-R', '--rest_credentials', dest='rest_credentials', type='string', default='TMSSClient', help='django REST API credentials name, default: %default')
(options, args) = parser.parse_args()
TMSSsession.check_connection_and_exit_on_error(options.rest_credentials)
with create_tmss_buslistener(options.exchange, options.broker, rest_client_creds_id=options.rest_credentials):
with create_ra_buslistener(options.exchange, options.broker, rest_client_creds_id=options.rest_credentials):
waitForInterrupt()
if __name__ == '__main__':
main()
requests # Apache 2
\ No newline at end of file
......@@ -135,29 +135,6 @@ services:
driver: journald
options:
tag: tmss_lta_adapter
ra_adapter:
container_name: tmss_ra_adapter
image: tmss_ra_adapter
build:
context: ./app
dockerfile: Dockerfile
args:
SOURCE_IMAGE: ${SOURCE_IMAGE}
HOME: "/localhome/lofarsys"
restart: unless-stopped
env_file:
- env
environment:
- USER=lofarsys
- HOME=/localhome/lofarsys
command: /bin/bash -c 'source /opt/lofar/lofarinit.sh; exec tmss_ra_adapter'
depends_on:
db_migrate:
condition: service_completed_successfully
logging:
driver: journald
options:
tag: tmss_ra_adapter
ingest_adapter:
container_name: tmss_ingest_adapter
image: tmss_ingest_adapter
......@@ -294,4 +271,4 @@ services:
logging:
driver: journald
options:
tag: tmss_slack_webhook
\ No newline at end of file
tag: tmss_slack_webhook
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment