Skip to content
Snippets Groups Projects
Commit 4df30cc4 authored by Chiara Liotta
Browse files

change the representation of control dependencies

parent bf120a71
No related branches found
No related tags found
No related merge requests found
from pathlib import Path from pathlib import Path
import re import re
from neo4j import Driver from neo4j import Driver
from graph_creation.utils import extract_js_expression_dependencies, process_in_param, process_parameter_source from graph_creation.utils import extract_js_expression_dependencies, get_input_source, process_control_dependencies, process_in_param, process_parameter_source
from neo4j_dependency_queries.create_node_queries import ensure_component_node, ensure_git_node, ensure_in_parameter_node, ensure_out_parameter_node from neo4j_dependency_queries.create_node_queries import ensure_component_node, ensure_git_node, ensure_in_parameter_node, ensure_out_parameter_node
from neo4j_dependency_queries.create_edge_queries import create_control_relationship, create_data_relationship, create_out_param_relationship, create_references_relationship from neo4j_dependency_queries.create_edge_queries import create_control_relationship, create_data_relationship, create_out_param_relationship, create_references_relationship
...@@ -111,7 +111,8 @@ def process_cwl_steps(driver: Driver, cwl_entity: dict, tool_paths: list[str], s ...@@ -111,7 +111,8 @@ def process_cwl_steps(driver: Driver, cwl_entity: dict, tool_paths: list[str], s
Returns: Returns:
None None
""" """
component_id = cwl_entity['path'] workflow_id = cwl_entity['path']
control_relationships = dict()
for step in cwl_entity['steps']: for step in cwl_entity['steps']:
...@@ -139,29 +140,25 @@ def process_cwl_steps(driver: Driver, cwl_entity: dict, tool_paths: list[str], s ...@@ -139,29 +140,25 @@ def process_cwl_steps(driver: Driver, cwl_entity: dict, tool_paths: list[str], s
if 'source' in input: if 'source' in input:
if isinstance(input['source'], str): if isinstance(input['source'], str):
source_id = input['source'] source_id = input['source']
process_parameter_source(driver, param_node_internal_id, source_id, component_id, step_lookup) process_parameter_source(driver, param_node_internal_id, source_id, workflow_id, step_lookup)
elif isinstance(input['source'], list): elif isinstance(input['source'], list):
for source_id in input['source']: for source_id in input['source']:
process_parameter_source(driver, param_node_internal_id, source_id, component_id, step_lookup) process_parameter_source(driver, param_node_internal_id, source_id, workflow_id, step_lookup)
# Process the "when" field, aka control dependencies # Process the "when" field, aka control dependencies
if 'when' in step: if 'when' in step:
when_expr = step['when'] when_expr = step['when']
when_refs = extract_js_expression_dependencies(when_expr) when_refs = extract_js_expression_dependencies(when_expr)
source = None
nodes = []
for ref in when_refs: for ref in when_refs:
print(ref)
ref_id = ref[1]
if ref[0] == "parameter": if ref[0] == "parameter":
input_data = ensure_in_parameter_node(driver, ref_id, step_path)[0] source = get_input_source(step['in'], ref[1])
nodes.append(input_data) else:
# elif ref[0] == "step_output": source = ref[1]
# step_output = ensure_out_parameter_node(driver, ref_id, cwl_entity['path'])[0] if source:
# nodes.append(step_output) process_control_dependencies(driver, source, workflow_id, step_path, step_lookup)
for node in nodes: control_relationships[step_path] = ()
create_control_relationship(driver, s_node_internal_id, node, cwl_entity['path'])
# Process the list of outputs of the step # Process the list of outputs of the step
for output in step['out']: for output in step['out']:
......
...@@ -3,7 +3,8 @@ from neo4j import Driver ...@@ -3,7 +3,8 @@ from neo4j import Driver
import re import re
from graph_creation.cwl_parsing import get_cwl_from_repo from graph_creation.cwl_parsing import get_cwl_from_repo
from neo4j_dependency_queries.create_node_queries import ensure_in_parameter_node, ensure_out_parameter_node from neo4j_dependency_queries.create_node_queries import ensure_in_parameter_node, ensure_out_parameter_node
from neo4j_dependency_queries.create_edge_queries import create_data_relationship, create_in_param_relationship from neo4j_dependency_queries.create_edge_queries import create_control_relationship, create_data_relationship, create_in_param_relationship
from neo4j_dependency_queries.processing_queries import get_all_in_parameter_nodes_of_entity
GITLAB_ASTRON ='https://git.astron.nl' GITLAB_ASTRON ='https://git.astron.nl'
...@@ -48,7 +49,7 @@ def process_in_param(driver: Driver, param_id: str, component_id: str, param_typ ...@@ -48,7 +49,7 @@ def process_in_param(driver: Driver, param_id: str, component_id: str, param_typ
if entity_type == "Workflow": if entity_type == "Workflow":
create_in_param_relationship(driver, component_id, param_node[0]) create_in_param_relationship(driver, component_id, param_node[0])
def process_parameter_source(driver: Driver, param_node_internal_id: int, source_id: str, component_id: str, step_lookup: dict) -> None: def process_parameter_source(driver: Driver, param_node_internal_id: int, source_id: str, workflow_id: str, step_lookup: dict) -> None:
""" """
Processes a parameter source by creating a data relationship between a parameter node and its source. Processes a parameter source by creating a data relationship between a parameter node and its source.
...@@ -57,25 +58,71 @@ def process_parameter_source(driver: Driver, param_node_internal_id: int, source ...@@ -57,25 +58,71 @@ def process_parameter_source(driver: Driver, param_node_internal_id: int, source
param_node_internal_id (int): The internal ID of the parameter node to which the relationship is being created param_node_internal_id (int): The internal ID of the parameter node to which the relationship is being created
source_id (str): The source identifier, which can be a single identifier (in case the source is an in-param of the workflow) source_id (str): The source identifier, which can be a single identifier (in case the source is an in-param of the workflow)
or include a subcomponent (e.g., "source" or "sub_component/source") or include a subcomponent (e.g., "source" or "sub_component/source")
component_id (str): The ID of the component owning the parameter node workflow_id (str): The ID of workflow to which the data relationships belong
step_lookup (dict): A mapping of subcomponent identifiers to their respective IDs within the workflow that calls them step_lookup (dict): A mapping of subcomponent identifiers to their respective unique paths within the workflow
Returns: Returns:
None None
""" """
source_param_node = get_source_node(driver, source_id, workflow_id, step_lookup)
# Create a relationship between the parameter node and its source
create_data_relationship(driver, param_node_internal_id, source_param_node, workflow_id, source_id)
def get_source_node(driver: Driver, source_id: str, workflow_id: str, step_lookup: dict) -> str:
    """
    Retrieves the node corresponding to the given source identifier.

    Parameters:
        driver (Driver): The Neo4j driver used to execute queries.
        source_id (str): The source identifier, which can be a single identifier (if the source is an in-param of the workflow component)
            or include a subcomponent (e.g., "source" or "sub_component/source").
            The second option can only be the case if the component is a workflow
        workflow_id (str): The ID of the workflow owning the data source.
        step_lookup (dict): A mapping of subcomponent identifiers to their respective unique paths within the workflow that is being analyzed.

    Returns:
        The internal ID of the source parameter node, i.e. the first element of the tuple
        returned by ensure_in_parameter_node / ensure_out_parameter_node.

    Raises:
        KeyError: if source_id refers to a subcomponent that is not a key of step_lookup.
    """
    # Parse the source_id to identify whether it refers to a workflow parameter or an output of a subcomponent (subcomponent/id)
    source_parsed = source_id.split("/")
    if len(source_parsed) == 1:
        # The source is an in-parameter of the workflow itself: ensure the node exists and return its internal ID
        return ensure_in_parameter_node(driver, source_parsed[0], workflow_id)[0]
    # Otherwise source_id refers to an output of a subcomponent (subcomponent/source):
    # retrieve the subcomponent ID from the step_lookup dictionary
    sub_component_id = step_lookup[source_parsed[0]]
    # Ensure the source exists as an out-parameter node of the subcomponent and return its internal ID
    return ensure_out_parameter_node(driver, source_parsed[1], sub_component_id)[0]
create_data_relationship(driver, param_node_internal_id, source_param_node, component_id, source_id)
def process_control_dependencies(driver: Driver, source_id: str, workflow_id: str, component_id: str, step_lookup: dict) -> None:
    """
    Processes control dependencies by creating control relationships between the given source and
    all in-parameters of the specified component.

    For every in-parameter node of the component, one control relationship is created
    from that in-parameter node to the node of the source.

    Parameters:
        driver (Driver): The Neo4j driver used to execute queries.
        source_id (str): The source identifier, which can be a single identifier or a subcomponent reference.
        workflow_id (str): The ID of the workflow to which the dependencies belong.
        component_id (str): The ID of the component owning the in-parameters.
        step_lookup (dict): A mapping of subcomponent identifiers to their respective unique paths within the workflow.

    Returns:
        None
    """
    source_param_node = get_source_node(driver, source_id, workflow_id, step_lookup)
    with driver.session() as session:
        in_parameters = get_all_in_parameter_nodes_of_entity(session, component_id)
        # Read every record while the session is still open; the query result
        # is not guaranteed to be readable once the session has been closed.
        node_ids = [record["nodeId"] for record in in_parameters]
    # create_control_relationship opens its own session, so the one above is no longer needed here
    for node_id in node_ids:
        create_control_relationship(driver, node_id, source_param_node, workflow_id, source_id)
def resolve_relative_path(path: Path)-> Path: def resolve_relative_path(path: Path)-> Path:
""" """
...@@ -127,3 +174,9 @@ def extract_js_expression_dependencies(js_expression: str) -> list[tuple[str, st ...@@ -127,3 +174,9 @@ def extract_js_expression_dependencies(js_expression: str) -> list[tuple[str, st
ref_list.extend([("step_output", f"{step}/{output}") for step, output in step_output_matches]) ref_list.extend([("step_output", f"{step}/{output}") for step, output in step_output_matches])
return ref_list return ref_list
def get_input_source(inputs: list[dict], input_id: str):
    """
    Looks up the source of a step input by the ID of that input.

    Parameters:
        inputs (list[dict]): the inputs to search; each entry is a dictionary that may carry
            an "id" key and a "source" key. None is treated like an empty list
        input_id (str): the ID of the input whose source is requested

    Returns:
        The value stored under "source" for the first entry whose "id" equals input_id.
        None if that entry has no "source" key or if no entry has the requested ID.
    """
    for inp in inputs or ():
        # Entries without an "id" can never match; skip them instead of raising KeyError
        if "id" in inp and inp["id"] == input_id:
            return inp.get("source")  # None if 'source' doesn't exist
    return None  # input_id was not found
...@@ -104,7 +104,7 @@ def create_data_relationship(driver: Driver, from_internal_node_id: int, to_inte ...@@ -104,7 +104,7 @@ def create_data_relationship(driver: Driver, from_internal_node_id: int, to_inte
return record["id_1"], record["id_2"] return record["id_1"], record["id_2"]
def create_control_relationship(driver: Driver, from_internal_node_id: int, to_internal_node_id: int, component_id: str) -> tuple[int,int]: def create_control_relationship(driver: Driver, from_internal_node_id: int, to_internal_node_id: int, component_id: str, data_id: str) -> tuple[int,int]:
""" """
Creates a control dependency relationship in Neo4j between the two nodes with Neo4j internal IDs given as parameters. Creates a control dependency relationship in Neo4j between the two nodes with Neo4j internal IDs given as parameters.
This relationship is an outgoing control edge from the node with internal ID from_internal_node_id This relationship is an outgoing control edge from the node with internal ID from_internal_node_id
...@@ -118,15 +118,22 @@ def create_control_relationship(driver: Driver, from_internal_node_id: int, to_i ...@@ -118,15 +118,22 @@ def create_control_relationship(driver: Driver, from_internal_node_id: int, to_i
Returns: Returns:
tuple[int,int]: from_internal_node_id, to_internal_node_id tuple[int,int]: from_internal_node_id, to_internal_node_id
""" """
clean_id = clean_component_id(component_id)
query = """ query = """
MATCH (a), (b) MATCH (a), (b)
WHERE elementId(a) = $from_internal_node_id AND elementId(b) = $to_internal_node_id WHERE elementId(a) = $from_internal_node_id AND elementId(b) = $to_internal_node_id
MERGE (a)-[:CONTROL {component_id: $component_id}]->(b) MERGE (a)-[r:CONTROL {component_id: $component_id}]->(b)
SET r.data_ids =
CASE
WHEN r.data_ids IS NULL THEN [$data_id]
WHEN NOT $data_id IN r.data_ids THEN r.data_ids + [$data_id]
ELSE r.data_ids
END
RETURN elementId(a) AS id_1, elementId(b) AS id_2 RETURN elementId(a) AS id_1, elementId(b) AS id_2
""" """
with driver.session() as session: with driver.session() as session:
result = session.run(query, from_internal_node_id=from_internal_node_id, result = session.run(query, from_internal_node_id=from_internal_node_id,
to_internal_node_id=to_internal_node_id, component_id=component_id) to_internal_node_id=to_internal_node_id, component_id=clean_id, data_id=data_id)
record = result.single() record = result.single()
return record["id_1"], record["id_2"] return record["id_1"], record["id_2"]
......
...@@ -36,7 +36,7 @@ def ensure_git_node(driver: Driver, git_url: str) -> tuple[int,str]: ...@@ -36,7 +36,7 @@ def ensure_git_node(driver: Driver, git_url: str) -> tuple[int,str]:
record = result.single() record = result.single()
return record["node_internal_id"], record["git_url"] return record["node_internal_id"], record["git_url"]
def ensure_in_parameter_node(driver: Driver, node_id: str, prefixed_component_id: str, param_type: str = None, entity_type: str = None) \ def ensure_in_parameter_node(driver: Driver, parameter_id: str, prefixed_component_id: str, param_type: str = None, entity_type: str = None) \
-> tuple[int,str,str,str]: -> tuple[int,str,str,str]:
""" """
Ensures that there exists an in-parameter node with ID node_id Ensures that there exists an in-parameter node with ID node_id
...@@ -46,7 +46,7 @@ def ensure_in_parameter_node(driver: Driver, node_id: str, prefixed_component_id ...@@ -46,7 +46,7 @@ def ensure_in_parameter_node(driver: Driver, node_id: str, prefixed_component_id
Parameters: Parameters:
driver (Driver): the Neo4j driver driver (Driver): the Neo4j driver
node_id (str): the ID of the parameter parameter_id (str): the ID of the parameter
prefixed_component_id (str): the local relative path of the component prefixed_component_id (str): the local relative path of the component
type (str): the parameter type type (str): the parameter type
...@@ -55,14 +55,14 @@ def ensure_in_parameter_node(driver: Driver, node_id: str, prefixed_component_id ...@@ -55,14 +55,14 @@ def ensure_in_parameter_node(driver: Driver, node_id: str, prefixed_component_id
""" """
component_id = clean_component_id(prefixed_component_id) component_id = clean_component_id(prefixed_component_id)
query_type = """ query_type = """
MERGE (n:InParameter {parameter_id: $node_id, component_id: $component_id}) MERGE (n:InParameter {parameter_id: $parameter_id, component_id: $component_id})
SET n.type = $type SET n.type = $type
SET n.entity_type = $entity_type SET n.entity_type = $entity_type
RETURN elementId(n) AS node_internal_id, n.parameter_id AS id_property, n.component_id AS component_id_property RETURN elementId(n) AS node_id, n.parameter_id AS parameter_id, n.component_id AS component_id
""" """
query_check = """ query_check = """
MERGE (n:InParameter {parameter_id: $node_id, component_id: $component_id}) MERGE (n:InParameter {parameter_id: $parameter_id, component_id: $component_id})
RETURN elementId(n) AS node_internal_id, n.parameter_id AS id_property, n.component_id AS component_id_property RETURN elementId(n) AS node_id, n.parameter_id AS parameter_id, n.component_id AS component_id
""" """
with driver.session() as session: with driver.session() as session:
if param_type and entity_type: if param_type and entity_type:
...@@ -75,11 +75,11 @@ def ensure_in_parameter_node(driver: Driver, node_id: str, prefixed_component_id ...@@ -75,11 +75,11 @@ def ensure_in_parameter_node(driver: Driver, node_id: str, prefixed_component_id
elif isinstance(param_type, list): elif isinstance(param_type, list):
new_list = [str(type) for type in param_type] new_list = [str(type) for type in param_type]
new_param_type = " OR ".join(new_list) new_param_type = " OR ".join(new_list)
result = session.run(query, node_id=node_id, component_id=component_id, type=new_param_type, entity_type=entity_type) result = session.run(query, parameter_id=parameter_id, component_id=component_id, type=new_param_type, entity_type=entity_type)
record = result.single() record = result.single()
return record["node_internal_id"], record["id_property"], record["component_id_property"] return record["node_id"], record["parameter_id"], record["component_id"]
def ensure_out_parameter_node(driver: Driver, node_id: str, prefixed_component_id: str, param_type: str = None, entity_type: str = None) \ def ensure_out_parameter_node(driver: Driver, parameter_id: str, prefixed_component_id: str, param_type: str = None, entity_type: str = None) \
-> tuple[int,str,str,str]: -> tuple[int,str,str,str]:
""" """
Ensures that there exists an out-parameter node with ID node_id Ensures that there exists an out-parameter node with ID node_id
...@@ -89,7 +89,7 @@ def ensure_out_parameter_node(driver: Driver, node_id: str, prefixed_component_i ...@@ -89,7 +89,7 @@ def ensure_out_parameter_node(driver: Driver, node_id: str, prefixed_component_i
Parameters: Parameters:
driver (Driver): the Neo4j driver driver (Driver): the Neo4j driver
node_id (str): the ID of the parameter parameter_id (str): the ID of the parameter
prefixed_component_id (str): the local relative path of the component prefixed_component_id (str): the local relative path of the component
type (str): the parameter type type (str): the parameter type
...@@ -98,14 +98,14 @@ def ensure_out_parameter_node(driver: Driver, node_id: str, prefixed_component_i ...@@ -98,14 +98,14 @@ def ensure_out_parameter_node(driver: Driver, node_id: str, prefixed_component_i
""" """
component_id = clean_component_id(prefixed_component_id) component_id = clean_component_id(prefixed_component_id)
query_type = """ query_type = """
MERGE (n:OutParameter {parameter_id: $node_id, component_id: $component_id}) MERGE (n:OutParameter {parameter_id: $parameter_id, component_id: $component_id})
SET n.type = $type SET n.type = $type
SET n.entity_type = $entity_type SET n.entity_type = $entity_type
RETURN elementId(n) AS node_internal_id, n.parameter_id AS id_property, n.component_id AS component_id_property RETURN elementId(n) AS node_id, n.parameter_id AS parameter_id, n.component_id AS component_id
""" """
query_check = """ query_check = """
MERGE (n:OutParameter {parameter_id: $node_id, component_id: $component_id}) MERGE (n:OutParameter {parameter_id: $parameter_id, component_id: $component_id})
RETURN elementId(n) AS node_internal_id, n.parameter_id AS id_property, n.component_id AS component_id_property RETURN elementId(n) AS node_id, n.parameter_id AS parameter_id, n.component_id AS component_id
""" """
...@@ -121,9 +121,9 @@ def ensure_out_parameter_node(driver: Driver, node_id: str, prefixed_component_i ...@@ -121,9 +121,9 @@ def ensure_out_parameter_node(driver: Driver, node_id: str, prefixed_component_i
elif isinstance(param_type, list): elif isinstance(param_type, list):
new_list = [str(type) for type in param_type] new_list = [str(type) for type in param_type]
new_param_type = " OR ".join(new_list) new_param_type = " OR ".join(new_list)
result = session.run(query, node_id=node_id, component_id=component_id, type=new_param_type, entity_type=entity_type) result = session.run(query, parameter_id=parameter_id, component_id=component_id, type=new_param_type, entity_type=entity_type)
record = result.single() record = result.single()
return record["node_internal_id"], record["id_property"], record["component_id_property"] return record["node_id"], record["parameter_id"], record["component_id"]
def ensure_data_node(driver: Driver, node_id: str, prefixed_component_id: str) -> tuple[int,str,str]: def ensure_data_node(driver: Driver, node_id: str, prefixed_component_id: str) -> tuple[int,str,str]:
""" """
......
from neo4j import Session from neo4j import Session
from neo4j_dependency_queries.utils import clean_component_id
def get_node_details(session: Session, node_id): def get_node_details(session: Session, node_id):
query = """ query = """
...@@ -113,6 +115,15 @@ def get_all_out_parameter_nodes_of_entity(session: Session, component_id: str): ...@@ -113,6 +115,15 @@ def get_all_out_parameter_nodes_of_entity(session: Session, component_id: str):
result = session.run(query, component_id=component_id) result = session.run(query, component_id=component_id)
return result return result
def get_all_in_parameter_nodes_of_entity(session: Session, component_id: str):
    """
    Queries all InParameter nodes whose component_id property equals the cleaned component ID.

    Parameters:
        session (Session): the Neo4j session on which the query is run
        component_id (str): the ID of the component; it is passed through clean_component_id
            before being used in the query

    Returns:
        The result of session.run; each record exposes "nodeId" (elementId of the node)
        and "entityType" (the node's entity_type property).
        NOTE(review): the result is returned unconsumed, so callers presumably have to
        read it while the session is still open; confirm against the driver documentation.
    """
    cypher = """
    MATCH (n:InParameter {component_id: $component_id})
    RETURN elementId(n) AS nodeId, n.entity_type AS entityType
    """
    return session.run(cypher, component_id=clean_component_id(component_id))
def get_all_outer_out_parameter_nodes(session: Session): def get_all_outer_out_parameter_nodes(session: Session):
query = """ query = """
MATCH (n:OutParameter) MATCH (n:OutParameter)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment