From a59a5de0f40a132886c8943c756f27dabd8a6c60 Mon Sep 17 00:00:00 2001 From: Jorrit Schaap <schaap@astron.nl> Date: Mon, 1 Feb 2021 11:51:46 +0100 Subject: [PATCH] TMSS-307: while working on TMSS commissioning with TMSS-573 it showed that feedback processing was not robust against undeterministic feedback message order. Now it is. --- .../lib/feedback_handling.py | 20 +-- .../test/t_feedback_handling_service.py | 169 ++++++++---------- .../src/tmss/tmssapp/adapters/feedback.py | 155 ++++++++-------- .../src/tmss/tmssapp/models/scheduling.py | 5 + .../src/tmss/tmssapp/viewsets/scheduling.py | 17 +- SAS/TMSS/client/lib/tmss_http_rest_client.py | 29 +-- 6 files changed, 180 insertions(+), 215 deletions(-) diff --git a/SAS/TMSS/backend/services/feedback_handling/lib/feedback_handling.py b/SAS/TMSS/backend/services/feedback_handling/lib/feedback_handling.py index b19cb6ef130..e5c6b048192 100644 --- a/SAS/TMSS/backend/services/feedback_handling/lib/feedback_handling.py +++ b/SAS/TMSS/backend/services/feedback_handling/lib/feedback_handling.py @@ -49,13 +49,10 @@ class TMSSFeedbackListener: self._tmss_client = TMSSsession.create_from_dbcreds_for_ldap(rest_client_creds_id) self._qpid_broker = qpid_broker - def append_feedback_to_tmss_subtask_raw_feedback(self, subtask_id: int, raw_feedback: str): - logger.info('Appending feedback to TMSS subtask %s' % subtask_id) - self._tmss_client.append_to_subtask_raw_feedback(subtask_id, raw_feedback) - - def process_subtask_feedback_and_set_finished(self, subtask_id: int): - logger.info('Calling TMSS to process feedback of subtask %s' % subtask_id) - self._tmss_client.process_subtask_feedback_and_set_finished(subtask_id) + def process_feedback_and_set_to_finished_if_complete(self, subtask_id: int, feedback: str): + logger.info('submitting feedback for subtask id=%s to TMSS', subtask_id) + updated_subtask = self._tmss_client.process_feedback_and_set_to_finished_if_complete(subtask_id=subtask_id, feedback=feedback) + logger.info('subtask id=%s with the processed feedback has state %s', subtask_id, updated_subtask['state_value']) def __enter__(self): self.start_handling() @@ -84,14 +81,11 @@ class TMSSFeedbackListener: # We know that TMSS sets its subtask_id in the parset in the Observation.ObsID field, # so we can fetch the TMSS subtask_id from the msg's sasid. tmss_subtask_id = content.sasid + feedback = content.payload - logger.info("feedback for TMSS subtask id=%s payload=%s", tmss_subtask_id, content.payload) - - # add contained feedback to TMSS - self.append_feedback_to_tmss_subtask_raw_feedback(tmss_subtask_id, content.payload) + logger.info("feedback for TMSS subtask id=%s feedback=%s", tmss_subtask_id, feedback) - # try processing it, which will will fail until feedback of the subtask is complete. - self.process_subtask_feedback_and_set_finished(tmss_subtask_id) + self.process_feedback_and_set_to_finished_if_complete(tmss_subtask_id, feedback) except TimeoutError: pass except Exception as e: diff --git a/SAS/TMSS/backend/services/feedback_handling/test/t_feedback_handling_service.py b/SAS/TMSS/backend/services/feedback_handling/test/t_feedback_handling_service.py index 902a34c7148..3ff0cf9790e 100755 --- a/SAS/TMSS/backend/services/feedback_handling/test/t_feedback_handling_service.py +++ b/SAS/TMSS/backend/services/feedback_handling/test/t_feedback_handling_service.py @@ -28,9 +28,9 @@ from lofar.sas.tmss.test.test_utils import TMSSTestEnvironment from lofar.sas.tmss.test.tmss_test_data_rest import TMSSRESTTestDataCreator from lofar.sas.tmss.services.feedback_handling import TMSSFeedbackListener -from lofar.common.test_utils import integration_test -from lofar.messagebus.messagebus import broker_feedback, ToBus -from lofar.messagebus.protocols import TaskFeedbackProcessing, TaskFeedbackDataproducts +from lofar.common.test_utils import integration_test, exit_with_skipped_code_if_skip_integration_tests + +exit_with_skipped_code_if_skip_integration_tests() @integration_test class TestFeedbackHandlingService(unittest.TestCase): @@ -38,111 +38,94 @@ class TestFeedbackHandlingService(unittest.TestCase): Tests for the FeedbackHandlingService ''' - feedback_1 = """feedback_version=03.01.00 -LOFAR.ObsSW.Observation.DataProducts.Output_Correlated_[0].centralFrequency=33593750.0 -LOFAR.ObsSW.Observation.DataProducts.Output_Correlated_[0].channelsPerSubband=32 -LOFAR.ObsSW.Observation.DataProducts.Output_Correlated_[0].channelWidth=6103.515625""" - - feedback_2 = """Observation.Correlator.channelWidth=3051.7578125 + # a chunk of feedback as it comes from the correlator + # string can be filled in with .format(nrOfOutput_Correlated=...) + feedback_correlator_chunk = """Observation.Correlator.channelWidth=3051.7578125 Observation.Correlator.channelsPerSubband=64 Observation.Correlator.integrationInterval=1.00663296 -Observation.DataProducts.Output_Correlated_[0].SAP=0 -Observation.DataProducts.Output_Correlated_[0].centralFrequency=30468750.000000 -Observation.DataProducts.Output_Correlated_[0].channelWidth=3051.757812""" +Observation.DataProducts.nrOfOutput_Beamformed_=0 +Observation.DataProducts.nrOfOutput_Correlated_={nrOfOutput_Correlated} +_isCobalt=T +feedback_version=03.01.00""" + + # a chunk of feedback as it comes from the correlator for each dataproduct + # string can be filled in with .format(subband=...) + feedback_dataproduct_chunk = """Observation.DataProducts.Output_Correlated_[{subband}].SAP=0 +Observation.DataProducts.Output_Correlated_[{subband}].centralFrequency=102734375.000000 +Observation.DataProducts.Output_Correlated_[{subband}].channelWidth=3051.757812 +Observation.DataProducts.Output_Correlated_[{subband}].channelsPerSubband=64 +Observation.DataProducts.Output_Correlated_[{subband}].duration=0 +Observation.DataProducts.Output_Correlated_[{subband}].fileFormat=AIPS++/CASA +Observation.DataProducts.Output_Correlated_[{subband}].filename=L2000000_SAP000_SB{subband:03d}_uv.MS +Observation.DataProducts.Output_Correlated_[{subband}].integrationInterval=1.006633 +Observation.DataProducts.Output_Correlated_[{subband}].location=CEP4:/data/test-projects/high/L2000000/uv +Observation.DataProducts.Output_Correlated_[{subband}].percentageWritten=0 +Observation.DataProducts.Output_Correlated_[{subband}].size=0 +Observation.DataProducts.Output_Correlated_[{subband}].startTime=2021-01-29 12:39:00 +Observation.DataProducts.Output_Correlated_[{subband}].stationSubband={subband} +Observation.DataProducts.Output_Correlated_[{subband}].storageWriter=LOFAR +Observation.DataProducts.Output_Correlated_[{subband}].storageWriterVersion=3 +Observation.DataProducts.Output_Correlated_[{subband}].subband={subband}""" @classmethod def setUpClass(cls) -> None: - cls.tmss_test_env = TMSSTestEnvironment() + cls.tmss_test_env = TMSSTestEnvironment(populate_schemas=True) cls.tmss_test_env.start() cls.test_data_creator = cls.tmss_test_env.create_test_data_creator() cls.feedback_listener = TMSSFeedbackListener(rest_client_creds_id=cls.tmss_test_env.client_credentials.dbcreds_id) - cls.feedback_listener.start_handling() + + # do not start_handling, cause it would connect to the old and obsolete qpid bus, for which we don't have a test broker. + # So, we skip testing the qpid messagebus part, and only test the handling of raw feedback in this service and tmss. + cls.feedback_listener._tmss_client.open() @classmethod def tearDownClass(cls) -> None: - cls.feedback_listener.stop_handling() + cls.feedback_listener._tmss_client.close() cls.tmss_test_env.stop() - - @integration_test - @unittest.skip('requires old Qpid environment') - def test_feedback_arriving_on_messagebus_is_added_to_tmss_subtask(self): - """ - ! This does not work yet, unfortunately, messages are sent, but for some reason not received. - ! I assume that this is some exchange/queue/routing issue with Qpid and that it should work against a proper broker setup.... - - Note that this test only works against an old Qpid broker, not RabbitMQ, because the feedback messages are legacy - for MoM compatibility and have not been converted to the new messaging library we use nowadays. - - In the SAS CI container, I stopped rabbitmq and ran this instead: - - > yum install qpid-cpp-server - > yum install qpid-tools - > qpid-config add queue devel.otdb.task.feedback.processing - > qpid-config add queue devel.otdb.task.feedback.dataproducts - > qpidd & - - Not sure how to best run rabbitmq and qpid in parallel... - """ - subtask = self.test_data_creator.post_data_and_get_response_as_json_object(self.test_data_creator.Subtask(), '/subtask/') - subtask_id = subtask['id'] - - with self.tmss_test_env.create_tmss_client() as tmss_client: - def send_feedback_to_exchange_and_assert_in_subtask(exchange, feedback, subtask_id): - subtask = tmss_client.get_subtask(subtask_id) - - # send feedback on messagebus - bus = ToBus(exchange, broker=broker_feedback) - msg = TaskFeedbackProcessing( - "tmss.test", - "", - "Test feedback emerging from the tombs of LOFAR", - subtask_id, - subtask_id, - self.feedback_1) - bus.send(msg) - - # wait for service to update subtask - start = datetime.datetime.utcnow() - subtask_updated_at = subtask['updated_at'] - while subtask_updated_at == subtask["updated_at"]: - subtask = tmss_client.get_subtask(subtask_id) - sleep(0.5) - if datetime.datetime.utcnow() - start > datetime.timedelta(seconds=2): - raise TimeoutError() - - # assert feedback is on the subtask - self.assertIsNotNone(subtask['raw_feedback']) - self.assertTrue(feedback in subtask['raw_feedback']) - - # send and assert two feedback snippets - send_feedback_to_exchange_and_assert_in_subtask("otdb.task.feedback.dataproducts", self.feedback_1, subtask_id) - send_feedback_to_exchange_and_assert_in_subtask("otdb.task.feedback.processing", self.feedback_2, subtask_id) - - # assert once more that BOTH feedbacks are present to make sure it gets appended and not replaced - subtask = tmss_client.get_subtask(subtask_id) - logger.warning(subtask) - self.assertTrue(self.feedback_1 in subtask["raw_feedback"] and self.feedback_2 in subtask["raw_feedback"]) - - @integration_test - def test_append_feedback_to_tmss_subtask_raw_feedback_updates_subtask(self): - - # create subtask - subtask = self.test_data_creator.post_data_and_get_response_as_json_object(self.test_data_creator.Subtask(), '/subtask/') - subtask_id = subtask['id'] - + def test_01_feedback_arriving_on_messagebus_is_added_to_tmss_subtask(self): with self.tmss_test_env.create_tmss_client() as tmss_client: - - # append bits of feedback - self.feedback_listener.append_feedback_to_tmss_subtask_raw_feedback(subtask_id, self.feedback_1) - self.feedback_listener.append_feedback_to_tmss_subtask_raw_feedback(subtask_id, self.feedback_2) - - # assert all feedback is there - subtask = tmss_client.get_subtask(subtask_id) - self.assertIsNotNone(subtask['raw_feedback']) - self.maxDiff = None - self.assertEqual(self.feedback_1 + '\n' + self.feedback_2, subtask['raw_feedback']) + # create a subtask with some output dataproducts with initial emtpy feedback + dataproduct_feedback_templates = tmss_client.get_path_as_json_object('dataproduct_feedback_template') + empty_dataproduct_feedback_template = next(x for x in dataproduct_feedback_templates if x['name']=='empty') + + dataproduct_feedback_templates = tmss_client.get_path_as_json_object('subtask_template') + obs_subtask_template = next(x for x in dataproduct_feedback_templates if x['name']=='observation control') + + subtask = self.test_data_creator.post_data_and_get_response_as_json_object(self.test_data_creator.Subtask(specifications_template_url=obs_subtask_template['url']), '/subtask/') + subtask_id = subtask['id'] + subtask_output = self.test_data_creator.post_data_and_get_response_as_json_object(self.test_data_creator.SubtaskOutput(subtask_url=subtask['url']), '/subtask_output/') + NUM_DATAPRODUCTS = 4 + for i in range(NUM_DATAPRODUCTS): + self.test_data_creator.post_data_and_get_response_as_json_object(self.test_data_creator.Dataproduct(subtask_output_url=subtask_output['url'], + filename="L%d_SAP000_SB%03d_uv.MS" % (subtask_id, i), + dataproduct_feedback_template_url=empty_dataproduct_feedback_template['url']), + '/dataproduct/') + + # check the intial dataproducts have empty feedback + dataproducts = tmss_client.get_subtask_output_dataproducts(subtask_id=subtask_id) + self.assertEqual(NUM_DATAPRODUCTS, len(dataproducts)) + for dataproduct in dataproducts: + self.assertEqual(empty_dataproduct_feedback_template['url'], dataproduct['feedback_template']) + + # TMSS only accepts feedback in finishing state + tmss_client.set_subtask_status(subtask_id=subtask_id, status='finishing') + + # mimich incoming feedback chunck via the messagebus + # assume the old qpid messagebus just works, and delivers proper feedback chuncks in the payload. + + # create the chunks of actual feedback and upload/process them + feedback_cor = self.feedback_correlator_chunk.format(nrOfOutput_Correlated=NUM_DATAPRODUCTS) + self.feedback_listener.process_feedback_and_set_to_finished_if_complete(subtask_id, feedback_cor) + for i in range(NUM_DATAPRODUCTS): + feedback_dp = self.feedback_dataproduct_chunk.format(subband=i) + self.feedback_listener.process_feedback_and_set_to_finished_if_complete(subtask_id, feedback_dp) + + + # the_dataproduct_feedback_template = next(x for x in dataproduct_feedback_templates if x['name']=='feedback') + from lofar.common.util import waitForInterrupt + waitForInterrupt() logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/feedback.py b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/feedback.py index ba3170f6dd7..216a54e7e4a 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/feedback.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/feedback.py @@ -20,6 +20,9 @@ from dateutil import parser from lofar.sas.tmss.tmss.tmssapp.models import * from lofar.sas.tmss.tmss.tmssapp.conversions import antennafields_for_antennaset_and_station +from lofar.parameterset import parameterset +from lofar.common.util import single_line_with_single_spaces + import logging logger = logging.getLogger(__name__) @@ -50,44 +53,49 @@ def check_feedback_is_complete(raw_feedback): return not empty -def process_subtask_feedback(subtask:Subtask): - logger.info('Now processing feedback of subtask id=%s type=%s' % (subtask.id, subtask.specifications_template.type.value)) - logger.info(subtask.raw_feedback) - feedback_dict = parse_feedback(subtask.raw_feedback) - logger.info(feedback_dict) +def process_feedback_into_subtask_dataproducts(subtask:Subtask, feedback: parameterset) -> Subtask: + if subtask.specifications_template.type.value not in [SubtaskType.Choices.OBSERVATION.value, SubtaskType.Choices.PIPELINE.value]: + raise ValueError("Cannot process feedback for subtask id=%s since type=%s not in %s" % + (subtask.id, subtask.specifications_template.type.value, + [SubtaskType.Choices.OBSERVATION.value, SubtaskType.Choices.PIPELINE.value])) + + logger.info('processing feedback into the dataproducts of subtask id=%s type=%s feedback: %s', subtask.id, subtask.specifications_template.type.value, single_line_with_single_spaces(str(feedback))) - dataproduct_feedback_docs = {} + # create a subset in dict-form with the dataproduct information if subtask.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value: - prefix = 'Observation.DataProducts.' + dataproducts_feedback = feedback.makeSubset('Observation.DataProducts.') elif subtask.specifications_template.type.value == SubtaskType.Choices.PIPELINE.value: - prefix = 'LOFAR.ObsSW.Observation.DataProducts.' - else: - raise ValueError("Cannot process feedback of subtask id=%s since type=%s not in %s" % - (subtask.id, subtask.specifications_template.type.value, - [SubtaskType.Choices.OBSERVATION.value, SubtaskType.Choices.PIPELINE.value])) + dataproducts_feedback = feedback.makeSubset('LOFAR.ObsSW.Observation.DataProducts.') - for dataproduct_type in ['Correlated', 'Beamformed']: - # iterate over dataproducts in feedback - i = 0 - while True: - dpkey = "%sOutput_%s_[%s]" % (prefix, dataproduct_type, i) - if dpkey + '.subband' not in feedback_dict.keys(): - break - - # determine corresponding TMSS dataproduct - dataproduct = Dataproduct.objects.get(filename=feedback_dict[dpkey+'.filename']) + # extract the unique dataproduct keys, so we can loop over them + dp_keys = sorted(list(set([key[:key.find('.')] for key in dataproducts_feedback.keys() if key.startswith('Output_')]))) + + # process each dataproduct subset + for dp_key in dp_keys: + dp_feedback = dataproducts_feedback.makeSubset(dp_key+'.').dict() + + # determine corresponding TMSS dataproduct + try: + dataproduct = subtask.output_dataproducts.get(filename=dp_feedback['filename']) + except Dataproduct.DoesNotExist: + logger.error("cannot process feedback for %s. No such output dataproduct known for subtask id=%s", dp_feedback['filename'], subtask.id) + continue + + try: + logger.info('processing feedback for dataproduct id=%s filename=%s of subtask id=%s feedback: %s', dataproduct.id, dataproduct.filename, subtask.id, single_line_with_single_spaces(str(dp_feedback))) + + # set the feedback_template, so we can fill the feedback json doc for the dataproduct dataproduct.feedback_template = DataproductFeedbackTemplate.objects.get(name='feedback') - logger.debug('Found dataproduct %s' % dataproduct.filename) # derive values or collect for different subtask types - storagewriter = feedback_dict[dpkey + '.storageWriter'].lower() + storagewriter = dp_feedback['storageWriter'].lower() if storagewriter == "casa": storagewriter = "standard" # todo: is that correct? elif storagewriter == "lofar": storagewriter = "lofarstman" if subtask.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value: - subbands = [int(feedback_dict[dpkey+'.stationSubband'])] + subbands = [int(dp_feedback['stationSubband'])] duration = (subtask.stop_time - subtask.start_time).total_seconds() antennaset = subtask.specifications_doc['stations']['antenna_set'] stationlist = subtask.specifications_doc['stations']['station_list'] @@ -96,12 +104,12 @@ def process_subtask_feedback(subtask:Subtask): for station in stationlist: fields = antennafields_for_antennaset_and_station(antennaset, station) antennafields += [{"station": station, "field": field, "type": antennatype} for field in fields] - pointing = subtask.specifications_doc['stations']['digital_pointings'][int(feedback_dict[dpkey+'.SAP'])]['pointing'] + pointing = subtask.specifications_doc['stations']['digital_pointings'][int(dp_feedback['SAP'])]['pointing'] else: input_dataproduct = DataproductTransform.objects.get(output=dataproduct).input logger.debug('Found input dataproduct %s' % input_dataproduct.filename) subbands = input_dataproduct.feedback_doc["frequency"]['subbands'] - duration = float(feedback_dict[dpkey + '.duration']) + duration = float(dp_feedback['duration']) antennaset = input_dataproduct.feedback_doc["antennas"]['set'] antennafields = input_dataproduct.feedback_doc["antennas"]['fields'] pointing = input_dataproduct.feedback_doc["target"]['pointing'] @@ -109,17 +117,17 @@ def process_subtask_feedback(subtask:Subtask): # add feedback doc to dataproduct dataproduct.feedback_doc={ - "percentage_written": int(feedback_dict[dpkey+'.percentageWritten']), + "percentage_written": int(dp_feedback['percentageWritten']), "frequency": { "subbands": subbands, - "central_frequencies": [float(feedback_dict[dpkey+'.centralFrequency'])], - "channel_width": float(feedback_dict[dpkey + '.channelWidth']), - "channels_per_subband": int(feedback_dict[dpkey + '.channelsPerSubband']) + "central_frequencies": [float(dp_feedback['centralFrequency'])], + "channel_width": float(dp_feedback['channelWidth']), + "channels_per_subband": int(dp_feedback['channelsPerSubband']) }, "time": { - "start_time": parser.parse(feedback_dict[dpkey+'.startTime'], ignoretz=True).isoformat(), + "start_time": parser.parse(dp_feedback['startTime'], ignoretz=True).isoformat()+'Z', "duration": duration, - "sample_width": float(feedback_dict[dpkey+'.integrationInterval']), + "sample_width": float(dp_feedback['integrationInterval']), }, "antennas": { "set": antennaset, @@ -133,61 +141,54 @@ def process_subtask_feedback(subtask:Subtask): "type": "float", # fixed "bits": 32, # fixed "writer": storagewriter, - "writer_version": feedback_dict[dpkey + '.storageWriterVersion'], + "writer_version": dp_feedback['storageWriterVersion'], "complex": True # fixed } } - i += 1 + dataproduct.save() - logger.info('Saved %s %s' % (dataproduct.filename, dataproduct.feedback_doc)) + logger.info('saved processed feedback into dataproduct id=%s filename=%s feedback_doc=%s', dataproduct.id, dataproduct.filename, dataproduct.feedback_doc) + except Exception as e: + logger.error('error while processing feedback for dataproduct id=%s filename=%s feedback=%s error: %s', dataproduct.id, dataproduct.filename, dp_feedback, e) + return subtask + + +def append_to_subtask_raw_feedback(subtask: Subtask, feedback: parameterset) -> Subtask: + """ append/merge the given feedback into the already stored raw_feedback + by using this parset-merging strategy we ensure a properly formatted plain text raw_feedback doc with no duplicate entries + """ + feedback_parset = parameterset.fromString(subtask.raw_feedback or "") + feedback_parset.adoptDict(feedback.dict()) + subtask.raw_feedback = str(feedback_parset) + subtask.save() + return subtask -def generate_dataproduct_feedback_from_subtask_feedback_and_set_finished(subtask:Subtask): +def process_feedback_for_subtask_and_set_to_finished_if_complete(subtask: Subtask, feedback_doc: str) -> Subtask: """ Translates raw feedback from a subtask (which has been provided by Cobalt or pipelines) and translate it to json documents for the individual dataproducts. """ # check we are in finishing state and all feedback has arrived - if subtask.state != SubtaskState.objects.get(value='finishing'): - raise ValueError('Subtask id=%s state=%s is not in state %s' % (subtask.id, subtask.state, SubtaskState.Choices.FINISHING.value)) - raw_feedback = subtask.raw_feedback - try: - check_feedback_is_complete(raw_feedback) - except ValueError as original_error: - raise ValueError("Feedback of subtask_id=%s is not complete: %s " % (subtask.id, original_error)) - - # convert raw feedback to dataproduct feedback docs - process_subtask_feedback(subtask) - - # set subtask state to finished - subtask.state = SubtaskState.objects.get(value='finished') - subtask.save() + if subtask.state.value != SubtaskState.objects.get(value='finishing').value: + raise ValueError("Cannot process feedback for subtask id=%s because the state is '%s' and not '%s'" % (subtask.id, subtask.state, SubtaskState.Choices.FINISHING.value)) + + # the submitted feedback_doc is (should be) a plain text document in parset format + # so, treat it as a parset + new_feedback_parset = parameterset.fromString(feedback_doc) + + # store the new feedback + subtask = append_to_subtask_raw_feedback(subtask=subtask, feedback=new_feedback_parset) + + # and process it + subtask = process_feedback_into_subtask_dataproducts(subtask, new_feedback_parset) + + # if complete, set subtask state to finished + if subtask.is_feedback_complete: + logger.info("Feedback for subtask id=%s is complete. Setting state to 'finished'", subtask.id) + subtask.state = SubtaskState.objects.get(value='finished') + subtask.save() + return subtask -print(''' -Observation.DataProducts.Output_Correlated_[14].SAP=0 -Observation.DataProducts.Output_Correlated_[14].centralFrequency=102734375.000000 -Observation.DataProducts.Output_Correlated_[14].channelWidth=3051.757812 -Observation.DataProducts.Output_Correlated_[14].channelsPerSubband=64 -Observation.DataProducts.Output_Correlated_[14].duration=0 -Observation.DataProducts.Output_Correlated_[14].fileFormat=AIPS++/CASA -Observation.DataProducts.Output_Correlated_[14].filename=L2000000_SAP000_SB001_uv.MS -Observation.DataProducts.Output_Correlated_[14].integrationInterval=1.006633 -Observation.DataProducts.Output_Correlated_[14].location=CEP4:/data/test-projects/high/L2000000/uv -Observation.DataProducts.Output_Correlated_[14].percentageWritten=0 -Observation.DataProducts.Output_Correlated_[14].size=0 -Observation.DataProducts.Output_Correlated_[14].startTime=2021-01-29 12:39:00 -Observation.DataProducts.Output_Correlated_[14].stationSubband=14 -Observation.DataProducts.Output_Correlated_[14].storageWriter=LOFAR -Observation.DataProducts.Output_Correlated_[14].storageWriterVersion=3 -Observation.DataProducts.Output_Correlated_[14].subband=14 - -Observation.Correlator.channelWidth=3051.7578125 -Observation.Correlator.channelsPerSubband=64 -Observation.Correlator.integrationInterval=1.00663296 -Observation.DataProducts.nrOfOutput_Beamformed_=0 -Observation.DataProducts.nrOfOutput_Correlated_=16 -_isCobalt=T -feedback_version=03.01.00 -''') diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/models/scheduling.py b/SAS/TMSS/backend/src/tmss/tmssapp/models/scheduling.py index dd6aa9e6c53..7372d9e8b4c 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/models/scheduling.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/models/scheduling.py @@ -233,6 +233,11 @@ class Subtask(BasicCommon): '''return the transformed output dataproduct for the given input_dataproduct_id.''' return self.output_dataproducts.get(producers__input_id=input_dataproduct_id) + @property + def is_feedback_complete(self) -> bool: + '''returns True if the feedback for all output dataproducts is filled in for a non-"empty"-template''' + return self.output_dataproducts.filter(feedback_template__isnull=False).exclude(feedback_template__name="empty").count() == self.output_dataproducts.count() + @property def progress(self) -> float: '''Get the progress of this subtask ranging from 0.0 before it is started, up to 1.0 when finished.''' diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/viewsets/scheduling.py b/SAS/TMSS/backend/src/tmss/tmssapp/viewsets/scheduling.py index 2a4046f4881..7e65ff45998 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/viewsets/scheduling.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/viewsets/scheduling.py @@ -290,16 +290,17 @@ class SubtaskViewSet(LOFARViewSet): return RestResponse(serializer.data) - @swagger_auto_schema(responses={200: 'The finished version of this subtask.', + @swagger_auto_schema(responses={200: 'The updated (maybe) finished version of this subtask with the posted feedback appended to the raw_feedback property.', 403: 'forbidden', - 500: 'The feedback of this subtask could not be processed'}, - operation_description="Generate feedback_doc of subtask output dataproducts from the subtask raw_feedback and set subtask state to finished.") - @action(methods=['post'], detail=True, url_path='process_feedback_and_set_finished') - def process_feedback_and_set_finished(self, request, pk=None): - from lofar.sas.tmss.tmss.tmssapp.adapters.feedback import generate_dataproduct_feedback_from_subtask_feedback_and_set_finished + 500: 'The feedback for this subtask could not be processed'}, + operation_description="Process the feedback_doc (which can be for one or more or all dataproducts), store/append it in the subtask's raw_feedback, and process it into json feedback per dataproduct. Sets the subtask to finished if all dataproducts are processed, which may require multiple postings of partial feedback docs.") + @action(methods=['post'], detail=True, url_path='process_feedback_and_set_to_finished_if_complete') + def process_feedback_and_set_to_finished_if_complete(self, request, pk=None): + from lofar.sas.tmss.tmss.tmssapp.adapters.feedback import process_feedback_for_subtask_and_set_to_finished_if_complete subtask = get_object_or_404(models.Subtask, pk=pk) - finished_subtask = generate_dataproduct_feedback_from_subtask_feedback_and_set_finished(subtask) - serializer = self.get_serializer(finished_subtask) + feedback_doc = request.body.decode('utf-8') + updated_subtask = process_feedback_for_subtask_and_set_to_finished_if_complete(subtask, feedback_doc) + serializer = self.get_serializer(updated_subtask) return RestResponse(serializer.data) @swagger_auto_schema(responses={200: 'Get progress of this subtask ranging from 0.0 when just started up to 1.0 when finished.', diff --git a/SAS/TMSS/client/lib/tmss_http_rest_client.py b/SAS/TMSS/client/lib/tmss_http_rest_client.py index 7349c268df1..ddad0eb67f8 100644 --- a/SAS/TMSS/client/lib/tmss_http_rest_client.py +++ b/SAS/TMSS/client/lib/tmss_http_rest_client.py @@ -352,30 +352,11 @@ class TMSSsession(object): else: raise Exception("Could not POST template: " + response.text) - def append_to_subtask_raw_feedback(self, subtask_id: int, feedback: str) -> {}: - '''append the raw_feedback for the given subtask, and return the subtask with its new state, or raise an error''' - subtask = self.get_path_as_json_object('/subtask/%s/' % (subtask_id,)) - existing_feedback = subtask.get('raw_feedback','') - if existing_feedback is None or existing_feedback is "": - new_feedback = feedback - else: - new_feedback = "%s\n%s" % (existing_feedback, feedback) - response = self.session.patch(url=self.get_full_url_for_path('/subtask/%s/' % (subtask_id,)), - json={'raw_feedback': new_feedback}, - params={'format': 'json'}) - - if response.status_code >= 200 and response.status_code < 300: - return json.loads(response.content.decode('utf-8')) - - content = response.content.decode('utf-8') - raise Exception("Could not append feedback to subtask with url %s - %s %s - %s" % ( - response.request.url, response.status_code, responses.get(response.status_code), content)) - - def process_subtask_feedback_and_set_finished(self, subtask_id: int) -> {}: - '''process the raw_feedback of a given subtask and set the subtask to finished on succes. Return the subtask - with its new state, or raise an error''' - response = self.session.post(url=self.get_full_url_for_path('/subtask/%s/process_feedback_and_set_finished' % (subtask_id,)), - params={'format': 'json'}) + def process_feedback_and_set_to_finished_if_complete(self, subtask_id: int, feedback: str) -> {}: + '''Process the feedback_doc (which can be for one or more or all dataproducts), store/append it in the subtask's raw_feedback, and process it into json feedback per dataproduct. Sets the subtask to finished if all dataproducts are processed, which may require multiple postings of partial feedback docs. + Return the updated subtask, or raise an error''' + response = self.session.post(url=self.get_full_url_for_path('/subtask/%s/process_feedback_and_set_to_finished_if_complete' % (subtask_id,)), + data=feedback) if response.status_code >= 200 and response.status_code < 300: return json.loads(response.content.decode('utf-8')) -- GitLab