Skip to content
Snippets Groups Projects
Commit 97012a84 authored by Chiara Liotta's avatar Chiara Liotta
Browse files

transform data dependency into data flow

parent 4ef16003
No related branches found
No related tags found
No related merge requests found
...@@ -133,8 +133,7 @@ def process_cwl_steps(driver: Driver, cwl_entity: dict, tool_paths: list[str], s ...@@ -133,8 +133,7 @@ def process_cwl_steps(driver: Driver, cwl_entity: dict, tool_paths: list[str], s
param_node = ensure_in_parameter_node(driver, input['id'], step_path) param_node = ensure_in_parameter_node(driver, input['id'], step_path)
param_node_internal_id = param_node[0] param_node_internal_id = param_node[0]
if is_tool: if is_tool:
# Create a data edge from the step component node to the in-parameter node create_data_relationship(driver, param_node_internal_id, s_node_internal_id, step_path, input['id'])
create_data_relationship(driver, s_node_internal_id, param_node_internal_id, step_path, input['id'])
# Inputs can have one or multiple data sources (data nodes) # Inputs can have one or multiple data sources (data nodes)
if 'source' in input: if 'source' in input:
...@@ -176,7 +175,7 @@ def process_cwl_steps(driver: Driver, cwl_entity: dict, tool_paths: list[str], s ...@@ -176,7 +175,7 @@ def process_cwl_steps(driver: Driver, cwl_entity: dict, tool_paths: list[str], s
param_node_internal_id = param_node[0] param_node_internal_id = param_node[0]
if is_tool: if is_tool:
# Create a data edge from out-parameter node to the step component node # Create a data edge from out-parameter node to the step component node
create_data_relationship(driver, param_node_internal_id, s_node_internal_id, step_path, output_id) create_data_relationship(driver, s_node_internal_id, param_node_internal_id, step_path, output_id)
def process_cwl_base_commands(driver, entity, links): def process_cwl_base_commands(driver, entity, links):
......
...@@ -67,7 +67,7 @@ def process_parameter_source(driver: Driver, param_node_internal_id: int, source ...@@ -67,7 +67,7 @@ def process_parameter_source(driver: Driver, param_node_internal_id: int, source
source_param_node = get_source_node(driver, source_id, workflow_id, step_lookup) source_param_node = get_source_node(driver, source_id, workflow_id, step_lookup)
# Create a relationship between the parameter node and its source # Create a relationship between the parameter node and its source
create_data_relationship(driver, param_node_internal_id, source_param_node, workflow_id, source_id) create_data_relationship(driver, source_param_node, param_node_internal_id,workflow_id, source_id)
def get_source_node(driver: Driver, source_id: str, workflow_id: str, step_lookup: dict) -> None: def get_source_node(driver: Driver, source_id: str, workflow_id: str, step_lookup: dict) -> None:
......
from neo4j import Driver, GraphDatabase, Session from neo4j import Driver, GraphDatabase, Session
from neo4j_dependency_queries.processing_queries import get_all_out_parameter_nodes_of_entity, get_all_outer_out_parameter_nodes, get_all_outgoing_edges, get_node_details, get_nodes_with_control_edges, get_valid_connections, get_workflow_list_of_data_edge, update_workflow_list_of_edge, update_workflow_list_of_node from neo4j_dependency_queries.processing_queries import get_all_in_parameter_nodes_of_entity, get_all_out_parameter_nodes_of_entity, get_all_outer_out_parameter_nodes, get_all_outgoing_edges, get_node_details, get_nodes_with_control_edges, get_valid_connections, get_workflow_list_of_data_edge, update_workflow_list_of_edge, update_workflow_list_of_node
from neo4j_dependency_queries.utils import clean_component_id from neo4j_dependency_queries.utils import clean_component_id
from neo4j_flow_queries.create_queries import create_calculation_component_node, create_direct_flow, create_indirect_flow, create_sequential_indirect_flow from neo4j_flow_queries.create_queries import create_calculation_component_node, create_direct_flow, create_indirect_flow, create_sequential_indirect_flow
...@@ -38,7 +38,7 @@ class DependencyTraversalDFS: ...@@ -38,7 +38,7 @@ class DependencyTraversalDFS:
start_component_id = clean_component_id(component_id) start_component_id = clean_component_id(component_id)
# Find all starting nodes with the given component_id # Find all starting nodes with the given component_id
result = get_all_out_parameter_nodes_of_entity(session, start_component_id) result = get_all_in_parameter_nodes_of_entity(session, start_component_id)
start_nodes = [record["nodeId"] for record in result] start_nodes = [record["nodeId"] for record in result]
for node_id in start_nodes: for node_id in start_nodes:
...@@ -51,8 +51,8 @@ class DependencyTraversalDFS: ...@@ -51,8 +51,8 @@ class DependencyTraversalDFS:
component_id, current_node_labels, entity_type, workflow_list = get_node_details(session, node_id) component_id, current_node_labels, entity_type, workflow_list = get_node_details(session, node_id)
# If an OutParameter node has a new component_id, expand allowed list # If an InParameter node has a new component_id, expand allowed list
if "OutParameter" in current_node_labels and component_id not in allowed_component_ids: if "InParameter" in current_node_labels and component_id not in allowed_component_ids:
allowed_component_ids.add(component_id) allowed_component_ids.add(component_id)
if entity_type == "Workflow": if entity_type == "Workflow":
...@@ -66,11 +66,7 @@ class DependencyTraversalDFS: ...@@ -66,11 +66,7 @@ class DependencyTraversalDFS:
update_workflow_list_of_node(session, node_id, list(workflow_set.copy())) update_workflow_list_of_node(session, node_id, list(workflow_set.copy()))
if "OutParameter" in current_node_labels and component_id in allowed_component_ids:
if "InParameter" in current_node_labels and component_id not in allowed_component_ids:
raise ValueError()
if "InParameter" in current_node_labels and component_id in allowed_component_ids:
allowed_component_ids.remove(component_id) allowed_component_ids.remove(component_id)
if component_id in workflow_set: if component_id in workflow_set:
workflow_set.remove(component_id) workflow_set.remove(component_id)
...@@ -136,15 +132,13 @@ class DependencyTraversalDFS: ...@@ -136,15 +132,13 @@ class DependencyTraversalDFS:
if "OutParameter" in next_node_labels: if "InParameter" in next_node_labels:
create_calculation_component_node(session, next_component_id, next_entity_type) create_calculation_component_node(session, next_component_id, next_entity_type)
create_direct_flow(session, edge_component_id, next_component_id, edge_component_id, data_ids, workflow_list) create_direct_flow(session, edge_component_id, next_component_id, edge_component_id, data_ids, workflow_list)
create_indirect_flow(session, next_component_id, edge_component_id, edge_component_id, data_ids, workflow_list) create_indirect_flow(session, next_component_id, edge_component_id, edge_component_id, data_ids, workflow_list)
# If the relation is B -> A where B is InParameter and A is OutParameter if "OutParameter" in current_node_labels:
if "InParameter" in current_node_labels: create_sequential_indirect_flow(session, component_id, next_component_id, edge_component_id, data_ids, workflow_list)
# Then there is a direct flow from A to B
create_sequential_indirect_flow(session, next_component_id, component_id, edge_component_id, data_ids, workflow_list)
# Recursively continue DFS # Recursively continue DFS
self._dfs_traverse(session, next_node_id, visited_nodes, entities) self._dfs_traverse(session, next_node_id, visited_nodes, entities)
...@@ -21,7 +21,7 @@ def create_in_param_relationship(driver: Driver, prefixed_component_id: str, par ...@@ -21,7 +21,7 @@ def create_in_param_relationship(driver: Driver, prefixed_component_id: str, par
query = """ query = """
MATCH (c:Component {component_id: $component_id}), (p:InParameter) MATCH (c:Component {component_id: $component_id}), (p:InParameter)
WHERE elementId(p) = $parameter_internal_id WHERE elementId(p) = $parameter_internal_id
MERGE (c)-[r:DATA {component_id: $component_id}]->(p) MERGE (c)<-[r:DATA_FLOW {component_id: $component_id}]->(p)
SET r.data_ids = SET r.data_ids =
CASE CASE
WHEN r.data_ids IS NULL THEN [p.parameter_id] WHEN r.data_ids IS NULL THEN [p.parameter_id]
...@@ -55,7 +55,7 @@ def create_out_param_relationship(driver: Driver, prefixed_component_id: str, pa ...@@ -55,7 +55,7 @@ def create_out_param_relationship(driver: Driver, prefixed_component_id: str, pa
query = """ query = """
MATCH (c:Component {component_id: $component_id}), (p: OutParameter) MATCH (c:Component {component_id: $component_id}), (p: OutParameter)
WHERE elementId(p) = $parameter_internal_id WHERE elementId(p) = $parameter_internal_id
MERGE (c)<-[r:DATA {component_id: $component_id}]-(p) MERGE (c)-[r:DATA_FLOW {component_id: $component_id}]->(p)
SET r.data_ids = SET r.data_ids =
CASE CASE
WHEN r.data_ids IS NULL THEN [p.parameter_id] WHEN r.data_ids IS NULL THEN [p.parameter_id]
...@@ -88,7 +88,7 @@ def create_data_relationship(driver: Driver, from_internal_node_id: int, to_inte ...@@ -88,7 +88,7 @@ def create_data_relationship(driver: Driver, from_internal_node_id: int, to_inte
query = """ query = """
MATCH (a), (b) MATCH (a), (b)
WHERE elementId(a) = $from_internal_node_id AND elementId(b) = $to_internal_node_id WHERE elementId(a) = $from_internal_node_id AND elementId(b) = $to_internal_node_id
MERGE (a)-[r:DATA {component_id: $component_id}]->(b) MERGE (a)-[r:DATA_FLOW {component_id: $component_id}]->(b)
SET r.data_ids = SET r.data_ids =
CASE CASE
WHEN r.data_ids IS NULL THEN [$data_id] WHEN r.data_ids IS NULL THEN [$data_id]
...@@ -122,7 +122,7 @@ def create_control_relationship(driver: Driver, from_internal_node_id: int, to_i ...@@ -122,7 +122,7 @@ def create_control_relationship(driver: Driver, from_internal_node_id: int, to_i
query = """ query = """
MATCH (a), (b) MATCH (a), (b)
WHERE elementId(a) = $from_internal_node_id AND elementId(b) = $to_internal_node_id WHERE elementId(a) = $from_internal_node_id AND elementId(b) = $to_internal_node_id
MERGE (a)-[r:CONTROL {component_id: $component_id}]->(b) MERGE (a)-[r:CONTROL_DEPENDENCY {component_id: $component_id}]->(b)
SET r.data_ids = SET r.data_ids =
CASE CASE
WHEN r.data_ids IS NULL THEN [$data_id] WHEN r.data_ids IS NULL THEN [$data_id]
......
...@@ -30,7 +30,7 @@ def get_nodes_with_control_edges(session: Session): ...@@ -30,7 +30,7 @@ def get_nodes_with_control_edges(session: Session):
def get_valid_connections(session: Session, node_id, allowed_component_ids): def get_valid_connections(session: Session, node_id, allowed_component_ids):
query = """ query = """
MATCH (n)-[r: DATA]->(m) MATCH (n)-[r: DATA_FLOW]->(m)
WHERE elementId(n) = $node_id AND r.component_id IN $allowed_component_ids WHERE elementId(n) = $node_id AND r.component_id IN $allowed_component_ids
RETURN elementId(m) AS nextNodeId, elementId(r) AS relId, m.component_id AS nextComponentId, RETURN elementId(m) AS nextNodeId, elementId(r) AS relId, m.component_id AS nextComponentId,
labels(m) AS nodeLabels, type(r) AS relType, r.component_id AS relComponentId, labels(m) AS nodeLabels, type(r) AS relType, r.component_id AS relComponentId,
...@@ -69,7 +69,7 @@ def update_workflow_list_of_edge(session: Session, edge_id: str, workflow_ids: l ...@@ -69,7 +69,7 @@ def update_workflow_list_of_edge(session: Session, edge_id: str, workflow_ids: l
session.run(query, edge_id=edge_id, workflow_ids=workflow_ids) session.run(query, edge_id=edge_id, workflow_ids=workflow_ids)
def get_workflow_list_of_data_edge(session: Session, source_id: str, target_id:str, edge_component_id: str): def get_workflow_list_of_data_edge(session: Session, source_id: str, target_id:str, edge_component_id: str):
query = """MATCH (m)-[r:DATA]->(n) query = """MATCH (m)-[r:DATA_FLOW]->(n)
WHERE elementId(m) = $source_id AND elementId(n) = $target_id AND r.component_id = $component_id WHERE elementId(m) = $source_id AND elementId(n) = $target_id AND r.component_id = $component_id
RETURN r.workflow_list AS workflow_list""" RETURN r.workflow_list AS workflow_list"""
result = session.run(query, source_id=source_id, target_id=target_id, component_id=edge_component_id) result = session.run(query, source_id=source_id, target_id=target_id, component_id=edge_component_id)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment