diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 0649271e1093ca3fc3b7dae32e9a000000fee165..b5de38741c876c8e37290cadf719a43af3aab630 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -700,38 +700,43 @@ deploy-tmss-prod-lcs129: - rm -rf lobster/.env; rm -rf lobster/env; rm -rf tmss/.env; rm -rf tmss/env; rm -rf app/.env; rm -rf app/id_rsa - ssh $LOFAR_USER@$LOFAR_TARGET -p ${LOFAR_TARGET_PORT} "cd .lofar/tmss/tmss; ln -s ../env ; ${COMPOSE_PATH} down; ${COMPOSE_PATH} build; ${COMPOSE_PATH} up -d" -deploy-lobster-prod-lcs129: +deploy-tmss-services-prod-lcs129: extends: .deploy-tmss-prod + parallel: + matrix: + - SERVICE: + - lobster + - qa-statistics variables: LOFAR_ENVIRONMENT: ${TMSS_DEPLOY_LCS129_PRODUCTION} LOFAR_USER: "lofarsys" LOFAR_TARGET: "lcs129.control.lofar" LOFAR_TARGET_PORT: 22 SOURCE_IMAGE: "${REGISTRY_PATH}/tmss_django:$CI_COMMIT_SHORT_SHA" - EXPORT_IMAGE: tmss_lobster + EXPORT_IMAGE: "tmss_${SERVICE}" environment: - name: production-lcs129-lobster + name: production-lcs129-${SERVICE} url: http://lcs129.control.lofar:8500 before_script: - *prepare_ssh_agent - ssh ${LOFAR_USER}@${LOFAR_TARGET} -p ${LOFAR_TARGET_PORT} -o "StrictHostKeyChecking=no" 'echo "critical do not remove me"' script: - *deploy-tmss-prod-common-script - - cd lobster + - cd ${SERVICE} - docker-compose build - docker image ls - docker save ${EXPORT_IMAGE} > ${EXPORT_IMAGE}.img - - ssh ${LOFAR_USER}@${LOFAR_TARGET} -p ${LOFAR_TARGET_PORT} 'mkdir -p ~/.lofar/tmss/lobster' - - ssh ${LOFAR_USER}@${LOFAR_TARGET} -p ${LOFAR_TARGET_PORT} "cd .lofar/tmss/lobster; rm -f ${EXPORT_IMAGE}.img" - - rsync -aAXv -e "ssh -p ${LOFAR_TARGET_PORT}" --chmod=700 ./${EXPORT_IMAGE}.img ${LOFAR_USER}@${LOFAR_TARGET}:~/.lofar/tmss/lobster/${EXPORT_IMAGE}.img - - ssh ${LOFAR_USER}@${LOFAR_TARGET} -p ${LOFAR_TARGET_PORT} "cd .lofar/tmss/lobster; docker load < ./${EXPORT_IMAGE}.img" + - ssh ${LOFAR_USER}@${LOFAR_TARGET} -p ${LOFAR_TARGET_PORT} 'mkdir -p ~/.lofar/tmss/${SERVICE}' + - ssh ${LOFAR_USER}@${LOFAR_TARGET} -p ${LOFAR_TARGET_PORT} "cd .lofar/tmss/${SERVICE}; rm -f ${EXPORT_IMAGE}.img" + - rsync -aAXv -e "ssh -p ${LOFAR_TARGET_PORT}" --chmod=700 ./${EXPORT_IMAGE}.img ${LOFAR_USER}@${LOFAR_TARGET}:~/.lofar/tmss/${SERVICE}/${EXPORT_IMAGE}.img + - ssh ${LOFAR_USER}@${LOFAR_TARGET} -p ${LOFAR_TARGET_PORT} "cd .lofar/tmss/${SERVICE}; docker load < ./${EXPORT_IMAGE}.img" - export DOCKER_HOST="ssh://${LOFAR_USER}@${LOFAR_TARGET}" - echo ${DOCKER_HOST} - docker-compose down - docker-compose up -d after_script: - *prepare_ssh_agent - - ssh ${LOFAR_USER}@${LOFAR_TARGET} -p ${LOFAR_TARGET_PORT} "cd .lofar/tmss/lobster; rm -f ${EXPORT_IMAGE}.img" + - ssh ${LOFAR_USER}@${LOFAR_TARGET} -p ${LOFAR_TARGET_PORT} "cd .lofar/tmss/${SERVICE}; rm -f ${EXPORT_IMAGE}.img" deploy-lofar-pipeline-prod: stage: deploy-prod diff --git a/CMake/LofarPackageList.cmake b/CMake/LofarPackageList.cmake index d03e2c50bb27d6804226fc7b6b9a5189a55a222d..edaf8ef7e7615333d9deb710faba37bbdc666bd7 100644 --- a/CMake/LofarPackageList.cmake +++ b/CMake/LofarPackageList.cmake @@ -170,6 +170,7 @@ if(NOT DEFINED LOFAR_PACKAGE_LIST_INCLUDED) set(TMSSPreCalculationsService_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TMSS/backend/services/precalculations) set(TMSSSchedulingService_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TMSS/backend/services/scheduling) set(TMSSSlackWebhookService_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TMSS/backend/services/slack_webhook) + set(TMSSTuna_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TMSS/backend/services/tuna) set(TMSSWebSocketService_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TMSS/backend/services/websocket) set(TMSSWorkflowService_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TMSS/backend/services/workflow_service) set(TMSSReportRefreshService_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TMSS/backend/services/report_refresh) diff --git a/Docker/lofar-ci/Dockerfile_ci_tmss b/Docker/lofar-ci/Dockerfile_ci_tmss index d52cc98ece2e8238b85434fda6a2cb6f3938190e..43d2b941910a4d30d5dbcc3cdac31119aaa4346f 100644 --- a/Docker/lofar-ci/Dockerfile_ci_tmss +++ b/Docker/lofar-ci/Dockerfile_ci_tmss @@ -25,6 +25,7 @@ COPY SAS/TMSS/backend/services/ingest_tmss_adapter/requirements.txt tmss_ingest_ COPY SAS/TMSS/backend/services/scheduling/requirements.txt tmss_scheduling.txt COPY SAS/TMSS/backend/services/slack_webhook/requirements.txt tmss_slack_webhook.txt COPY SAS/TMSS/backend/services/websocket/requirements.txt tmss_websocket.txt +COPY SAS/TMSS/backend/services/tuna/requirements.txt tmss_tuna.txt RUN pip3 install "setuptools>=62.6" "wheel>=0.40" "pip>=23.1.2" RUN pip3 install astroplan cachetools comet coreapi coverage cx_Oracle cython django-auth-ldap \ @@ -36,7 +37,7 @@ RUN pip3 install astroplan cachetools comet coreapi coverage cx_Oracle cython dj python-ldap-test python-qpid-proton==0.37.0 pyxb==1.2.5 scipy SimpleWebSocketServer \ swagger-spec-validator testing.postgresql ujson websocket_client xmljson \ -r tmss_lobster.txt -r tmss_ingest_tmss_adapter.txt -r tmss_scheduling.txt \ - -r tmss_slack_webhook.txt -r tmss_websocket.txt \ + -r tmss_slack_webhook.txt -r tmss_websocket.txt -r tmss_tuna.txt \ -c tmss_constraints.txt --ignore-installed RUN echo "This string is here to prevent Docker caching. It is 09 am on Apr 26, 2024." diff --git a/SAS/DataManagement/Cleanup/CleanupService/service.py b/SAS/DataManagement/Cleanup/CleanupService/service.py index c3421ea12220ebd95e0cd3a71d0bfb9c99b436e8..335019008b61b264bda48af1166500a169b26bc7 100644 --- a/SAS/DataManagement/Cleanup/CleanupService/service.py +++ b/SAS/DataManagement/Cleanup/CleanupService/service.py @@ -141,6 +141,12 @@ class CleanupHandler(ServiceMessageHandler): # This method is currently only used in the web-scheduler for otdb/mom tasks. No need to TMSS-ify it. return _getOTDBPinnedStatuses() + def _are_dataproducts_accessible(self, tmss_id: int) -> bool: + subtask = self._tmss_client.get_subtask(tmss_id) + + # we can only access CEP4 at the moment + return subtask.cluster_name == "CEP4" + def _has_unfinished_non_cleanup_successors(self, otdb_id: int, tmss_id: int) -> bool: # TODO: otdb handling can be removed once we use TMSS only. if otdb_id is not None: @@ -227,6 +233,17 @@ class CleanupHandler(ServiceMessageHandler): 'message': message}) return {'deleted': False, 'message': message} + + if not self._are_dataproducts_accessible(tmss_id): + message = "Task otdb_id=%s tmss_id=%s cannot be accessed by this service. Not deleting data." % (otdb_id, tmss_id) + logger.error(message) + self._sendNotification(subject='TaskDeleted', content={'deleted': False, + 'otdb_id': otdb_id, + 'tmss_id': tmss_id, + 'message': message}) + return {'deleted': False, 'message': message} + + path_result = self.path_resolver.getPathForOTDBId(otdb_id) if otdb_id is not None else self.path_resolver.getPathForTMSSId(tmss_id) if path_result['found']: rm_results = [] diff --git a/SAS/TMSS/backend/services/CMakeLists.txt b/SAS/TMSS/backend/services/CMakeLists.txt index 6f4ee21a9a9e5a8a0a2c1b7e16fada51c0cfdac0..3e88b21151e981fe7da4b5196b71c5434a902ac5 100644 --- a/SAS/TMSS/backend/services/CMakeLists.txt +++ b/SAS/TMSS/backend/services/CMakeLists.txt @@ -3,6 +3,7 @@ lofar_package(TMSSServices 0.1 DEPENDS TMSSClient) lofar_add_package(LTAIngestTMSSAdaper ingest_tmss_adapter) lofar_add_package(TMSSFeedbackHandlingService feedback_handling) lofar_add_package(TMSSLobster lobster) +lofar_add_package(TMSSTuna tuna) lofar_add_package(TMSSLTAAdapter lta_adapter) lofar_add_package(TMSSPostgresListenerService postgres_listener) lofar_add_package(TMSSPreCalculationsService precalculations) diff --git a/SAS/TMSS/backend/services/qa_statistics_service/CMakeLists.txt b/SAS/TMSS/backend/services/qa_statistics_service/CMakeLists.txt index 83731006823d0b55b8e31eefb4b2c7f4edf106cd..2c85c2aa8f1ab5c3b1b3b1a820f3fb5eb44fc212 100644 --- a/SAS/TMSS/backend/services/qa_statistics_service/CMakeLists.txt +++ b/SAS/TMSS/backend/services/qa_statistics_service/CMakeLists.txt @@ -1,4 +1,4 @@ -lofar_package(TMSSQAStatisticsService 0.1 DEPENDS TMSSClient PyCommon PyMessaging) +lofar_package(TMSSQAStatisticsService 0.1 DEPENDS TMSSClient PyCommon PyMessaging TMSSTuna) lofar_find_package(PythonInterp 3.4 REQUIRED) include(PythonInstall) diff --git a/SAS/TMSS/backend/services/qa_statistics_service/qa_statistics_service.py b/SAS/TMSS/backend/services/qa_statistics_service/qa_statistics_service.py index 5013088764525a349d5114e3024a7eca3b44e3f0..c433eb620b35348cd998a207dedde3bd85470538 100755 --- a/SAS/TMSS/backend/services/qa_statistics_service/qa_statistics_service.py +++ b/SAS/TMSS/backend/services/qa_statistics_service/qa_statistics_service.py @@ -17,30 +17,122 @@ # You should have received a copy of the GNU General Public License along # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. +"""Provide a mock HTTP server emulating Nomad""" +from datetime import datetime, timedelta import os from optparse import OptionParser, OptionGroup import logging + +from lofar.sas.tmss.services.tuna.job_dispatcher import ( + JobGroup, + JobDispatcher, + StatisticsAggregateJob, +) + logger = logging.getLogger(__name__) -from lofar.sas.tmss.client.tmssbuslistener import TMSSEventMessageHandler, TMSSBusListener +from lofar.sas.tmss.client.tmssbuslistener import ( + TMSSEventMessageHandler, + TMSSBusListener, +) from lofar.sas.tmss.client.tmss_http_rest_client import TMSSsession from lofar.messaging import DEFAULT_BUSNAME, DEFAULT_BROKER +from lofar.sas.tmss.tmss.tmssapp.conversions import ( + antennafields_for_antennaset_and_station, +) + + +POLL_INTERVAL = timedelta(seconds=15) + + +class QASubtask: + """Model and manage a QA Subtask in TMSS""" + + # In which state we need to be to get to a certain state. + # TODO: Ask TMSS what transitions are allowed. + PREV_STATE = { + "queueing": "scheduled", + "queued": "queueing", + "starting": "queued", + "started": "starting", + "finishing": "started", + "finished": "finishing", + } + + def __init__(self, subtask_id, tmss_client, job, initial_status="scheduled"): + self.subtask_id = subtask_id + self.tmss_client = tmss_client + self.status = initial_status + + self.job: JobGroup | None = None + + def __str__(self): + return f"QASubtask({self.subtask_id=}, {self.status=})" + + def set_status(self, new_status, **kwargs): + logger.info( + f"Requested transitioning subtask {self} from {self.status} -> {new_status}" + ) + self._set_status(new_status, **kwargs) + + def _set_status(self, new_status, **kwargs): + if new_status == self.status: + return + + if new_status in self.PREV_STATE: + if self.status != self.PREV_STATE[new_status]: + # Force the correct chain of state transitions if set_status + # skips over them due to race conditions. + self.set_status(self.PREV_STATE[new_status]) + + logger.info(f"Transitioning subtask {self} from {self.status} -> {new_status}") + self.status = new_status + self.tmss_client.set_subtask_status(self.subtask_id, self.status, **kwargs) + + def send_feedback_and_set_to_finished(self): + """Subtask `subtask_id` has finished its work. Report dataproduct feedback back to TMSS.""" + + # TODO: + # * TMSS: model feedback in a dataproduct feedback template + # * TMSS: add endpoint to provide feedback in JSON + # * here: construct and send feedback to TMSS + + self.set_status("finished") class TMSSQAStatisticsServiceEventMessageHandler(TMSSEventMessageHandler): - ''' - ''' - def __init__(self, tmss_client_credentials_id: str="TMSSClient"): + """ """ + + def __init__(self, tmss_client_credentials_id: str = "TMSSClient"): super().__init__() - self.tmss_client = TMSSsession.create_from_dbcreds_for_ldap(tmss_client_credentials_id) + self.tmss_client = TMSSsession.create_from_dbcreds_for_ldap( + tmss_client_credentials_id + ) + self._last_poll_timestamp = datetime.min + + self.dispatcher = JobDispatcher("statistics-aggregate") + + self.active_qa_subtasks = {} + + # Sense whether lingering jobs from previous runs are still active in nomad + self.reconstruct_active_qa_subtasks() + + # Update their status + self.check_job_statusses() + + def reconstruct_active_qa_subtasks(self): + """Look in the system which jobs are running, and reconstruct self.job_sets.""" + pass def start_handling(self): self.tmss_client.open() super().start_handling() try: - subtasks = self.tmss_client.get_subtasks(subtask_type='qa_statistics', state='scheduled') + subtasks = self.tmss_client.get_subtasks( + subtask_type="qa_statistics", state="scheduled" + ) for subtask in subtasks: self.run_qa_statistics_subtask(subtask) except Exception as e: @@ -50,62 +142,175 @@ class TMSSQAStatisticsServiceEventMessageHandler(TMSSEventMessageHandler): super().stop_handling() self.tmss_client.close() - def onSubTaskStatusChanged(self, id: int, status:str): + def before_receive_message(self): + """Use this method which is called in the event handler message loop to poll at regular intervals. + This allows us to check up on running jobs and report if their status changes. + """ + + if datetime.utcnow() - self._last_poll_timestamp >= POLL_INTERVAL: + self._last_poll_timestamp = datetime.utcnow() + self.check_job_statusses() + + def onSubTaskStatusChanged(self, id: int, status: str): logger.info("onSubTaskStatusChanged: subtask id=%s status=%s", id, status) - if status == 'scheduled': + if status == "scheduled": subtask = self.tmss_client.get_subtask(id) - if subtask['subtask_type'] == 'qa_statistics': + if subtask["subtask_type"] == "qa_statistics": self.run_qa_statistics_subtask(subtask) else: - logger.info("skipping subtask id=%s status=%s type=%s", subtask['id'], subtask['state_value'], subtask['subtask_type']) + logger.info( + "skipping subtask id=%s status=%s type=%s", + subtask["id"], + subtask["state_value"], + subtask["subtask_type"], + ) def run_qa_statistics_subtask(self, subtask): - if subtask['subtask_type'] != 'qa_statistics': + if subtask["subtask_type"] != "qa_statistics": return - if subtask['state_value'] not in ('queued', 'scheduled'): + if subtask["state_value"] != "scheduled": return + logger.info(f"Queueing subtask {subtask['id']}") + + qa_subtask = self.active_qa_subtasks[subtask["id"]] = QASubtask( + subtask["id"], self.tmss_client, subtask["state_value"] + ) + try: - self.tmss_client.set_subtask_status(subtask['id'], 'starting') - self.tmss_client.set_subtask_status(subtask['id'], 'started') + qa_subtask.set_status("queueing") # cache to reduce rest-calls # maps producer_id to tuple of producing subtask id and cluster name _cache = {} - for qa_stat_dataproduct in self.tmss_client.get_subtask_output_dataproducts(subtask['id']): - logger.info("TODO: do the actual S3/minio calls to create this h5 file from the continuously running stats: %s", qa_stat_dataproduct['filename']) + # lookup preceeding observation + predecessors = self.tmss_client.get_subtask_predecessors(subtask["id"]) + if len(predecessors) != 1: + raise ValueError( + f"Subtask needs to have exactly one predecessor. It has {len(predecessors)}." + ) + if predecessors[0]["subtask_type"] != "observation": + raise ValueError( + f"Subtask predecessor needs to be an observation. It is a {predecessors[0]['subtask_type']}." + ) + + # lookup required fields + obs_subtask = predecessors[0] + observation_id = obs_subtask["id"] + start_time = obs_subtask["scheduled_start_time"] + end_time = obs_subtask["scheduled_stop_time"] + stations = obs_subtask["specifications_doc"]["stations"]["station_list"] + antenna_set = obs_subtask["specifications_doc"]["stations"]["antenna_set"] + + # schedule a job for every antenna field + antenna_fields = { + station: antennafields_for_antennaset_and_station(antenna_set, station) + for station in stations + } - self.tmss_client.set_subtask_status(subtask['id'], 'finishing') - self.tmss_client.set_subtask_status(subtask['id'], 'finished') + qa_subtask.job = JobGroup(StatisticsAggregateJob)( + subtask_id=subtask["id"], + observation_id=observation_id, + station=stations, + antenna_field=antenna_fields, + begin=start_time, + end=end_time, + destination="s3://dataproducts/", + ) + + logger.info(f"Dispatching {qa_subtask.job}") + + _ = self.dispatcher.dispatch(qa_subtask.job) + + qa_subtask.set_status("queued") + + # for qa_stat_dataproduct in self.tmss_client.get_subtask_output_dataproducts(subtask['id']): + # logger.info("TODO: do the actual S3/minio calls to create this h5 file from the continuously running stats: %s", qa_stat_dataproduct['filename']) except Exception as e: logger.exception(e) - self.tmss_client.set_subtask_status(subtask['id'], 'error', error_reason=str(e)) + qa_subtask.set_status("error", error_reason=str(e)) + + def check_job_statusses(self): + for subtask_id, qa_subtask in list(self.active_qa_subtasks.items()): + try: + status = self.dispatcher.status(qa_subtask.job) + logger.info(f"Subtask {subtask_id} has aggregated job status {status}") -def create_qa_statistics_service(exchange: str=DEFAULT_BUSNAME, broker: str=DEFAULT_BROKER, tmss_client_credentials_id: str="TMSSClient"): - return TMSSBusListener(handler_type=TMSSQAStatisticsServiceEventMessageHandler, - handler_kwargs={'tmss_client_credentials_id': tmss_client_credentials_id}, - exchange=exchange, broker=broker) + # update TMSS about the subtask status + if status == "queued": + qa_subtask.set_status("queued") + elif status == "started": + qa_subtask.set_status("started") + elif status == "finished": + qa_subtask.set_status("finishing") + + # wrap up the subtask + qa_subtask.send_feedback_and_set_to_finished() + + del self.active_qa_subtasks[subtask_id] + except Exception as e: + logger.exception( + f"Exception occurred for subtask {subtask_id} jobs {qa_subtask.job}: {e}" + ) + + +def create_qa_statistics_service( + exchange: str = DEFAULT_BUSNAME, + broker: str = DEFAULT_BROKER, + tmss_client_credentials_id: str = "TMSSClient", +): + return TMSSBusListener( + handler_type=TMSSQAStatisticsServiceEventMessageHandler, + handler_kwargs={"tmss_client_credentials_id": tmss_client_credentials_id}, + exchange=exchange, + broker=broker, + ) def main(): # make sure we run in UTC timezone - os.environ['TZ'] = 'UTC' - logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) + os.environ["TZ"] = "UTC" + logging.basicConfig( + format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO + ) # Check the invocation arguments - parser = OptionParser('%prog [options]', - description='run the tmss_qa_statistics_service which executes scheduled qa_statistics subtasks') + parser = OptionParser( + "%prog [options]", + description="run the tmss_qa_statistics_service which executes scheduled qa_statistics subtasks", + ) - group = OptionGroup(parser, 'Messaging options') - group.add_option('-b', '--broker', dest='broker', type='string', default=DEFAULT_BROKER, help='Address of the message broker, default: %default') - group.add_option('-e', "--exchange", dest="exchange", type="string", default=DEFAULT_BUSNAME, help="exchange where the TMSS event messages are published. [default: %default]") + group = OptionGroup(parser, "Messaging options") + group.add_option( + "-b", + "--broker", + dest="broker", + type="string", + default=DEFAULT_BROKER, + help="Address of the message broker, default: %default", + ) + group.add_option( + "-e", + "--exchange", + dest="exchange", + type="string", + default=DEFAULT_BUSNAME, + help="exchange where the TMSS event messages are published. [default: %default]", + ) parser.add_option_group(group) - group = OptionGroup(parser, 'Django options') + group = OptionGroup(parser, "Django options") parser.add_option_group(group) - group.add_option('-R', '--tmss_client_credentials_id', dest='tmss_client_credentials_id', type='string', default='TMSSClient', help='TMSS django REST API credentials name, default: %default') + group.add_option( + "-R", + "--tmss_client_credentials_id", + dest="tmss_client_credentials_id", + type="string", + default="TMSSClient", + help="TMSS django REST API credentials name, default: %default", + ) (options, args) = parser.parse_args() @@ -114,8 +319,11 @@ def main(): from lofar.common.util import waitForInterrupt - with create_qa_statistics_service(options.exchange, options.broker, options.tmss_client_credentials_id): + with create_qa_statistics_service( + options.exchange, options.broker, options.tmss_client_credentials_id + ): waitForInterrupt() -if __name__ == '__main__': + +if __name__ == "__main__": main() diff --git a/SAS/TMSS/backend/services/qa_statistics_service/t_qa_statistics_service.py b/SAS/TMSS/backend/services/qa_statistics_service/t_qa_statistics_service.py index 12c666114abfdea561982239213e81d19a3125d6..4829336df7331fc5996915e0fccc48a453d314e3 100755 --- a/SAS/TMSS/backend/services/qa_statistics_service/t_qa_statistics_service.py +++ b/SAS/TMSS/backend/services/qa_statistics_service/t_qa_statistics_service.py @@ -34,7 +34,7 @@ from lofar.messaging.messagebus import TemporaryExchange, BusListenerJanitor from time import sleep from datetime import datetime, timedelta -class TestCopyService(unittest.TestCase): +class TestQAStatisticsService(unittest.TestCase): ''' Tests for the TMSSService ''' @@ -50,11 +50,15 @@ class TestCopyService(unittest.TestCase): lofar.messaging.config.DEFAULT_BUSNAME = cls.tmp_exchange.address lofar.messaging.DEFAULT_BUSNAME = cls.tmp_exchange.address + # override POLL_INTERVAL to have tests run faster + import lofar.sas.tmss.services.qa_statistics_service + lofar.sas.tmss.services.qa_statistics_service.POLL_INTERVAL = timedelta(seconds=1) + # import here, and not at top of module, because DEFAULT_BUSNAME needs to be set before importing from lofar.sas.tmss.test.test_environment import TMSSTestEnvironment cls.tmss_test_env = TMSSTestEnvironment(exchange=cls.tmp_exchange.address, populate_schemas=True, populate_test_data=False, - start_subtask_scheduler=True, start_postgres_listener=True, + start_subtask_scheduler=False, start_postgres_listener=True, start_dynamic_scheduler=False, enable_viewflow=False) cls.tmss_test_env.start() @@ -87,7 +91,7 @@ class TestCopyService(unittest.TestCase): # overrule the normal stations for this template with just the one CS001 LOFAR2 station station_groups = [{'stations': ['CS001'], 'max_nr_missing': 0}] scheduling_unit_spec['tasks']['Observation']['specifications_doc']['station_configuration']['station_groups'] = station_groups - scheduling_unit_spec['tasks']['Observation']['specifications_doc']['station_configuration']['antenna_set'] = 'HBA_ZERO' + scheduling_unit_spec['tasks']['Observation']['specifications_doc']['station_configuration']['antenna_set'] = 'HBA_DUAL' # create a scheduling unit draft & blueprint su_draft = create_scheduling_unit_draft_from_observing_strategy_template(strategy_template, @@ -109,11 +113,14 @@ class TestCopyService(unittest.TestCase): # create and start the service (the object under test) from lofar.sas.tmss.services.qa_statistics_service import create_qa_statistics_service + from lofar.sas.tmss.services.tuna.nomad_mock import NomadMock + service = create_qa_statistics_service(exchange=self.tmp_exchange.address, tmss_client_credentials_id=self.tmss_test_env.client_credentials.dbcreds_id) - with BusListenerJanitor(service): + with BusListenerJanitor(service), NomadMock("statistics-aggregate") as nomad: from lofar.sas.tmss.tmss.tmssapp.subtasks import wait_for_subtask_status, schedule_subtask from lofar.sas.tmss.test.test_utils import set_subtask_state_following_allowed_transitions + # start and finish preceeding observation observation_subtask = self.prepare_observation_subtask() observation_subtask = schedule_subtask(observation_subtask, datetime.utcnow() + timedelta(seconds=15)) self.assertEqual(SubtaskState.Choices.SCHEDULED.value, observation_subtask.state.value) @@ -122,8 +129,19 @@ class TestCopyService(unittest.TestCase): qa_stats_subtask = observation_subtask.successors.filter(specifications_template__type__value=SubtaskType.Choices.QA_STATISTICS.value).first() qa_stats_subtask = schedule_subtask(qa_stats_subtask, observation_subtask.scheduled_stop_time + timedelta(seconds=15)) + # wait for the qa_statistics_service to queue the subtask + wait_for_subtask_status(qa_stats_subtask, 'queued') + + self.assertTrue(nomad.server.jobs, "No nomad jobs got scheduled?") + + # progress the subtask in the mock + for subjob_id in nomad.server.jobs: + logger.info(f"Progressing nomad job {subjob_id}") + nomad.server.subjob_started(subjob_id) + nomad.server.subjob_finished(subjob_id) + # wait for the qa_statistics_service to execute the subtask, and check results - wait_for_subtask_status(qa_stats_subtask, 'finished') + wait_for_subtask_status(qa_stats_subtask, 'finished', timeout=30) # TODO: check results once qa_statistics_service.run_qa_statistics_subtask is implemented diff --git a/SAS/TMSS/backend/services/tuna/CMakeLists.txt b/SAS/TMSS/backend/services/tuna/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..d215ed0abcb42da39de1c17572aef1a2c507b583 --- /dev/null +++ b/SAS/TMSS/backend/services/tuna/CMakeLists.txt @@ -0,0 +1,6 @@ +lofar_package(TMSSTuna 0.1 DEPENDS TMSSClient PyCommon PyMessaging) + +IF(NOT SKIP_TMSS_BUILD) + add_subdirectory(lib) + add_subdirectory(test) +ENDIF(NOT SKIP_TMSS_BUILD) diff --git a/SAS/TMSS/backend/services/tuna/lib/CMakeLists.txt b/SAS/TMSS/backend/services/tuna/lib/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..3355be5bb9cb75f49ae60c10218f2e488ba41f02 --- /dev/null +++ b/SAS/TMSS/backend/services/tuna/lib/CMakeLists.txt @@ -0,0 +1,13 @@ +lofar_find_package(PythonInterp 3.7 REQUIRED) +include(PythonInstall) + +set(_py_files + __init__.py + _patch_nomad_job.py + job_dispatcher.py + nomad_mock.py +) + +python_install(${_py_files} + DESTINATION lofar/sas/tmss/services/tuna +) diff --git a/SAS/TMSS/backend/services/tuna/lib/__init__.py b/SAS/TMSS/backend/services/tuna/lib/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..ddfe1a96cdc400d2f956a62d4ca03c2146272411 --- /dev/null +++ b/SAS/TMSS/backend/services/tuna/lib/__init__.py @@ -0,0 +1,3 @@ +import lofar.sas.tmss.services.tuna._patch_nomad_job + +_patch_nomad_job.patch() diff --git a/SAS/TMSS/backend/services/tuna/lib/_patch_nomad_job.py b/SAS/TMSS/backend/services/tuna/lib/_patch_nomad_job.py new file mode 100644 index 0000000000000000000000000000000000000000..aa791f52bdffe358e010964d88c3bdfe5cbbd76b --- /dev/null +++ b/SAS/TMSS/backend/services/tuna/lib/_patch_nomad_job.py @@ -0,0 +1,36 @@ +import nomad + +"""Hotpatch nomad to add much needed functionality""" + + +def dispatch_job( + self, id_, payload=None, meta=None, idempotency_token=None, id_prefix_template=None +): + """Dispatches a new instance of a parameterized job. + + https://www.nomadproject.io/docs/http/job.html + + arguments: + - id_ + - payload + - meta + - idempotency_token + - id_prefix_template + returns: dict + raises: + - nomad.api.exceptions.BaseNomadException + - nomad.api.exceptions.URLNotFoundNomadException + """ + dispatch_json = {"Meta": meta, "Payload": payload} + if id_prefix_template: + dispatch_json["IdPrefixTemplate"] = id_prefix_template + params = {} + if idempotency_token: + params["IdempotencyToken"] = idempotency_token + return self.request( + id_, "dispatch", json=dispatch_json, method="post", params=params + ).json() + + +def patch(): + nomad.api.job.Job.dispatch_job = dispatch_job diff --git a/SAS/TMSS/backend/services/tuna/lib/job_dispatcher.py b/SAS/TMSS/backend/services/tuna/lib/job_dispatcher.py new file mode 100644 index 0000000000000000000000000000000000000000..54bee50899a28089540458f038392beca020b50f --- /dev/null +++ b/SAS/TMSS/backend/services/tuna/lib/job_dispatcher.py @@ -0,0 +1,140 @@ +import os +import typing +from abc import ABC, abstractmethod +from collections import defaultdict +from dataclasses import dataclass +from typing import TypeVar +import logging + +import nomad + +logger = logging.getLogger() + + +class NomadJob(ABC): + EXPAND = [] + KEY = None + + @property + @abstractmethod + def prefix(self): + pass + + @abstractmethod + def meta(self) -> dict: + pass + + +@dataclass +class StatisticsAggregateJob(NomadJob): + """Class for keeping track of an item in inventory.""" + + EXPAND = ["station", "antenna_field"] + KEY = "station" + + subtask_id: str + observation_id: str + station: str + antenna_field: str + begin: str + end: str + destination: str + source: str = "s3://central-statistics/" + + @property + def prefix(self): + return f"{self.subtask_id}_{self.observation_id}_{self.station}_{self.antenna_field}" + + def meta(self): + return { + # "observation_id": self.observation_id, + "station": self.station, + "antenna_field": self.antenna_field, + "begin": self.begin, + "end": self.end, + "destination": self.destination, + "source": self.source, + } + + +T = TypeVar("T", bound=NomadJob) + + +class JobGroup(list[T]): + def __init__(self, type_class: typing.Type[T]): + super().__init__() + + self.type = type_class + self.prefix = "" + + def __call__(self, **kwargs): + self.prefix = f"{kwargs['subtask_id']}_" + job_kwargs = [{k: v for k, v in kwargs.items() if k not in self.type.EXPAND}] + + for exp in self.type.EXPAND: + if type(kwargs[exp]) is not list: + continue + new_job_kwargs = [] + for v in kwargs[exp]: + for kw in job_kwargs: + new_job_kwargs.append({exp: v, **kw}) + job_kwargs = new_job_kwargs + + for exp in self.type.EXPAND: + if type(kwargs[exp]) is not dict: + continue + new_job_kwargs = [] + for kw in job_kwargs: + for v in kwargs[exp][kw[self.type.KEY]]: + new_job_kwargs.append({exp: v, **kw}) + job_kwargs = new_job_kwargs + + for jkwa in job_kwargs: + self.append(self.type(**jkwa)) + return self + + +class JobDispatcher: + def __init__(self, job_id, namespace: str = None): + self._nomad = nomad.Nomad(namespace=namespace, timeout=5) + self.job_id = job_id + + def get_job(self, jobs: NomadJob | JobGroup): + return self._nomad.jobs.get_jobs(prefix=f"{self.job_id}/dispatch-{jobs.prefix}") + + def dispatch(self, jobs: NomadJob | JobGroup): + dispatched_jobs = self.get_job(jobs) + job_ids = [j["ID"] for j in dispatched_jobs] + for job in (jobs if isinstance(jobs, list) else [jobs]): + if f"{self.job_id}/dispatch-{job.prefix}" not in job_ids: + logger.info(f"Nomad: dispatching {job}") + dispatched_jobs.append( + self._nomad.job.dispatch_job( + self.job_id, + payload=None, + meta=job.meta(), + id_prefix_template=job.prefix, + ) + ) + return dispatched_jobs + + @staticmethod + def _summarise_status(job_statusses: list[str]): + status_count = defaultdict(int) + for job in job_statusses: + status_count[job] += 1 + + if status_count["pending"] == len(job_statusses): + return "queued" + if status_count["running"] > 0: + return "started" + if status_count["dead"] + status_count["complete"] == len(job_statusses): + return "finished" + + return "started" + + def status(self, jobs: NomadJob | JobGroup): + result = self.get_job(jobs) + job_status = defaultdict(int) + + return self._summarise_status([job["Status"] for job in result]) diff --git a/SAS/TMSS/backend/services/tuna/lib/nomad_mock.py b/SAS/TMSS/backend/services/tuna/lib/nomad_mock.py new file mode 100644 index 0000000000000000000000000000000000000000..db40eae6f3cba7e2a91671f1b4522fb0b91e0564 --- /dev/null +++ b/SAS/TMSS/backend/services/tuna/lib/nomad_mock.py @@ -0,0 +1,184 @@ +#!/usr/bin/env python3 + +# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy) +# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands +# +# This file is part of the LOFAR software suite. +# The LOFAR software suite is free software: you can redistribute it and/or +# modify it under the terms of the GNU General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# The LOFAR software suite is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. + +from functools import cached_property +import json +from http.server import BaseHTTPRequestHandler, HTTPServer +from http import HTTPStatus +from urllib.parse import parse_qsl, urlparse +from pathlib import Path +import os +import logging + +logger = logging.getLogger() +logging.basicConfig(level=logging.INFO) + + +class NomadJob: + def __init__(self, job_id, subjob_id): + self.job_id = job_id + self.subjob_id = subjob_id + self.status = "pending" + + def start(self): + self.status = "running" + + def finish(self): + self.status = "dead" + + def metadata(self) -> dict: + return { + "ID": self.job_id, + "Status": self.status, + } + + +class NomadRequestHandler(BaseHTTPRequestHandler): + @cached_property + def url(self): + return urlparse(self.path) + + @cached_property + def query(self) -> dict[str, str]: + return dict(parse_qsl(self.url.query)) + + @cached_property + def post_data(self) -> bytes: + content_length = int(self.headers.get("Content-Length", 0)) + return self.rfile.read(content_length) + + @cached_property + def post_json(self) -> dict: + return json.loads(self.post_data.decode("utf-8")) if self.post_data else {} + + @cached_property + def form_data(self): + return dict(parse_qsl(self.post_data)) + + def do_GET(self): + try: + response = self.get_response() + http_status = HTTPStatus.OK + except NotImplementedError: + http_status = HTTPStatus.NOT_FOUND + response = json.dumps( + { + "url": self.url, + "query": self.query, + } + ) + + self.send_response(http_status) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(response.encode("utf-8")) + + do_POST = do_GET + + def get_response(self): + logger.info( + f"Nomad: get_response() for {self.url=} {self.post_data=} {self.post_json=}" + ) + + path = Path(self.url.path) + + if path == Path("/v1/jobs"): + # return current job list + prefix = self.query.get("prefix", "") + matching_jobs = [ + v + for subjob_id, v in self.server.jobs.items() + if f"{self.server.job_id}/dispatch-{subjob_id}".startswith(prefix) + ] + return json.dumps([j.metadata() for j in matching_jobs]) + + elif path.match("/v1/job/*/dispatch"): + # dispatch a new job + subjob_id = self.post_json["IdPrefixTemplate"] + job_id = path.parts[-2] + + if job_id != self.server.job_id: + raise ValueError( + f"Asked to dispatch {job_id} but mock models job {self.server.job_id}" + ) + + job = NomadJob(job_id, subjob_id) + self.server.jobs[subjob_id] = job + else: + raise NotImplementedError() + + return json.dumps({"result": "ok"}) + + +class NomadHTTPServer(HTTPServer): + def __init__(self, job_id: str, *args, **kwargs): + super().__init__(*args, **kwargs) + + # active set of nomad dispatched jobs + self.job_id = job_id + self.jobs: dict[str, NomadJob] = {} + + def job_ids(self) -> list[str]: + return list(self.jobs.keys()) + + def subjob_started(self, subjob_id): + self.jobs[subjob_id].start() + + def subjob_finished(self, subjob_id): + self.jobs[subjob_id].finish() + + +class NomadMock: + def __init__(self, job_id: str, port: int = 4646): + self.job_id = job_id + self.server = NomadHTTPServer(job_id, ("0.0.0.0", port), NomadRequestHandler) + + def start(self): + import _thread + + logger.info(f"Starting mock Nomad server on port {self.server.server_port}") + + _thread.start_new_thread(self.server.serve_forever, ()) + + # tell applications where Nomad is + self.old_NOMAD_ADDR = os.environ.get("NOMAD_ADDR") + os.environ["NOMAD_ADDR"] = f"http://127.0.0.1:{self.server.server_port}" + + def stop(self): + logger.info("Stopping mock Nomad server") + + self.server.shutdown() + self.server.server_close() + + if self.old_NOMAD_ADDR: + os.environ["NOMAD_ADDR"] = self.old_NOMAD_ADDR + + def __enter__(self): + self.start() + return self + + def __exit__(self, _type, _value, _traceback): + self.stop() + + +if __name__ == "__main__": + import time + + with NomadMock(): + time.sleep(120) diff --git a/SAS/TMSS/backend/services/tuna/requirements.txt b/SAS/TMSS/backend/services/tuna/requirements.txt new file mode 100644 index 0000000000000000000000000000000000000000..537da6719165731d988610eca91a9c67e33dad5d --- /dev/null +++ b/SAS/TMSS/backend/services/tuna/requirements.txt @@ -0,0 +1 @@ +python-nomad >= 2.0 # MIT diff --git a/SAS/TMSS/backend/services/tuna/test/CMakeLists.txt b/SAS/TMSS/backend/services/tuna/test/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..0293792e5dcb6c54aeea07f551c4dcbd9870fb52 --- /dev/null +++ b/SAS/TMSS/backend/services/tuna/test/CMakeLists.txt @@ -0,0 +1,7 @@ +# $Id: CMakeLists.txt 32679 2015-10-26 09:31:56Z schaap $ + +if(BUILD_TESTING) + include(LofarCTest) + + lofar_add_test(t_tuna) +endif() diff --git a/SAS/TMSS/backend/services/tuna/test/t_tuna.py b/SAS/TMSS/backend/services/tuna/test/t_tuna.py new file mode 100644 index 0000000000000000000000000000000000000000..ca0f89699747eeb097f0e67db96a6f477835be34 --- /dev/null +++ b/SAS/TMSS/backend/services/tuna/test/t_tuna.py @@ -0,0 +1,136 @@ +from lofar.sas.tmss.services.tuna.job_dispatcher import ( + JobGroup, + StatisticsAggregateJob, + JobDispatcher, +) +from lofar.sas.tmss.services.tuna.nomad_mock import NomadMock +import lofar.sas.tmss.services.tuna # required as it activates our nomad patch in _patch_nomad_job + +import unittest + +import nomad + + +class TestJobGroup(unittest.TestCase): + def test_station_antennafield_expansion(self): + jobs = JobGroup(StatisticsAggregateJob)( + observation_id="obs_id4", + subtask_id="12345", + station=["cs001", "rs123"], + antenna_field={"cs001": ["lba", "hba0", "hba1"], "rs123": ["lba", "hba"]}, + begin="2024-06-20T00:29:15.050771+00:00", + end="2024-06-20T00:40:15.050771+00:00", + destination="s3://dataproducts/", + ) + + stations = set([j.station for j in jobs]) + + self.assertEqual({"cs001", "rs123"}, stations) + + rs123_antenna_fields = set( + [j.antenna_field for j in jobs if j.station == "rs123"] + ) + self.assertEqual({"lba", "hba"}, rs123_antenna_fields) + + cs001_antenna_fields = set( + [j.antenna_field for j in jobs if j.station == "cs001"] + ) + self.assertEqual({"lba", "hba0", "hba1"}, cs001_antenna_fields) + + +class TestJobDispatcher(unittest.TestCase): + def test_summarise_status(self): + self.assertEqual( + "queued", JobDispatcher._summarise_status(["pending", "pending", "pending"]) + ) + self.assertEqual( + "started", + JobDispatcher._summarise_status(["pending", "pending", "running"]), + ) + self.assertEqual( + "started", + JobDispatcher._summarise_status(["pending", "pending", "finished"]), + ) + self.assertEqual( + "started", JobDispatcher._summarise_status(["pending", "pending", "dead"]) + ) + self.assertEqual( + "started", JobDispatcher._summarise_status(["pending", "running", "dead"]) + ) + self.assertEqual( + "started", JobDispatcher._summarise_status(["pending", "dead", "dead"]) + ) + self.assertEqual( + "started", JobDispatcher._summarise_status(["running", "dead", "dead"]) + ) + self.assertEqual( + "finished", JobDispatcher._summarise_status(["dead", "dead", "dead"]) + ) + + def test_nomad_mock(self): + with NomadMock("job_id"): + n = nomad.Nomad() + + def test_job_dispatch_single_job(self): + with NomadMock("job_id"): + n = nomad.Nomad() + + job = StatisticsAggregateJob( + observation_id="obs_id4", + subtask_id="12345", + station="cs001", + antenna_field="lba", + begin="2024-06-20T00:29:15.050771+00:00", + end="2024-06-20T00:40:15.050771+00:00", + destination="s3://dataproducts/", + ) + + dispatcher = JobDispatcher("job_id") + + dispatched_jobs = dispatcher.dispatch(job) + self.assertEqual(1, len(dispatched_jobs)) + + def test_job_dispatch_job_groups(self): + with NomadMock("job_id"): + n = nomad.Nomad() + + jobs = JobGroup(StatisticsAggregateJob)( + observation_id="obs_id4", + subtask_id="12345", + station=["cs001", "cs002"], + antenna_field=["lba"], + begin="2024-06-20T00:29:15.050771+00:00", + end="2024-06-20T00:40:15.050771+00:00", + destination="s3://dataproducts/", + ) + + dispatcher = JobDispatcher("job_id") + + dispatched_jobs = dispatcher.dispatch(jobs) + self.assertEqual(2, len(dispatched_jobs)) + + def test_job_dispatch_get_job(self): + with NomadMock("job_id"): + n = nomad.Nomad() + + jobs = JobGroup(StatisticsAggregateJob)( + observation_id="obs_id4", + subtask_id="12345", + station=["cs001", "cs002"], + antenna_field=["lba"], + begin="2024-06-20T00:29:15.050771+00:00", + end="2024-06-20T00:40:15.050771+00:00", + destination="s3://dataproducts/", + ) + + dispatcher = JobDispatcher("job_id") + + _ = dispatcher.dispatch(jobs) + + retrieved_jobs = dispatcher.get_job(jobs) + + self.assertEqual(2, len(retrieved_jobs)) + + +if __name__ == "__main__": + unittest.main() diff --git a/SAS/TMSS/backend/services/tuna/test/t_tuna.run b/SAS/TMSS/backend/services/tuna/test/t_tuna.run new file mode 100755 index 0000000000000000000000000000000000000000..fc6b76ee6ecac7e33cde36372004fdbb6a3f822e --- /dev/null +++ b/SAS/TMSS/backend/services/tuna/test/t_tuna.run @@ -0,0 +1,4 @@ +#!/bin/bash + +python3 t_tuna.py + diff --git a/SAS/TMSS/backend/services/tuna/test/t_tuna.sh b/SAS/TMSS/backend/services/tuna/test/t_tuna.sh new file mode 100755 index 0000000000000000000000000000000000000000..46f3a8a9ad6876c844b979376b6bb4aaccae24ba --- /dev/null +++ b/SAS/TMSS/backend/services/tuna/test/t_tuna.sh @@ -0,0 +1,3 @@ +#!/bin/sh + +./runctest.sh t_tuna diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/populate.py b/SAS/TMSS/backend/src/tmss/tmssapp/populate.py index 5e54a9aecc4c7d4fdcfbbcfaf3c7b5d66eccd51d..696a5f2bc22f0e40b448d383b95e42ec7e593401 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/populate.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/populate.py @@ -611,6 +611,7 @@ def populate_resources(apps, schema_editor): def populate_misc(apps, schema_editor): + # CEP4 cluster cep4_cluster = Cluster.objects.create(name="CEP4", location="CIT", archive_site=False) from lofar.common import isProductionEnvironment, isTestEnvironment if isProductionEnvironment(): @@ -620,6 +621,15 @@ def populate_misc(apps, schema_editor): else: Filesystem.objects.create(name="Development Test FS", cluster=cep4_cluster, capacity=1e9, directory="/tmp") + # Cental S3 storage (accessible through s3://directory/directory/filename) + s3_cluster = Cluster.objects.create(name="S3", location="CIT", archive_site=False) + if isProductionEnvironment(): + Filesystem.objects.create(name="Production Storage (CIT)", cluster=s3_cluster, capacity=18e12, directory="dataproducts") + elif isTestEnvironment(): + Filesystem.objects.create(name="Test Storage (CIT)", cluster=s3_cluster, capacity=18e12, directory="dataproducts") + else: + Filesystem.objects.create(name="Development Test FS", cluster=s3_cluster, capacity=1e9, directory="/tmp") + # LTA-site clusters sara_cluster = Cluster.objects.create(name="SARA", location="SARA", archive_site=True) juelich_cluster = Cluster.objects.create(name="Jülich", location="Jülich", archive_site=True) diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/schemas/dataproduct_feedback_template/qa-statistics-1.json b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/dataproduct_feedback_template/qa-statistics-1.json new file mode 100644 index 0000000000000000000000000000000000000000..986f66377d50a0269a096af10fb65eac623b5ea3 --- /dev/null +++ b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/dataproduct_feedback_template/qa-statistics-1.json @@ -0,0 +1,175 @@ +{ + "description": "<no description>", + "name": "qa-statistics feedback", + "purpose": "technical_commissioning", + "schema": { + "$id": "https://tmss.lofar.eu/api/schemas/dataproductfeedbacktemplate/qa-statistics%20feedback/1#", + "$schema": "http://json-schema.org/draft-06/schema#", + "additionalProperties": false, + "default": {}, + "description": "<no description>", + "patternProperties": { + "^[$]schema$": {} + }, + "properties": { + "antennas": { + "$ref": "https://tmss.lofar.eu/api/schemas/commonschematemplate/stations/9/#/definitions/antennas", + "default": {} + }, + "statistics_type": { + "default": "SST", + "enum": [ + "BST", + "SST", + "XST" + ], + "title": "Type of statistics", + "type": "string" + }, + "frequency": { + "default": {}, + "properties": { + "central_frequencies": { + "default": [], + "items": { + "default": 0.0, + "minimum": 0.0, + "title": "frequency", + "type": "number" + }, + "title": "Central frequencies", + "type": "array" + }, + "subbands": { + "default": [ + 0 + ], + "items": { + "maximum": 511, + "minimum": 0, + "title": "Subband", + "type": "integer" + }, + "maxItems": 488, + "minItems": 1, + "title": "Subbands", + "type": "array" + } + }, + "required": [ + "subbands", + "central_frequencies" + ], + "title": "Frequency", + "type": "object" + }, + "percentage_written": { + "default": 0, + "title": "Percentage written", + "type": "integer" + }, + "samples": { + "default": {}, + "properties": { + "bits": { + "default": 32, + "enum": [ + 4, + 8, + 16, + 32, + 64 + ], + "title": "Bits per sample", + "type": "integer" + }, + "complex": { + "default": true, + "title": "Complex values", + "type": "boolean" + }, + "polarisations": { + "default": [ + "XX", + "XY", + "YX", + "YY" + ], + "items": { + "default": "I", + "enum": [ + "XX", + "XY", + "YX", + "YY", + "I", + "Q", + "U", + "V", + "Xre", + "Xim", + "Yre", + "Yim" + ], + "title": "Polarisation", + "type": "string" + }, + "title": "Polarisations", + "type": "array" + }, + "type": { + "default": "float", + "enum": [ + "float", + "integer" + ], + "title": "Type", + "type": "string" + } + }, + "required": [ + "polarisations", + "type", + "complex", + "bits" + ], + "title": "Samples", + "type": "object" + }, + "time": { + "default": {}, + "properties": { + "duration": { + "default": 0.0, + "title": "Duration", + "type": "number" + }, + "sample_width": { + "default": 0.0, + "title": "Sample width", + "type": "number" + }, + "start_time": { + "$ref": "https://tmss.lofar.eu/api/schemas/commonschematemplate/datetime/9/#/definitions/timestamp", + "default": "1970-01-01T00:00:00Z", + "title": "Start time" + } + }, + "required": [ + "start_time", + "duration", + "sample_width" + ], + "title": "Time", + "type": "object" + } + }, + "required": [ + ], + "title": "qa-statistics feedback", + "type": "object", + "version": 1 + }, + "state": "active", + "version": 1 +} diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py index 9265b834f16680eac0b07611232545255ac1613c..8f83e2782489dcf81f399c683d53b9cda9f2bb17 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py @@ -759,7 +759,7 @@ def create_qastatistics_subtask_from_task_blueprint(task_blueprint: TaskBlueprin "specifications_template": qa_subtask_template, "specifications_doc": qa_subtask_spec, "primary": False, - "cluster": obs_subtask.cluster} + "cluster": Cluster.objects.get(name="S3") } qa_subtask = Subtask.objects.create(**qa_subtask_data) # create and link subtask input/output @@ -1440,12 +1440,14 @@ def schedule_qastatistics_subtask(qastats_subtask: Subtask): qastats_subtask.save() observation_subtask = qastats_subtask.inputs.first().producer.subtask - directory = os.path.join(_output_root_directory(observation_subtask), 'xst') + + # TODO: files are not on CEP4, but on the central S3 store. + directory = _output_root_directory(observation_subtask) # create output dataproducts, and link these to the output dp_spec_template = DataproductSpecificationsTemplate.get_version_or_latest(name="empty") dp_spec_doc = dp_spec_template.get_default_json_document_for_schema() - dp_feedback_template = DataproductFeedbackTemplate.get_version_or_latest(name="empty") + dp_feedback_template = DataproductFeedbackTemplate.get_version_or_latest(name="qa-statistics feedback") # stub dp_feedback_doc = dp_feedback_template.get_default_json_document_for_schema() dataformat = Dataformat.objects.get(value=Dataformat.Choices.QA_STATISTICS_HDF5.value) @@ -1455,14 +1457,16 @@ def schedule_qastatistics_subtask(qastats_subtask: Subtask): # only lofar2 stations generate new qa statistics files used_lofar2_stations = sorted(list(set(observation_subtask.stations).intersection(set(get_lofar2_stations())))) + # create the dataproducts qastats_subtask_dataproducts = [] for station in used_lofar2_stations: fields = antennafields_for_antennaset_and_station(antennaset, station) for field in fields: for stat_type in ('xst', 'bst', 'sst'): + # filename as constructed in stingray/infra/jobs/central/aggregate.levant.nomad qastats_subtask_dataproducts.append(Dataproduct(filename="L%s_%s_%s_%s.h5" % (observation_subtask.id, station, field, stat_type), - directory=directory, + directory=os.path.join(directory, stat_type), dataformat=dataformat, datatype=datatype, producer=qastats_output, diff --git a/SAS/TMSS/deploy/qastatistics/docker-compose.yml b/SAS/TMSS/deploy/qastatistics/docker-compose.yml new file mode 100644 index 0000000000000000000000000000000000000000..d3e6b6801d615e12ddef96261ced84fb7b05f014 --- /dev/null +++ b/SAS/TMSS/deploy/qastatistics/docker-compose.yml @@ -0,0 +1,23 @@ +version: '3' + +services: + qa_statistics: + container_name: tmss_qa_statistics + image: tmss_qa_statistics + build: + context: ../app + dockerfile: Dockerfile + args: + SOURCE_IMAGE: ${SOURCE_IMAGE} + HOME: "/localhome/lofarsys" + restart: unless-stopped + env_file: + - env + environment: + - USER=lofarsys + - HOME=/localhome/lofarsys + command: /bin/bash -c 'source /opt/lofar/lofarinit.sh; exec tmss_qa_statistics_service' + logging: + driver: journald + options: + tag: tmss_qa_statistics