diff --git a/CMake/LofarPackageList.cmake b/CMake/LofarPackageList.cmake
index 5360203eddbf66d2966112b2449b1afdc13c1f3b..73ada83661880612d153729599f48fd524d057ab 100644
--- a/CMake/LofarPackageList.cmake
+++ b/CMake/LofarPackageList.cmake
@@ -1,7 +1,7 @@
 # - Create for each LOFAR package a variable containing the absolute path to
 # its source directory.
 #
-# Generated by gen_LofarPackageList_cmake.sh at di 19 sep 2023 12:39:35 CEST
+# Generated by gen_LofarPackageList_cmake.sh at vr 26 jan 2024 9:12:52 CET
 #
 # ---- DO NOT EDIT ----
 #
@@ -172,6 +172,9 @@ if(NOT DEFINED LOFAR_PACKAGE_LIST_INCLUDED)
   set(TMSSSlackWebhookService_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TMSS/backend/services/slack_webhook)
   set(TMSSWebSocketService_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TMSS/backend/services/websocket)
   set(TMSSWorkflowService_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TMSS/backend/services/workflow_service)
+  set(TMSSReportRefreshService_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TMSS/backend/services/report_refresh)
+  set(TMSSLofar2SiblingService_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TMSS/backend/services/lofar2_siblings)
+  set(TMSSCopyService_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TMSS/backend/services/copy_service)
   set(CCU_MAC_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SubSystems/CCU_MAC)
   set(LCU_MAC_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SubSystems/LCU_MAC)
   set(MCU_MAC_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SubSystems/MCU_MAC)
diff --git a/Docker/lofar-ci/Dockerfile_ci_base_ubuntu b/Docker/lofar-ci/Dockerfile_ci_base_ubuntu
index f00c5c30f12a76b7ae6a88b65eb685ffe0e0a5ae..e8bd83c7fc6eae60c54477dea03085297a26b150 100644
--- a/Docker/lofar-ci/Dockerfile_ci_base_ubuntu
+++ b/Docker/lofar-ci/Dockerfile_ci_base_ubuntu
@@ -22,3 +22,4 @@ ENV SETUPTOOLS_USE_DISTUTILS stdlib
 
 RUN apt-get -y install python3-pip && \
     pip3 install --upgrade --force-reinstall setuptools
+
diff --git a/Docker/lofar-ci/Dockerfile_ci_tmss b/Docker/lofar-ci/Dockerfile_ci_tmss
index ba33c7ece4386e6f6b2772717da6bb2c006afe5c..5fb33f6b5279f5a8b8767d76eec1b5eccefb0004 100644
--- a/Docker/lofar-ci/Dockerfile_ci_tmss
+++ b/Docker/lofar-ci/Dockerfile_ci_tmss
@@ -7,7 +7,7 @@ ARG BASE_VERSION=latest
 FROM ci_base_ubuntu:$BASE_VERSION
 
 RUN echo "Installing packages for TMSS..." && \
-    apt-get install -y liblog4cplus-2.0.5 liblog4cplus-dev libboost-all-dev libreadline-dev libreadline8 binutils gettext libldap-dev git default-jdk python3-twisted graphviz libaio1 libaio-dev unzip postgresql-client postgresql-server-dev-all postgresql-all libsasl2-dev
+    apt-get update && apt-get install --fix-missing -y liblog4cplus-2.0.5 liblog4cplus-dev libboost-all-dev libreadline-dev libreadline8 binutils gettext libldap-dev git default-jdk python3-twisted graphviz libaio1 libaio-dev unzip postgresql-client postgresql-server-dev-all postgresql-all libsasl2-dev rsync
 
 # Oracle decided in all its wisdom to not make use of rpm/deb
 # So, we're forced to download the Oracle client packages, and configure the paths
diff --git a/SAS/TMSS/backend/services/CMakeLists.txt b/SAS/TMSS/backend/services/CMakeLists.txt
index 1b2e6dd08f60fa7c43fc28f1453670dbddb47d89..69ca5bf227465a09e4baa799cd99eed1c63ec23e 100644
--- a/SAS/TMSS/backend/services/CMakeLists.txt
+++ b/SAS/TMSS/backend/services/CMakeLists.txt
@@ -12,4 +12,5 @@ lofar_add_package(TMSSWebSocketService websocket)
 lofar_add_package(TMSSWorkflowService workflow_service)
 lofar_add_package(TMSSReportRefreshService report_refresh)
 lofar_add_package(TMSSLofar2SiblingService lofar2_siblings)
+lofar_add_package(TMSSCopyService copy_service)
 
diff --git a/SAS/TMSS/backend/services/copy_service/CMakeLists.txt b/SAS/TMSS/backend/services/copy_service/CMakeLists.txt
new file mode 100644
index 0000000000000000000000000000000000000000..635b78ba216924b7eeed49e8db941be85f90b81a
--- /dev/null
+++ b/SAS/TMSS/backend/services/copy_service/CMakeLists.txt
@@ -0,0 +1,20 @@
+lofar_package(TMSSCopyService 0.1 DEPENDS TMSSClient PyCommon PyMessaging)
+lofar_find_package(PythonInterp 3.4 REQUIRED)
+
+include(PythonInstall)
+
+set(_py_files
+    copy_service.py
+)
+
+python_install(${_py_files}
+    DESTINATION lofar/sas/tmss/services
+)
+
+lofar_add_bin_scripts(tmss_copy_service)
+
+if(BUILD_TESTING)
+    include(LofarCTest)
+
+    lofar_add_test(t_copy_service)
+endif()
diff --git a/SAS/TMSS/backend/services/copy_service/copy_service.py b/SAS/TMSS/backend/services/copy_service/copy_service.py
new file mode 100755
index 0000000000000000000000000000000000000000..230e508f83b16b0eb26457a3c582342e4723c540
--- /dev/null
+++ b/SAS/TMSS/backend/services/copy_service/copy_service.py
@@ -0,0 +1,133 @@
+#!/usr/bin/env python3
+
+# Copyright (C) 2012-2015 ASTRON (Netherlands Institute for Radio Astronomy)
+# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
+#
+# This file is part of the LOFAR software suite.
+# The LOFAR software suite is free software: you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# The LOFAR software suite is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
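+
+"""The tmss_copy_service listens on the TMSS message bus for subtask status changes;
+whenever a copy subtask gets scheduled, it copies the subtask's input dataproducts to
+the requested destination with rsync (wrapped in an ssh call when the data is on CEP4)."""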
+
+
+import os
+from optparse import OptionParser, OptionGroup
+import logging
+logger = logging.getLogger(__name__)
+
+from lofar.sas.tmss.client.tmssbuslistener import *
+from lofar.sas.tmss.client.tmss_http_rest_client import TMSSsession
+from subprocess import call
+from lofar.common.cep4_utils import *
+
+
+class TMSSCopyServiceEventMessageHandler(TMSSEventMessageHandler):
+    '''Handles TMSS subtask status change events: when a copy subtask gets scheduled,
+    it copies the subtask's input dataproducts to the requested destination.'''
+    def __init__(self, tmss_client_credentials_id: str="TMSSClient"):
+        super().__init__()
+        self.tmss_client = TMSSsession.create_from_dbcreds_for_ldap(tmss_client_credentials_id)
+
+    def start_handling(self):
+        self.tmss_client.open()
+
+    def stop_handling(self):
+        self.tmss_client.close()
+
+    def onSubTaskStatusChanged(self, id: int, status: str):
+        if status == 'scheduled':
+            subtask = self.tmss_client.get_subtask(id)
+            if subtask['subtask_type'] == 'copy':
+                try:
+                    self.tmss_client.set_subtask_status(id, 'starting')
+                    self.tmss_client.set_subtask_status(id, 'started')
+
+                    # cache to reduce REST calls
+                    producer2cluster = {}
+
+                    for input_dataproduct in self.tmss_client.get_subtask_input_dataproducts(subtask['id']):
+                        if subtask['specifications_doc'].get('managed_output', False):
+                            output_dataproduct = self.tmss_client.get_subtask_transformed_output_dataproduct(subtask['id'], input_dataproduct['id'])
+                            destination_path = output_dataproduct['filepath']
+                        else:
+                            destination_path = os.path.join(subtask['specifications_doc']['destination'].rstrip('/'), input_dataproduct['filename'])
+
+                        # strip localhost as destination host to save on ssh wrapping
+                        # (note: str.lstrip strips a character set, not a prefix, so remove the prefix explicitly)
+                        for local_host_prefix in ('localhost:', '127.0.0.1:'):
+                            if destination_path.startswith(local_host_prefix):
+                                destination_path = destination_path[len(local_host_prefix):]
+
+                        # prepare the actual copy command
+                        cmd = ['rsync', '-a', '--mkpath', input_dataproduct['filepath'], destination_path]
+
+                        # check which cluster we run on
+                        if input_dataproduct['producer'] not in producer2cluster:
+                            producer = self.tmss_client.get_url_as_json_object(input_dataproduct['producer'])
+                            filesystem = self.tmss_client.get_url_as_json_object(producer['filesystem'])
+                            cluster = self.tmss_client.get_url_as_json_object(filesystem['cluster'])
+                            producer2cluster[input_dataproduct['producer']] = cluster
+
+                        # wrap in cep4 ssh call if cep4
+                        cluster = producer2cluster[input_dataproduct['producer']]
+                        if cluster['name'].lower() == 'cep4':
+                            cmd = wrap_command_in_cep4_available_node_with_lowest_load_ssh_call(cmd, via_head=True)
+
+                        logger.info("copying dataproduct id=%s for copy subtask id=%s, executing: %s", input_dataproduct['id'], subtask['id'], ' '.join(cmd))
+
+                        if call(cmd) == 0:
+                            logger.info("copied dataproduct id=%s for copy subtask id=%s to '%s'", input_dataproduct['id'], subtask['id'], destination_path)
+                        else:
+                            msg = "could not copy dataproduct id=%s for copy subtask id=%s to '%s'" % (input_dataproduct['id'], subtask['id'], destination_path)
+                            logger.error(msg)
+                            self.tmss_client.set_subtask_status(id, 'error', error_reason=msg)
+                            return
+
+                    self.tmss_client.set_subtask_status(id, 'finishing')
+                    self.tmss_client.set_subtask_status(id, 'finished')
+                except Exception as e:
+                    self.tmss_client.set_subtask_status(id, 'error', error_reason=str(e))
+
+
+def create_copy_service(exchange: str=DEFAULT_BUSNAME, broker: str=DEFAULT_BROKER, tmss_client_credentials_id: str="TMSSClient"):
+    return TMSSBusListener(handler_type=TMSSCopyServiceEventMessageHandler,
+                           handler_kwargs={'tmss_client_credentials_id': tmss_client_credentials_id},
+                           exchange=exchange, broker=broker)
+
+
+def main():
+    # make sure we run in UTC timezone
+    os.environ['TZ'] = 'UTC'
+
+    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
+
+    # Check the invocation arguments
+    parser = OptionParser('%prog [options]',
+                          description='run the tmss_copy_service which runs the copy-pipeline for scheduled copy-subtasks')
+
+    group = OptionGroup(parser, 'Messaging options')
+    group.add_option('-b', '--broker', dest='broker', type='string', default=DEFAULT_BROKER, help='Address of the message broker, default: %default')
+    group.add_option('-e', "--exchange", dest="exchange", type="string", default=DEFAULT_BUSNAME, help="exchange where the TMSS event messages are published. [default: %default]")
+    parser.add_option_group(group)
+
+    group = OptionGroup(parser, 'Django options')
+    group.add_option('-R', '--tmss_client_credentials_id', dest='tmss_client_credentials_id', type='string', default='TMSSClient', help='TMSS django REST API credentials name, default: %default')
+    parser.add_option_group(group)
+
+    (options, args) = parser.parse_args()
+
+    # check TMSS is up and running via the client
+    TMSSsession.check_connection_and_exit_on_error(options.tmss_client_credentials_id)
+
+    from lofar.common.util import waitForInterrupt
+
+    with create_copy_service(options.exchange, options.broker, options.tmss_client_credentials_id):
+        waitForInterrupt()
+
+if __name__ == '__main__':
+    main()
diff --git a/SAS/TMSS/backend/services/copy_service/t_copy_service.py b/SAS/TMSS/backend/services/copy_service/t_copy_service.py
new file mode 100755
index 0000000000000000000000000000000000000000..45e82fd15354a870303595680e6eb5cf1e613ef4
--- /dev/null
+++ b/SAS/TMSS/backend/services/copy_service/t_copy_service.py
@@ -0,0 +1,168 @@
+#!/usr/bin/env python3
+
+# Copyright (C) 2012-2015 ASTRON (Netherlands Institute for Radio Astronomy)
+# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
+#
+# This file is part of the LOFAR software suite.
+# The LOFAR software suite is free software: you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# The LOFAR software suite is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
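+
+"""Integration test for the TMSS copy service: it starts a TMSS test environment,
+"runs" a simple observation with small fake dataproducts, and verifies that the
+copy service copies them to the requested destination, both managed and unmanaged."""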
+
+import unittest
+import uuid
+import os
+from unittest import mock
+
+import logging
+logger = logging.getLogger('lofar.'+__name__)
+
+from lofar.common.test_utils import skip_integration_tests
+if skip_integration_tests():
+    exit(3)
+
+from lofar.messaging.messagebus import TemporaryExchange, BusListenerJanitor
+
+from time import sleep
+from datetime import datetime, timedelta
+
+class TestCopyService(unittest.TestCase):
+    '''
+    Tests for the TMSS copy service
+    '''
+    @classmethod
+    def setUpClass(cls) -> None:
+        cls.TEST_UUID = uuid.uuid1()
+
+        cls.tmp_exchange = TemporaryExchange("%s_%s" % (cls.__name__, cls.TEST_UUID))
+        cls.tmp_exchange.open()
+
+        # override DEFAULT_BUSNAME
+        import lofar
+        lofar.messaging.config.DEFAULT_BUSNAME = cls.tmp_exchange.address
+        lofar.messaging.DEFAULT_BUSNAME = cls.tmp_exchange.address
+
+        # import here, and not at top of module, because DEFAULT_BUSNAME needs to be set before importing
+        from lofar.sas.tmss.test.test_environment import TMSSTestEnvironment
+
+        cls.tmss_test_env = TMSSTestEnvironment(exchange=cls.tmp_exchange.address, populate_schemas=True, populate_test_data=False,
+                                                start_subtask_scheduler=True, start_postgres_listener=True,
+                                                start_dynamic_scheduler=False, enable_viewflow=False)
+        cls.tmss_test_env.start()
+
+    @classmethod
+    def tearDownClass(cls) -> None:
+        cls.tmss_test_env.stop()
+        cls.tmp_exchange.close()
+
+    def setUp(self):
+        # patch the cep4 ssh wrapper, so the test can run the copy command locally (without ssh)
+        def mocked_wrap_command_in_cep4_ssh_call(cmd, *args, **kwargs):
+            logger.info('mocked cep4 ssh wrapper returning original command (without ssh): %s', ' '.join(cmd))
+            return cmd
+
+        wrap_command_in_cep4_ssh_call_patcher = mock.patch('lofar.sas.tmss.services.copy_service.wrap_command_in_cep4_available_node_with_lowest_load_ssh_call')
+        self.addCleanup(wrap_command_in_cep4_ssh_call_patcher.stop)
+        self.wrap_command_in_cep4_ssh_call_mock = wrap_command_in_cep4_ssh_call_patcher.start()
+        self.wrap_command_in_cep4_ssh_call_mock.side_effect = mocked_wrap_command_in_cep4_ssh_call
+
+    def test_copy_managed_and_unmanaged(self):
+        from lofar.sas.tmss.tmss.tmssapp.models import SchedulingUnitObservingStrategyTemplate, SchedulingSet, SubtaskType
+        from lofar.sas.tmss.tmss.tmssapp.tasks import create_scheduling_unit_draft_from_observing_strategy_template, create_scheduling_unit_blueprint_and_tasks_and_subtasks_from_scheduling_unit_draft, create_cleanuptask_for_scheduling_unit_blueprint
+        from lofar.sas.tmss.tmss.tmssapp.subtasks import schedule_subtask, wait_for_subtask_status
+        from lofar.sas.tmss.test.test_utils import set_subtask_state_following_allowed_transitions
+        from lofar.sas.tmss.test.tmss_test_data_django_models import SchedulingSet_test_data
+
+        # create and start the service (the object under test)
+        from lofar.sas.tmss.services.copy_service import create_copy_service
+        service = create_copy_service(exchange=self.tmp_exchange.address, tmss_client_credentials_id=self.tmss_test_env.client_credentials.dbcreds_id)
+        with BusListenerJanitor(service):
+            for managed_output in [True, False]:
+                self.tmss_test_env.delete_scheduling_unit_drafts_cascade()
+                self.tmss_test_env.delete_reservations_cascade()
+
+                logger.info('\n\n\n----- managed_output=%s -----', managed_output)
+                DESTINATION_DIR = '/tmp/t_copy_service_%s' % uuid.uuid4().hex[:8]
+
+                strategy_template = SchedulingUnitObservingStrategyTemplate.get_version_or_latest(name="Simple Observation")
+
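+                # shrink the observation to a few subbands, then graft a CopyPipeline task
+                # (and a task relation consuming the observation output) onto the template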
+                strategy_template.template['tasks']['Observation']['specifications_doc']['station_configuration']['SAPs'][0]['subbands'] = [0, 1, 2, 3]
+                strategy_template.template['tasks']['CopyPipeline'] = {
+                    "specifications_doc": {
+                        "destination": "localhost:" + DESTINATION_DIR,
+                        "managed_output": managed_output
+                    },
+                    "specifications_template": {
+                        "name": "copy pipeline",
+                        "version": 1
+                    }
+                }
+
+                # make the copy task copy the observation output
+                strategy_template.template['task_relations'] = [{
+                    "consumer": "CopyPipeline",
+                    "input": {
+                        "dataformat": "MeasurementSet",
+                        "datatype": "visibilities",
+                        "role": "any"
+                    },
+                    "output": {
+                        "dataformat": "MeasurementSet",
+                        "datatype": "visibilities",
+                        "role": "correlator"
+                    },
+                    "producer": "Observation",
+                    "selection_doc": {},
+                    "selection_template": {"name": "all"}
+                }]
+
+                # specs complete, create SU draft and blueprint
+                scheduling_unit_draft = create_scheduling_unit_draft_from_observing_strategy_template(strategy_template, scheduling_set=SchedulingSet.objects.create(**SchedulingSet_test_data()), name="name", description="description")
+                scheduling_unit_blueprint = create_scheduling_unit_blueprint_and_tasks_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft)
+
+                # "run" the observation (thus creating obs-dataproducts, and triggering the copy subtask to get scheduled)
+                obs_subtask = scheduling_unit_blueprint.subtasks.filter(specifications_template__type__value=SubtaskType.Choices.OBSERVATION.value).first()
+                obs_subtask = schedule_subtask(obs_subtask, datetime.utcnow()+timedelta(minutes=1))
+                self.assertGreater(obs_subtask.output_dataproducts.count(), 0)
+
+                try:
+                    # create some output files, as if the observation ran
+                    for output_dp in obs_subtask.output_dataproducts.all():
+                        os.makedirs(output_dp.directory, exist_ok=True)
+                        logger.info('writing 1KB test dataproduct for subtask id=%s %s', obs_subtask.id, output_dp.filepath)
+                        with open(output_dp.filepath, 'w') as file:
+                            file.write(1024 * 'a')
+
+                    # "finish" the observation
+                    obs_subtask = set_subtask_state_following_allowed_transitions(obs_subtask, 'finished')
+
+                    # wait until the copy subtask is picked up by the service
+                    copy_subtask = scheduling_unit_blueprint.subtasks.filter(specifications_template__type__value=SubtaskType.Choices.COPY.value).first()
+                    copy_subtask = wait_for_subtask_status(copy_subtask, 'finished')
+
+                    # check the output: was it copied?
+                    copied_files = os.listdir(DESTINATION_DIR)
+                    self.assertEqual(obs_subtask.output_dataproducts.count(), len(copied_files))
+
+                    if managed_output:
+                        self.assertEqual(obs_subtask.output_dataproducts.count(), copy_subtask.output_dataproducts.count())
+                        for output_dp in copy_subtask.output_dataproducts.all():
+                            self.assertTrue(os.path.exists(output_dp.filepath))
+
+                finally:
+                    # cleanup in case of accidents
+                    import shutil
+                    shutil.rmtree(DESTINATION_DIR, ignore_errors=True)
+                    for output_dp in obs_subtask.output_dataproducts.all():
+                        shutil.rmtree(output_dp.directory, ignore_errors=True)
+
+if __name__ == '__main__':
+    # run the unit tests
+    unittest.main()
diff --git a/SAS/TMSS/backend/services/copy_service/t_copy_service.run b/SAS/TMSS/backend/services/copy_service/t_copy_service.run
new file mode 100755
index 0000000000000000000000000000000000000000..3c4000f8dbbdfb6d38d0d48bf7c5108178cdc33f
--- /dev/null
+++ b/SAS/TMSS/backend/services/copy_service/t_copy_service.run
@@ -0,0 +1,4 @@
+#!/bin/bash
+
+python3 t_copy_service.py
+
diff --git a/SAS/TMSS/backend/services/copy_service/t_copy_service.sh b/SAS/TMSS/backend/services/copy_service/t_copy_service.sh
new file mode 100755
index 0000000000000000000000000000000000000000..d07ff4769207e994bc651bcb4f25b1eb965f5150
--- /dev/null
+++ b/SAS/TMSS/backend/services/copy_service/t_copy_service.sh
@@ -0,0 +1,3 @@
+#!/bin/sh
+
+./runctest.sh t_copy_service
\ No newline at end of file
diff --git a/SAS/TMSS/backend/services/copy_service/tmss_copy_service b/SAS/TMSS/backend/services/copy_service/tmss_copy_service
new file mode 100755
index 0000000000000000000000000000000000000000..36aa2ca928b126aa00271d4a368a616119c738c3
--- /dev/null
+++ b/SAS/TMSS/backend/services/copy_service/tmss_copy_service
@@ -0,0 +1,23 @@
+#!/usr/bin/env python3
+
+# Copyright (C) 2012-2015 ASTRON (Netherlands Institute for Radio Astronomy)
+# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
+#
+# This file is part of the LOFAR software suite.
+# The LOFAR software suite is free software: you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# The LOFAR software suite is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
+
+from lofar.sas.tmss.services import copy_service
+
+if __name__ == '__main__':
+    copy_service.main()
diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/conversions.py b/SAS/TMSS/backend/src/tmss/tmssapp/conversions.py
index 65d40dd65aa8bf5b710a3937e186a479ef8e9190..7ec18a4fa0b1a910f1b58ad62f43a9bf0133085f 100644
--- a/SAS/TMSS/backend/src/tmss/tmssapp/conversions.py
+++ b/SAS/TMSS/backend/src/tmss/tmssapp/conversions.py
@@ -420,8 +420,8 @@ def coordinates_timestamp_and_station_to_target_rise_and_set(pointing: Pointing,
                     "set": target_set.to_datetime(),
                     "always_above_horizon": False,
                     "always_below_horizon": False}
-        except TypeError as e:
-            if "numpy.float64" in str(e):
+        except (TypeError, ValueError) as e:
+            if "numpy.float64" in str(e) or "could not broadcast input array" in str(e) or "Cannot cast array data from dtype" in str(e):
                 # Note: when the target is always above or below horizon, astroplan excepts with the not very
                 # meaningful error: 'numpy.float64' object does not support item assignment
                 # Determine whether the target is always above or below horizon so that we can return some useful
diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/populate.py b/SAS/TMSS/backend/src/tmss/tmssapp/populate.py
index b093121ea48a2b251c9b86120c47eeb0f1a99a74..915ce6b0588ac2bf80d9f23611b4e8718f52c17f 100644
--- a/SAS/TMSS/backend/src/tmss/tmssapp/populate.py
+++ b/SAS/TMSS/backend/src/tmss/tmssapp/populate.py
@@ -722,6 +722,17 @@ def populate_connectors():
                                              task_template_name=task_template_name,
                                              iotype=IOType.objects.get(value=IOType.Choices.INPUT.value))
 
+    # The Copy (sub)task accepts any kind of data, and passes it on.
+    task_template_name = "copy pipeline"
+    for output_connector_type in TaskConnectorType.objects.filter(iotype=IOType.objects.get(value=IOType.Choices.OUTPUT.value)).all():
+        any_role = Role.objects.get(value=Role.Choices.ANY.value)
+        for iotype_value in [IOType.Choices.INPUT.value, IOType.Choices.OUTPUT.value]:
+            create_task_connector_skip_duplicate(role=any_role,
+                                                 datatype=output_connector_type.datatype,
+                                                 dataformat=output_connector_type.dataformat,
+                                                 task_template_name=task_template_name,
+                                                 iotype=IOType.objects.get(value=iotype_value))
+
 
 def populate_permissions():
     logger.info('Populating permissions')
diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/schemas/subtask_template/copy/copy_pipeline-1.json b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/subtask_template/copy/copy_pipeline-1.json
new file mode 100644
index 0000000000000000000000000000000000000000..069234fee0e5155284b7fad08b442e14ae834753
--- /dev/null
+++ b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/subtask_template/copy/copy_pipeline-1.json
@@ -0,0 +1,32 @@
+{
+  "description": "This schema defines the parameters to setup and control a copy pipeline subtask.",
+  "name": "copy pipeline",
+  "purpose": "production",
+  "schema": {
+    "$id": "https://tmss.lofar.eu/api/schemas/subtasktemplate/copy%20pipeline/1#",
+    "$schema": "http://json-schema.org/draft-06/schema#",
+    "additionalProperties": false,
+    "description": "This schema defines the parameters to setup and control a copy pipeline subtask.",
+    "patternProperties": {
+      "^[$]schema$": {}
+    },
+    "properties": {
+      "destination": {
+        "default": "localhost:/tmp",
+        "description": "Where to copy the data to: <host>:/path/to/destination",
+        "type": "string"
+      },
+      "managed_output": {
+        "default": true,
+        "description": "Are the output dataproducts managed by TMSS (needed for cleanup and/or ingest)",
+        "type": "boolean"
+      }
+    },
+    "required": ["destination", "managed_output"],
"title": "copy pipeline", + "type": "object", + "version": 1 + }, + "state": "active", + "version": 1 +} diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/schemas/task_template/pipeline/copy_pipeline-1.json b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/task_template/pipeline/copy_pipeline-1.json new file mode 100644 index 0000000000000000000000000000000000000000..3609ca87056d9b4d01816d5729122b093a1751c9 --- /dev/null +++ b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/task_template/pipeline/copy_pipeline-1.json @@ -0,0 +1,32 @@ +{ + "description": "This schema defines the parameters to setup and control a preprocessing pipeline subtask.", + "name": "copy pipeline", + "purpose": "production", + "schema": { + "$id": "https://tmss.lofar.eu/api/schemas/tasktemplate/copy%20pipeline/1#", + "$schema": "http://json-schema.org/draft-06/schema#", + "additionalProperties": false, + "description": "This schema defines the parameters to setup and control a copy pipeline task.", + "patternProperties": { + "^[$]schema$": {} + }, + "properties": { + "destination": { + "default": "localhost:/tmp", + "description": "Where to copy the data to: <host>:/path/to/destination", + "type": "string" + }, + "managed_output": { + "default": true, + "description": "Are the output dataproducts managed by TMSS (needed for cleanup and/or ingest)", + "type": "boolean" + } + }, + "required": ["destination", "managed_output"], + "title": "copy pipeline", + "type": "object", + "version": 1 + }, + "state": "active", + "version": 1 +} diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py index 58df0f355848a032f4ea2d8ec779b2c2efdd89b3..6c8e15bfe8d1ace50bd93db66f17bd45640876cf 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py @@ -88,6 +88,7 @@ def create_or_update_subtasks_from_task_blueprint(task_blueprint: TaskBlueprint) create_qaplots_subtask_from_task_blueprint], 'preprocessing pipeline': [create_preprocessing_subtask_from_task_blueprint], 'pulsar pipeline': [create_pulsar_pipeline_subtask_from_task_blueprint], + 'copy pipeline': [create_copy_pipeline_subtask_from_task_blueprint], 'ingest': [create_ingest_subtask_from_task_blueprint], 'cleanup': [create_cleanup_subtask_from_task_blueprint]} generators_mapping['calibrator observation'] = generators_mapping['target observation'] @@ -659,7 +660,7 @@ def create_observation_control_subtask_from_task_blueprint(task_blueprint: TaskB raise SubtaskCreationException("Total number of subbands %d exceeds the maximum of 488 for task_blueprint id=%s" % (len(all_subbands), task_blueprint.id)) # step 1: create subtask in defining state - cluster_name = task_blueprint.specifications_doc.get("storage_cluster", "CEP4") + cluster_name = task_blueprint.specifications_doc.get("correlator", {}).get("storage_cluster", "CEP4") subtask_data = { "scheduled_start_time": None, "scheduled_stop_time": None, "state": SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value), @@ -989,6 +990,37 @@ def create_cleanup_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> return subtask +def create_copy_pipeline_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> Subtask: + # step 0: check pre-requisites + check_prerequities_for_subtask_creation(task_blueprint) + + # create subtask in defining state, with filled-in subtask_template + copy_subtask_template = SubtaskTemplate.get_version_or_latest(name="copy pipeline") + copy_subtask_spec = 
+    copy_subtask_spec['destination'] = task_blueprint.specifications_doc.get('destination', '/tmp')
+    copy_subtask_spec['managed_output'] = task_blueprint.specifications_doc.get('managed_output', False)
+
+    copy_subtask_data = { "scheduled_start_time": None,
+                          "scheduled_stop_time": None,
+                          "state": SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value),
+                          "task_blueprint": task_blueprint,
+                          "specifications_template": copy_subtask_template,
+                          "specifications_doc": copy_subtask_spec,
+                          "primary": True,
+                          "cluster": task_blueprint.predecessors.first().subtasks.first().cluster }
+    copy_subtask = Subtask.objects.create(**copy_subtask_data)
+
+    # create and link subtask input/output
+    _create_or_update_subtask_inputs(copy_subtask)
+
+    # set state to DEFINED
+    copy_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value)
+    copy_subtask.save()
+
+    # done, now return the subtask, and allow the system to wait for the predecessors to be finished before we schedule this copy_subtask
+    return copy_subtask
+
+
 # ==== various schedule* methods to schedule a Subtasks (if possible) ====
 
 # LOFAR needs to have a gap in between observations to (re)initialize hardware.
@@ -2246,17 +2278,50 @@ def schedule_copy_subtask(copy_subtask: Subtask):
 
     # iterate over all inputs
     for copy_subtask_input in copy_subtask.inputs.all():
-        # select and set input dataproducts that meet the filter defined in selection_doc
-        dataproducts = [dataproduct for dataproduct in copy_subtask_input.producer.dataproducts.all()
-                        if specifications_doc_meets_selection_doc(dataproduct.specifications_doc, copy_subtask_input.selection_doc)]
-        copy_subtask_input.dataproducts.set(dataproducts)
-
-        # skip resource assigner
-
-    # step 5: set state to SCHEDULED (resulting in the qaservice to pick this subtask up and run it)
-    copy_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULED.value)
-    copy_subtask.save()
+        # select and set input dataproducts that meet the filter defined in selection_doc
+        input_dataproducts = [dataproduct for dataproduct in copy_subtask_input.producer.dataproducts.all()
+                              if specifications_doc_meets_selection_doc(dataproduct.specifications_doc, copy_subtask_input.selection_doc)]
+        if input_dataproducts:
+            copy_subtask_input.dataproducts.set(input_dataproducts)
+
+            if copy_subtask.specifications_doc.get('managed_output', True):
+                # create an exact copy of the input type, but then for output
+                output_connector = copy_subtask.task_blueprint.specifications_template.connector_types.filter(dataformat=copy_subtask_input.input_role.dataformat,
+                                                                                                              datatype=copy_subtask_input.input_role.datatype,
+                                                                                                              role=copy_subtask_input.input_role.role,
+                                                                                                              iotype__value=IOType.Choices.OUTPUT.value).first()
+
+                subtask_output = SubtaskOutput.objects.create(subtask=copy_subtask,
+                                                              output_role=output_connector,
+                                                              filesystem=_output_filesystem(copy_subtask))
+
+                # prepare output dataproducts, which are exact copies of the input, except for the directory
+                dataproduct_feedback_template = DataproductFeedbackTemplate.get_version_or_latest(name="empty")
+                dataproduct_feedback_doc = dataproduct_feedback_template.get_default_json_document_for_schema()
+                destination_dir = copy_subtask.specifications_doc['destination'][copy_subtask.specifications_doc['destination'].find(':')+1:]
+                output_dataproducts = [Dataproduct(filename=input_dp.filename,
+                                                   directory=destination_dir,
+                                                   dataformat=input_dp.dataformat,
+                                                   datatype=input_dp.datatype,
+                                                   producer=subtask_output,
+                                                   specifications_doc=input_dp.specifications_doc,
+                                                   specifications_template=input_dp.specifications_template,
+                                                   feedback_doc=dataproduct_feedback_doc,
+                                                   feedback_template=dataproduct_feedback_template,
+                                                   sap=input_dp.sap,
+                                                   global_identifier=None) for input_dp in input_dataproducts]
+
+                # create the dataproducts
+                output_dataproducts = _bulk_create_dataproducts_with_global_identifiers(output_dataproducts)
+                subtask_output.dataproducts.set(output_dataproducts)
+
+                # a copy action is an identity transform
+                transforms = [DataproductTransform(input=input_dp, output=output_dp, identity=True) for input_dp, output_dp in zip(input_dataproducts, output_dataproducts)]
+                DataproductTransform.objects.bulk_create(transforms)
+
+    # step 5: set state to SCHEDULED (resulting in the copy_service picking this subtask up and running it)
+    copy_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULED.value)
+    copy_subtask.save()
 
     return copy_subtask
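A note on the local-destination handling in copy_service.py above: the destination is given as "<host>:/path" (cf. the schema default "localhost:/tmp"), and a local-host prefix must be stripped so rsync treats the target as a local path rather than a remote one. Python's str.lstrip() is unsuitable for this, because it strips a *character set* from the left, not a prefix, which is why the service removes the prefix explicitly. A minimal, self-contained sketch (strip_local_host_prefix is an illustrative helper, not part of the TMSS API):

    def strip_local_host_prefix(destination_path: str) -> str:
        # remove a leading 'localhost:' or '127.0.0.1:' so the path is treated as local
        for prefix in ('localhost:', '127.0.0.1:'):
            if destination_path.startswith(prefix):
                return destination_path[len(prefix):]
        return destination_path

    assert strip_local_host_prefix('localhost:/tmp/out') == '/tmp/out'
    assert strip_local_host_prefix('otherhost:/tmp/out') == 'otherhost:/tmp/out'
    # the str.lstrip pitfall: it also eats the leading 'l' and 'o' of 'lofar_data'
    assert 'localhost:lofar_data'.lstrip('localhost:') == 'far_data'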