From 2d5d55790b3491ab3f11e2856c98263c2a1a5695 Mon Sep 17 00:00:00 2001 From: jkuensem <jkuensem@physik.uni-bielefeld.de> Date: Wed, 13 May 2020 17:44:24 +0200 Subject: [PATCH] TMSS-61: add functionality to connect observation output to preprocessing pipeline and use that populate demo stuff --- .../src/tmss/tmssapp/models/scheduling.py | 6 +- SAS/TMSS/src/tmss/tmssapp/populate.py | 17 ++---- SAS/TMSS/src/tmss/tmssapp/subtasks.py | 58 +++++++++++++++++++ .../src/tmss/tmssapp/viewsets/scheduling.py | 2 +- SAS/TMSS/src/util.py | 37 +++++++----- 5 files changed, 89 insertions(+), 31 deletions(-) create mode 100644 SAS/TMSS/src/tmss/tmssapp/subtasks.py diff --git a/SAS/TMSS/src/tmss/tmssapp/models/scheduling.py b/SAS/TMSS/src/tmss/tmssapp/models/scheduling.py index 895df381f28..70171487824 100644 --- a/SAS/TMSS/src/tmss/tmssapp/models/scheduling.py +++ b/SAS/TMSS/src/tmss/tmssapp/models/scheduling.py @@ -213,7 +213,7 @@ class SubtaskStateLog(BasicCommon): class SubtaskInput(BasicCommon): - subtask = ForeignKey('Subtask', null=False, on_delete=CASCADE, help_text='Subtask to which this input specification refers.') + subtask = ForeignKey('Subtask', null=False, on_delete=CASCADE, related_name='inputs', help_text='Subtask to which this input specification refers.') task_relation_blueprint = ForeignKey('TaskRelationBlueprint', null=True, on_delete=SET_NULL, help_text='Task Relation Blueprint which this Subtask Input implements (NULLable).') connector = ForeignKey('SubtaskConnector', null=True, on_delete=SET_NULL, help_text='Which connector this Task Input implements.') producer = ForeignKey('SubtaskOutput', on_delete=PROTECT, help_text='The Subtask Output providing the input dataproducts.') @@ -229,7 +229,7 @@ class SubtaskInput(BasicCommon): class SubtaskOutput(BasicCommon): - subtask = ForeignKey('Subtask', null=False, on_delete=CASCADE, help_text='Subtask to which this output specification refers.') + subtask = ForeignKey('Subtask', null=False, on_delete=CASCADE, related_name='outputs', help_text='Subtask to which this output specification refers.') connector = ForeignKey('SubtaskConnector', null=True, on_delete=SET_NULL, help_text='Which connector this Subtask Output implements.') @@ -247,7 +247,7 @@ class Dataproduct(BasicCommon): pinned_since = DateTimeField(null=True, help_text='When this dataproduct was pinned to disk, that is, forbidden to be removed, or NULL if not pinned (NULLable).') specifications_doc = JSONField(help_text='Dataproduct properties (f.e. beam, subband), to distinguish them when produced by the same task, and to act as input for selections in the Task Input and Work Request Relation Blueprint objects.') specifications_template = ForeignKey('DataproductSpecificationsTemplate', null=False, on_delete=CASCADE, help_text='Schema used for specifications_doc.') - producer = ForeignKey('SubtaskOutput', on_delete=PROTECT, help_text='Subtask Output which generates this dataproduct.') + producer = ForeignKey('SubtaskOutput', on_delete=PROTECT, related_name='dataproducts', help_text='Subtask Output which generates this dataproduct.') do_cancel = DateTimeField(null=True, help_text='When this dataproduct was cancelled (NULLable). Cancelling a dataproduct triggers cleanup if necessary.') expected_size = BigIntegerField(null=True, help_text='Expected size of dataproduct size, in bytes. Used for scheduling purposes. NULL if size is unknown (NULLable).') size = BigIntegerField(null=True, help_text='Dataproduct size, in bytes. Used for accounting purposes. NULL if size is (yet) unknown (NULLable).') diff --git a/SAS/TMSS/src/tmss/tmssapp/populate.py b/SAS/TMSS/src/tmss/tmssapp/populate.py index f7495808e34..0dd510d7a4e 100644 --- a/SAS/TMSS/src/tmss/tmssapp/populate.py +++ b/SAS/TMSS/src/tmss/tmssapp/populate.py @@ -20,7 +20,7 @@ from lofar.sas.tmss.tmss.tmssapp.models.specification import Role, Datatype, Dat from lofar.sas.tmss.tmss.tmssapp.models.scheduling import SubtaskState, SubtaskType, SubtaskTemplate, Subtask, \ StationType, Algorithm, ScheduleMethod, Cluster, Filesystem from lofar.common.json_utils import * - +from lofar.sas.tmss.tmss.tmssapp.subtasks import connect_observation_subtask_to_preprocessing_subtask def populate_choices(apps, schema_editor): ''' @@ -228,7 +228,6 @@ def _populate_example_data(): filename="L%d_SB%03d_uv.MS" % ( subtask.id, sb_nr))) - for i in range(10): task_template = models.TaskTemplate.objects.get(name='preprocessing schema') task_draft_data = TaskDraft_test_data(name="my test pipeline", specifications_template=task_template) task_draft = models.TaskDraft.objects.create(**task_draft_data) @@ -237,21 +236,13 @@ def _populate_example_data(): task_blueprint = models.TaskBlueprint.objects.create(**task_blueprint_data) subtask_template = models.SubtaskTemplate.objects.get(name='pipelinecontrol schema') - specifications_doc = { - "storagemanager": "dysco" - } - + specifications_doc = {} specifications_doc = add_defaults_to_json_object_for_schema(specifications_doc, subtask_template.schema) subtask_data = Subtask_test_data(task_blueprint=task_blueprint, subtask_template=subtask_template, specifications_doc=specifications_doc, cluster=cluster) - subtask = models.Subtask.objects.create(**subtask_data) + pipe_subtask = models.Subtask.objects.create(**subtask_data) - subtask_output = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=subtask)) - # todo: create some dataproducts. (What goes in/out here?) - # for sb_nr in specifications_doc['stations']['digital_pointings'][0]['subbands']: - # models.Dataproduct.objects.create(**Dataproduct_test_data(producer=subtask_output, - # directory="CEP4:/data/test-projects/TMSS_test/L%d/uv/" % (subtask.id,), - # filename="L%d_SB%03d_uv.MS"%(subtask.id, sb_nr))) + connect_observation_subtask_to_preprocessing_subtask(subtask, pipe_subtask) except ImportError: pass diff --git a/SAS/TMSS/src/tmss/tmssapp/subtasks.py b/SAS/TMSS/src/tmss/tmssapp/subtasks.py new file mode 100644 index 00000000000..6887ae9bda5 --- /dev/null +++ b/SAS/TMSS/src/tmss/tmssapp/subtasks.py @@ -0,0 +1,58 @@ +from datetime import datetime, timedelta +from lofar.common.datetimeutils import parseDatetime +from lofar.sas.tmss.tmss.tmssapp.models.specification import Dataformat, Role, Datatype +from lofar.sas.tmss.tmss.tmssapp.models.scheduling import Subtask, SubtaskType, SubtaskState, ScheduleMethod,\ + SubtaskTemplate, SubtaskInput, SubtaskOutput, SubtaskConnector, SubtaskInputSelectionTemplate, \ + Dataproduct, DataproductSpecificationsTemplate, DataproductFeedbackTemplate, DataproductTransform + + +def connect_observation_subtask_to_preprocessing_subtask(observation_subtask: Subtask, pipeline_subtask: Subtask): + if observation_subtask.specifications_template.type.value != SubtaskType.Choices.OBSERVATION.value or \ + pipeline_subtask.specifications_template.type.value != SubtaskType.Choices.PIPELINE.value: + raise ValueError("Cannot connect subtask %s type=%s to subtask id=%d type=%s. Expecting types %s and %s." % ( + observation_subtask.pk, observation_subtask.specifications_template.type, + pipeline_subtask.pk, pipeline_subtask.specifications_template.type, + SubtaskType.Choices.OBSERVATION.value, SubtaskType.Choices.PIPELINE.value)) + + print("Connecting subtask %s type=%s to subtask id=%d type=%s") + + if observation_subtask.stop_time and isinstance(observation_subtask.stop_time, datetime): + pipeline_subtask.start_time = max(datetime.utcnow(), observation_subtask.stop_time) + + # todo: use existing and reasonable selection and specification templates when we have those, for now, make sure there is one... + SubtaskInputSelectionTemplate.objects.create( + name="test", + description='test subtask input selection template', + schema={}, + tags=["TMSS", "TESTING"]) + DataproductSpecificationsTemplate.objects.create( + name="test", + description='test dataproduct specifications template', + schema={}, + tags=["TMSS", "TESTING"]) + + # use observation output as pipeline input + pipeline_subtask_input = SubtaskInput.objects.create(subtask=pipeline_subtask, + producer=observation_subtask.outputs.first(), + selection_doc={}, + selection_template=SubtaskInputSelectionTemplate.objects.first()) + pipeline_subtask_input.dataproducts.set(observation_subtask.outputs.first().dataproducts.all()) + + # specify pipeline output (map input dataproducts 1:1, but with pipeline subtask ID) + pipeline_subtask_output = SubtaskOutput.objects.create(subtask=pipeline_subtask) + output_dps = [] + for input_dp in pipeline_subtask_input.dataproducts.all(): + output_dp = Dataproduct.objects.create(filename="L%s_%s" % (pipeline_subtask.pk, input_dp.filename.split('_', 1)[1]), + directory=input_dp.directory, + dataformat=Dataformat.objects.get(value="MeasurementSet"), + producer=pipeline_subtask_output, + specifications_doc={}, + specifications_template=DataproductSpecificationsTemplate.objects.first(), + feedback_doc="", + feedback_template=DataproductFeedbackTemplate.objects.first() + ) + DataproductTransform.objects.create(input=input_dp, output=output_dp, identity=False) + output_dps.append(output_dp) + pipeline_subtask_output.dataproducts.set(output_dps) + + # todo: specify a SubtaskConnector? TaskRelation \ No newline at end of file diff --git a/SAS/TMSS/src/tmss/tmssapp/viewsets/scheduling.py b/SAS/TMSS/src/tmss/tmssapp/viewsets/scheduling.py index edc9ab43d27..28762021ff0 100644 --- a/SAS/TMSS/src/tmss/tmssapp/viewsets/scheduling.py +++ b/SAS/TMSS/src/tmss/tmssapp/viewsets/scheduling.py @@ -231,7 +231,7 @@ class SubtaskViewSetJSONeditorOnline(LOFARViewSet): def parset(self, request, pk=None): subtask = get_object_or_404(models.Subtask, pk=pk) parset = convert_to_parset(subtask) - parset_str = "# THIS PARSET WAS GENERATED BY TMSS FROM THE SPECICATION OF SUBTASK ID=%d ON %s url: %s\n%s" % ( + parset_str = "# THIS PARSET WAS GENERATED BY TMSS FROM THE SPECIFICATION OF SUBTASK ID=%d ON %s url: %s\n%s" % ( subtask.pk, formatDatetime(datetime.utcnow()), request._request.get_raw_uri(), diff --git a/SAS/TMSS/src/util.py b/SAS/TMSS/src/util.py index 12f8b3d6ab1..629b1fe7e1a 100644 --- a/SAS/TMSS/src/util.py +++ b/SAS/TMSS/src/util.py @@ -5,36 +5,45 @@ logger = logging.getLogger(__file__) # usage example: # -# with TMSSsession('paulus', 'pauluspass') as session: -# response = session.get(url='http://localhost:8008/api/task_draft/') +# with TMSSsession('paulus', 'pauluspass', 'http://localhost') as session: +# response = session.get(url='http://localhost/api/task_draft/') # print(response) + class TMSSsession(object): - def __init__(self, username, password, host): + OPENID = "openid" + BASICAUTH = "basicauth" + + def __init__(self, username, password, host, authentication_method=OPENID): self.session = requests.session() self.username = username self.password = password self.host = host + self.authentication_method = authentication_method def __enter__(self): self.session.__enter__() self.session.verify = False - # get authentication page of OIDC through TMSS redirect - response = self.session.get(self.host + '/oidc/authenticate/', allow_redirects=True) - csrftoken = self.session.cookies['csrftoken'] + if self.authentication_method == self.OPENID: + # get authentication page of OIDC through TMSS redirect + response = self.session.get(self.host + '/oidc/authenticate/', allow_redirects=True) + csrftoken = self.session.cookies['csrftoken'] + + # post user credentials to login page, also pass csrf token + data = {'username': self.username, 'password': self.password, 'csrfmiddlewaretoken': csrftoken} + response = self.session.post(url=response.url, data=data, allow_redirects=True) - # post user credentials to login page, also pass csrf token - data = {'username': self.username, 'password': self.password, 'csrfmiddlewaretoken': csrftoken} - response = self.session.post(url=response.url, data=data, allow_redirects=True) + # raise when sth went wrong + if "The username and/or password you specified are not correct" in response.content.decode('utf8'): + raise ValueError("The username and/or password you specified are not correct") + if response.status_code != 200: + raise ConnectionError(response.content.decode('utf8')) - # raise when sth went wrong - if "The username and/or password you specified are not correct" in response.content.decode('utf8'): - raise ValueError("The username and/or password you specified are not correct") - if response.status_code != 200: - raise ConnectionError(response.content.decode('utf8')) + if self.authentication_method == self.BASICAUTH: + self.session.auth = (self.username, self.password) # return the authenticated session as user context return self.session -- GitLab