Skip to content
Snippets Groups Projects
Commit 9dd72b9f authored by Jan David Mol's avatar Jan David Mol
Browse files

TMSS-920: Specify parallellism in nr of tasks for backwards compatibility, and...

TMSS-920: Specify parallellism in nr of tasks for backwards compatibility, and group cluster & partition explicitly together.
parent ab8ab89d
No related branches found
No related tags found
2 merge requests!634WIP: COBALT commissioning delta,!587TMSS-920: Add cluster_resource settings for task & subtask
......@@ -435,11 +435,9 @@ def _common_parset_dict_for_pipeline_schemas(subtask: models.Subtask) -> dict:
cluster_resources = spec['cluster_resources']
cluster_name, partition_name = cluster_resources['cluster_partition'].split()
parset["Observation.Cluster.ProcessingCluster.clusterName"] = subtask.cluster.name # is equal to cluster_name
parset["Observation.Cluster.ProcessingCluster.clusterPartition"] = partition_name
parset["Observation.Cluster.ProcessingCluster.numberOfTasks"] = cluster_resources['reservation_size'] / cluster_resources['cores_per_task']
parset["Observation.Cluster.ProcessingCluster.clusterName"] = subtask.cluster.name # is equal to cluster_resources['where']['cluster']
parset["Observation.Cluster.ProcessingCluster.clusterPartition"] = cluster_resources['where']['partition']
parset["Observation.Cluster.ProcessingCluster.numberOfTasks"] = cluster_resources['parallel_tasks']
parset["Observation.Cluster.ProcessingCluster.numberOfCoresPerTask"] = cluster_resources['cores_per_task']
return parset
......
......@@ -21,16 +21,31 @@
"description": "Which cluster resources to claim for this pipeline.",
"additionalProperties": false,
"properties": {
"cluster_partition": {
"type": "string",
"title": "Cluster & Partition",
"description": "Where to run this pipeline (cluster and partition)",
"default": "CEP4 cpu",
"enum": [
"CEP4 cpu",
"CEP4 gpu",
"CEP4 testing"
]
"where": {
"type": "object":
"title": "Where",
"description": "Where to run this pipeline.",
"additionalProperties": false,
"properties": {
"cluster": {
"type": "string",
"title": "Cluster",
"default": "CEP4",
"enum": [
"CEP4"
]
},
"partition": {
"type": "string",
"title": "Partition",
"default": "cpu",
"enum": [
"cpu",
"gpu",
"testing"
]
}
}
},
"cores_per_task": {
"type": "integer",
......@@ -40,13 +55,13 @@
"minimum": 1,
"maximum": 24
},
"reservation_size": {
"parallel_tasks": {
"type": "integer",
"title": "Reservation size (cores)",
"description": "Number of cores to use in parallel (in cores, not tasks!). Determines the size of the reservation. A smaller reservation results in a longer pipeline run. A bigger reservation results in more in resource waste.",
"default": 244,
"title": "Max parallel tasks",
"description": "Maximum number of tasks to run in parallel. Determines the size of the reservation. A smaller reservation results in a longer pipeline run. A bigger reservation results in more in resource waste.",
"default": 122,
"minimum": 1,
"maximum": 980
"maximum": 488
}
}
}
......
......@@ -612,7 +612,7 @@ def create_observation_control_subtask_from_task_blueprint(task_blueprint: TaskB
raise SubtaskCreationException("Total number of subbands %d exceeds the maximum of 488 for task_blueprint id=%s" % (len(all_subbands), task_blueprint.id))
# step 1: create subtask in defining state
cluster_name, partition_name = task_blueprint.specifications_doc['cluster_resources']['cluster'].split()
cluster_name = task_blueprint.specifications_doc['cluster_resources']['where']['cluster']
subtask_data = { "scheduled_on_sky_start_time": None,
"scheduled_on_sky_stop_time": None,
"state": SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value),
......@@ -800,7 +800,7 @@ def create_pipeline_subtask_from_task_blueprint(task_blueprint: TaskBlueprint, s
task_specs_with_defaults = task_blueprint.specifications_template.add_defaults_to_json_object_for_schema(task_blueprint.specifications_doc)
subtask_specs = generate_subtask_specs_from_task_spec_func(task_specs_with_defaults, default_subtask_specs)
cluster_name, partition_name = task_blueprint.specifications_doc['cluster_resources']['partition'].split()
cluster_name = task_blueprint.specifications_doc['cluster_resources']['where']['cluster']
subtask_data = { "scheduled_on_sky_start_time": None,
"scheduled_on_sky_stop_time": None,
"state": SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value),
......@@ -899,9 +899,9 @@ def create_cleanup_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) ->
subtask_specs = subtask_template.get_default_json_document_for_schema()
try:
cluster_name, partition_name = task_blueprint.specifications_doc['cluster_resources']['cluster'].split()
cluster_name = task_blueprint.specifications_doc['cluster_resources']['where']['cluster']
except KeyError:
cluster_name, partition_name = "CEP4", "cpu"
cluster_name = "CEP4"
subtask_data = {"scheduled_on_sky_start_time": None,
"scheduled_on_sky_stop_time": None,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment