diff --git a/SAS/TMSS/src/tmss/tmssapp/models/scheduling.py b/SAS/TMSS/src/tmss/tmssapp/models/scheduling.py index 895df381f28df2acd54c20e66af73a4e5090c17e..70171487824102080df61cbc910582a36e058ad9 100644 --- a/SAS/TMSS/src/tmss/tmssapp/models/scheduling.py +++ b/SAS/TMSS/src/tmss/tmssapp/models/scheduling.py @@ -213,7 +213,7 @@ class SubtaskStateLog(BasicCommon): class SubtaskInput(BasicCommon): - subtask = ForeignKey('Subtask', null=False, on_delete=CASCADE, help_text='Subtask to which this input specification refers.') + subtask = ForeignKey('Subtask', null=False, on_delete=CASCADE, related_name='inputs', help_text='Subtask to which this input specification refers.') task_relation_blueprint = ForeignKey('TaskRelationBlueprint', null=True, on_delete=SET_NULL, help_text='Task Relation Blueprint which this Subtask Input implements (NULLable).') connector = ForeignKey('SubtaskConnector', null=True, on_delete=SET_NULL, help_text='Which connector this Task Input implements.') producer = ForeignKey('SubtaskOutput', on_delete=PROTECT, help_text='The Subtask Output providing the input dataproducts.') @@ -229,7 +229,7 @@ class SubtaskInput(BasicCommon): class SubtaskOutput(BasicCommon): - subtask = ForeignKey('Subtask', null=False, on_delete=CASCADE, help_text='Subtask to which this output specification refers.') + subtask = ForeignKey('Subtask', null=False, on_delete=CASCADE, related_name='outputs', help_text='Subtask to which this output specification refers.') connector = ForeignKey('SubtaskConnector', null=True, on_delete=SET_NULL, help_text='Which connector this Subtask Output implements.') @@ -247,7 +247,7 @@ class Dataproduct(BasicCommon): pinned_since = DateTimeField(null=True, help_text='When this dataproduct was pinned to disk, that is, forbidden to be removed, or NULL if not pinned (NULLable).') specifications_doc = JSONField(help_text='Dataproduct properties (f.e. 
beam, subband), to distinguish them when produced by the same task, and to act as input for selections in the Task Input and Work Request Relation Blueprint objects.') specifications_template = ForeignKey('DataproductSpecificationsTemplate', null=False, on_delete=CASCADE, help_text='Schema used for specifications_doc.') - producer = ForeignKey('SubtaskOutput', on_delete=PROTECT, help_text='Subtask Output which generates this dataproduct.') + producer = ForeignKey('SubtaskOutput', on_delete=PROTECT, related_name='dataproducts', help_text='Subtask Output which generates this dataproduct.') do_cancel = DateTimeField(null=True, help_text='When this dataproduct was cancelled (NULLable). Cancelling a dataproduct triggers cleanup if necessary.') expected_size = BigIntegerField(null=True, help_text='Expected size of dataproduct size, in bytes. Used for scheduling purposes. NULL if size is unknown (NULLable).') size = BigIntegerField(null=True, help_text='Dataproduct size, in bytes. Used for accounting purposes. 
NULL if size is (yet) unknown (NULLable).') diff --git a/SAS/TMSS/src/tmss/tmssapp/populate.py b/SAS/TMSS/src/tmss/tmssapp/populate.py index f7495808e34296f3c2a1b66c37f6e1655c0d750a..0dd510d7a4e44986904ec2195ab971afd8095498 100644 --- a/SAS/TMSS/src/tmss/tmssapp/populate.py +++ b/SAS/TMSS/src/tmss/tmssapp/populate.py @@ -20,7 +20,7 @@ from lofar.sas.tmss.tmss.tmssapp.models.specification import Role, Datatype, Dat from lofar.sas.tmss.tmss.tmssapp.models.scheduling import SubtaskState, SubtaskType, SubtaskTemplate, Subtask, \ StationType, Algorithm, ScheduleMethod, Cluster, Filesystem from lofar.common.json_utils import * - +from lofar.sas.tmss.tmss.tmssapp.subtasks import connect_observation_subtask_to_preprocessing_subtask def populate_choices(apps, schema_editor): ''' @@ -228,7 +228,6 @@ def _populate_example_data(): filename="L%d_SB%03d_uv.MS" % ( subtask.id, sb_nr))) - for i in range(10): task_template = models.TaskTemplate.objects.get(name='preprocessing schema') task_draft_data = TaskDraft_test_data(name="my test pipeline", specifications_template=task_template) task_draft = models.TaskDraft.objects.create(**task_draft_data) @@ -237,21 +236,13 @@ def _populate_example_data(): task_blueprint = models.TaskBlueprint.objects.create(**task_blueprint_data) subtask_template = models.SubtaskTemplate.objects.get(name='pipelinecontrol schema') - specifications_doc = { - "storagemanager": "dysco" - } - + specifications_doc = {} specifications_doc = add_defaults_to_json_object_for_schema(specifications_doc, subtask_template.schema) subtask_data = Subtask_test_data(task_blueprint=task_blueprint, subtask_template=subtask_template, specifications_doc=specifications_doc, cluster=cluster) - subtask = models.Subtask.objects.create(**subtask_data) + pipe_subtask = models.Subtask.objects.create(**subtask_data) - subtask_output = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=subtask)) - # todo: create some dataproducts. (What goes in/out here?) 
- # for sb_nr in specifications_doc['stations']['digital_pointings'][0]['subbands']: - # models.Dataproduct.objects.create(**Dataproduct_test_data(producer=subtask_output, - # directory="CEP4:/data/test-projects/TMSS_test/L%d/uv/" % (subtask.id,), - # filename="L%d_SB%03d_uv.MS"%(subtask.id, sb_nr))) + connect_observation_subtask_to_preprocessing_subtask(subtask, pipe_subtask) except ImportError: pass diff --git a/SAS/TMSS/src/tmss/tmssapp/subtasks.py b/SAS/TMSS/src/tmss/tmssapp/subtasks.py new file mode 100644 index 0000000000000000000000000000000000000000..6887ae9bda538f745e189eedd9a52c9ef33f6a32 --- /dev/null +++ b/SAS/TMSS/src/tmss/tmssapp/subtasks.py @@ -0,0 +1,58 @@ +from datetime import datetime, timedelta +from lofar.common.datetimeutils import parseDatetime +from lofar.sas.tmss.tmss.tmssapp.models.specification import Dataformat, Role, Datatype +from lofar.sas.tmss.tmss.tmssapp.models.scheduling import Subtask, SubtaskType, SubtaskState, ScheduleMethod,\ + SubtaskTemplate, SubtaskInput, SubtaskOutput, SubtaskConnector, SubtaskInputSelectionTemplate, \ + Dataproduct, DataproductSpecificationsTemplate, DataproductFeedbackTemplate, DataproductTransform + + +def connect_observation_subtask_to_preprocessing_subtask(observation_subtask: Subtask, pipeline_subtask: Subtask): + if observation_subtask.specifications_template.type.value != SubtaskType.Choices.OBSERVATION.value or \ + pipeline_subtask.specifications_template.type.value != SubtaskType.Choices.PIPELINE.value: + raise ValueError("Cannot connect subtask %s type=%s to subtask id=%d type=%s. Expecting types %s and %s." 
% ( + observation_subtask.pk, observation_subtask.specifications_template.type, + pipeline_subtask.pk, pipeline_subtask.specifications_template.type, + SubtaskType.Choices.OBSERVATION.value, SubtaskType.Choices.PIPELINE.value)) + + print("Connecting subtask id=%s type=%s to subtask id=%s type=%s" % (observation_subtask.pk, observation_subtask.specifications_template.type, pipeline_subtask.pk, pipeline_subtask.specifications_template.type)) + + if observation_subtask.stop_time and isinstance(observation_subtask.stop_time, datetime): + pipeline_subtask.start_time = max(datetime.utcnow(), observation_subtask.stop_time) + + # todo: use existing and reasonable selection and specification templates when we have those, for now, make sure there is one... + SubtaskInputSelectionTemplate.objects.create( + name="test", + description='test subtask input selection template', + schema={}, + tags=["TMSS", "TESTING"]) + DataproductSpecificationsTemplate.objects.create( + name="test", + description='test dataproduct specifications template', + schema={}, + tags=["TMSS", "TESTING"]) + + # use observation output as pipeline input + pipeline_subtask_input = SubtaskInput.objects.create(subtask=pipeline_subtask, + producer=observation_subtask.outputs.first(), + selection_doc={}, + selection_template=SubtaskInputSelectionTemplate.objects.first()) + pipeline_subtask_input.dataproducts.set(observation_subtask.outputs.first().dataproducts.all()) + + # specify pipeline output (map input dataproducts 1:1, but with pipeline subtask ID) + pipeline_subtask_output = SubtaskOutput.objects.create(subtask=pipeline_subtask) + output_dps = [] + for input_dp in pipeline_subtask_input.dataproducts.all(): + output_dp = Dataproduct.objects.create(filename="L%s_%s" % (pipeline_subtask.pk, input_dp.filename.split('_', 1)[1]), + directory=input_dp.directory, + dataformat=Dataformat.objects.get(value="MeasurementSet"), + producer=pipeline_subtask_output, + specifications_doc={}, + specifications_template=DataproductSpecificationsTemplate.objects.first(), + feedback_doc="", + feedback_template=DataproductFeedbackTemplate.objects.first() + ) + 
DataproductTransform.objects.create(input=input_dp, output=output_dp, identity=False) + output_dps.append(output_dp) + pipeline_subtask_output.dataproducts.set(output_dps) + + # todo: specify a SubtaskConnector? TaskRelation \ No newline at end of file diff --git a/SAS/TMSS/src/tmss/tmssapp/viewsets/scheduling.py b/SAS/TMSS/src/tmss/tmssapp/viewsets/scheduling.py index edc9ab43d276c032739d4764e663f84d1eb0f264..28762021ff04a0eee20e3e087a2fbbf5df8069e7 100644 --- a/SAS/TMSS/src/tmss/tmssapp/viewsets/scheduling.py +++ b/SAS/TMSS/src/tmss/tmssapp/viewsets/scheduling.py @@ -231,7 +231,7 @@ class SubtaskViewSetJSONeditorOnline(LOFARViewSet): def parset(self, request, pk=None): subtask = get_object_or_404(models.Subtask, pk=pk) parset = convert_to_parset(subtask) - parset_str = "# THIS PARSET WAS GENERATED BY TMSS FROM THE SPECICATION OF SUBTASK ID=%d ON %s url: %s\n%s" % ( + parset_str = "# THIS PARSET WAS GENERATED BY TMSS FROM THE SPECIFICATION OF SUBTASK ID=%d ON %s url: %s\n%s" % ( subtask.pk, formatDatetime(datetime.utcnow()), request._request.get_raw_uri(), diff --git a/SAS/TMSS/src/util.py b/SAS/TMSS/src/util.py index 12f8b3d6ab1e812e2c19fc51722996a5c71379a2..629b1fe7e1a207d64ee1fb7493c3bb2d71236d98 100644 --- a/SAS/TMSS/src/util.py +++ b/SAS/TMSS/src/util.py @@ -5,36 +5,45 @@ logger = logging.getLogger(__file__) # usage example: # -# with TMSSsession('paulus', 'pauluspass') as session: -# response = session.get(url='http://localhost:8008/api/task_draft/') +# with TMSSsession('paulus', 'pauluspass', 'http://localhost') as session: +# response = session.get(url='http://localhost/api/task_draft/') # print(response) + class TMSSsession(object): - def __init__(self, username, password, host): + OPENID = "openid" + BASICAUTH = "basicauth" + + def __init__(self, username, password, host, authentication_method=OPENID): self.session = requests.session() self.username = username self.password = password self.host = host + self.authentication_method = authentication_method 
def __enter__(self): self.session.__enter__() self.session.verify = False - # get authentication page of OIDC through TMSS redirect - response = self.session.get(self.host + '/oidc/authenticate/', allow_redirects=True) - csrftoken = self.session.cookies['csrftoken'] + if self.authentication_method == self.OPENID: + # get authentication page of OIDC through TMSS redirect + response = self.session.get(self.host + '/oidc/authenticate/', allow_redirects=True) + csrftoken = self.session.cookies['csrftoken'] + + # post user credentials to login page, also pass csrf token + data = {'username': self.username, 'password': self.password, 'csrfmiddlewaretoken': csrftoken} + response = self.session.post(url=response.url, data=data, allow_redirects=True) - # post user credentials to login page, also pass csrf token - data = {'username': self.username, 'password': self.password, 'csrfmiddlewaretoken': csrftoken} - response = self.session.post(url=response.url, data=data, allow_redirects=True) + # raise when sth went wrong + if "The username and/or password you specified are not correct" in response.content.decode('utf8'): + raise ValueError("The username and/or password you specified are not correct") + if response.status_code != 200: + raise ConnectionError(response.content.decode('utf8')) - # raise when sth went wrong - if "The username and/or password you specified are not correct" in response.content.decode('utf8'): - raise ValueError("The username and/or password you specified are not correct") - if response.status_code != 200: - raise ConnectionError(response.content.decode('utf8')) + if self.authentication_method == self.BASICAUTH: + self.session.auth = (self.username, self.password) # return the authenticated session as user context return self.session