Skip to content
Snippets Groups Projects
Commit ce4291a5 authored by Chiara Liotta's avatar Chiara Liotta
Browse files

add step_id to edges, clean code

parent 3e0f7a2c
Branches
No related tags found
No related merge requests found
...@@ -2,9 +2,8 @@ from pathlib import Path ...@@ -2,9 +2,8 @@ from pathlib import Path
import re import re
from neo4j import Driver from neo4j import Driver
from graph_creation.utils import extract_js_expression_dependencies, get_input_source, process_control_dependencies, process_in_param, process_parameter_source from graph_creation.utils import extract_js_expression_dependencies, get_input_source, process_control_dependencies, process_in_param, process_parameter_source
from neo4j_dependency_queries.create_node_queries import ensure_component_node, ensure_git_node, ensure_in_parameter_node, ensure_out_parameter_node from neo4j_dependency_queries.create_node_queries import ensure_git_node, ensure_in_parameter_node, ensure_out_parameter_node
from neo4j_dependency_queries.create_edge_queries import create_control_relationship, create_data_relationship, create_out_param_relationship, create_references_relationship from neo4j_dependency_queries.create_edge_queries import create_out_param_relationship, create_references_relationship
from neo4j_dependency_queries.utils import get_is_workflow from neo4j_dependency_queries.utils import get_is_workflow
def process_cwl_inputs(driver: Driver, cwl_entity: dict) -> None: def process_cwl_inputs(driver: Driver, cwl_entity: dict) -> None:
...@@ -82,7 +81,7 @@ def process_cwl_outputs(driver: Driver, cwl_entity: dict, step_lookup) -> None: ...@@ -82,7 +81,7 @@ def process_cwl_outputs(driver: Driver, cwl_entity: dict, step_lookup) -> None:
for source_id in output['outputSource']: for source_id in output['outputSource']:
process_parameter_source(driver, out_param_node_internal_id, source_id, component_id, step_lookup) process_parameter_source(driver, out_param_node_internal_id, source_id, component_id, step_lookup)
def process_cwl_steps(driver: Driver, cwl_entity: dict, tool_paths: list[str], step_lookup) -> None: def process_cwl_steps(driver: Driver, cwl_entity: dict, step_lookup) -> None:
""" """
Processes the steps of a CWL entity, creating necessary nodes and relationships Processes the steps of a CWL entity, creating necessary nodes and relationships
for each step. The function handles the inputs, outputs, and control dependencies associated with each step for each step. The function handles the inputs, outputs, and control dependencies associated with each step
...@@ -120,30 +119,20 @@ def process_cwl_steps(driver: Driver, cwl_entity: dict, tool_paths: list[str], s ...@@ -120,30 +119,20 @@ def process_cwl_steps(driver: Driver, cwl_entity: dict, tool_paths: list[str], s
# Get the resolved path of the step from the step_lookup # Get the resolved path of the step from the step_lookup
step_path = step_lookup[step['id']] step_path = step_lookup[step['id']]
is_tool = step_path in tool_paths
# Create the step component node if it's a tool
if step_path in tool_paths:
is_tool = True
s_node = ensure_component_node(driver, step_path)
s_node_internal_id = s_node[0]
# Process the list of inputs of the step # Process the list of inputs of the step
for input in step['in']: for input in step['in']:
# Create in-parameter node with ID as defined in the component and component ID equal to the path of the step # Create in-parameter node with ID as defined in the component and component ID equal to the path of the step
param_node = ensure_in_parameter_node(driver, input['id'], step_path) param_node = ensure_in_parameter_node(driver, input['id'], step_path)
param_node_internal_id = param_node[0] param_node_internal_id = param_node[0]
if is_tool:
create_data_relationship(driver, param_node_internal_id, s_node_internal_id, step_path, input['id'])
# Inputs can have one or multiple data sources (data nodes) # Inputs can have one or multiple data sources (data nodes)
if 'source' in input: if 'source' in input:
if isinstance(input['source'], str): if isinstance(input['source'], str):
source_id = input['source'] source_id = input['source']
process_parameter_source(driver, param_node_internal_id, source_id, workflow_id, step_lookup) process_parameter_source(driver, param_node_internal_id, source_id, workflow_id, step_lookup, step['id'])
elif isinstance(input['source'], list): elif isinstance(input['source'], list):
for source_id in input['source']: for source_id in input['source']:
process_parameter_source(driver, param_node_internal_id, source_id, workflow_id, step_lookup) process_parameter_source(driver, param_node_internal_id, source_id, workflow_id, step_lookup, step['id'])
# Process the "when" field, aka control dependencies # Process the "when" field, aka control dependencies
if 'when' in step: if 'when' in step:
...@@ -158,26 +147,12 @@ def process_cwl_steps(driver: Driver, cwl_entity: dict, tool_paths: list[str], s ...@@ -158,26 +147,12 @@ def process_cwl_steps(driver: Driver, cwl_entity: dict, tool_paths: list[str], s
if source: if source:
if isinstance(source, list): if isinstance(source, list):
for source_id in source: for source_id in source:
process_control_dependencies(driver, source_id, workflow_id, step_path, step_lookup) process_control_dependencies(driver, source_id, workflow_id, step_path, step_lookup, step['id'])
else: else:
process_control_dependencies(driver, source, workflow_id, step_path, step_lookup) process_control_dependencies(driver, source, workflow_id, step_path, step_lookup, step['id'])
control_relationships[step_path] = () control_relationships[step_path] = ()
# Process the list of outputs of the step
for output in step['out']:
# An output can be defined as a dictionary or simply as a string (ID only)
# Create out-parameter node with ID as defined in the component and component ID equal to the path of the step
if isinstance(output, dict):
output_id = output['id']
else:
output_id = output
param_node = ensure_out_parameter_node(driver, output_id, step_path)
param_node_internal_id = param_node[0]
if is_tool:
# Create a data edge from out-parameter node to the step component node
create_data_relationship(driver, s_node_internal_id, param_node_internal_id, step_path, output_id)
def process_cwl_base_commands(driver, entity, links): def process_cwl_base_commands(driver, entity, links):
base_command_key = "baseCommand" base_command_key = "baseCommand"
......
...@@ -39,14 +39,12 @@ def process_repos(repo_list: list[str], driver: Driver) -> None: ...@@ -39,14 +39,12 @@ def process_repos(repo_list: list[str], driver: Driver) -> None:
print(f'Processing: {entity["path"]}') print(f'Processing: {entity["path"]}')
is_workflow = get_is_workflow(entity) is_workflow = get_is_workflow(entity)
steps = None steps = None
if not is_workflow: if is_workflow:
ensure_component_node(driver, entity['path'])
else:
steps = process_step_lookup(entity) steps = process_step_lookup(entity)
process_cwl_inputs(driver, entity) process_cwl_inputs(driver, entity)
process_cwl_outputs(driver, entity, steps) process_cwl_outputs(driver, entity, steps)
if steps: if steps:
process_cwl_steps(driver, entity, tool_paths, steps) process_cwl_steps(driver, entity, steps)
# elif entity['class'] == 'ExpressionTool': # elif entity['class'] == 'ExpressionTool':
# process_cwl_expression(driver, entity) # process_cwl_expression(driver, entity)
# elif entity['class'] == 'CommandLineTool': # elif entity['class'] == 'CommandLineTool':
......
...@@ -2,7 +2,7 @@ from pathlib import Path ...@@ -2,7 +2,7 @@ from pathlib import Path
from neo4j import Driver from neo4j import Driver
import re import re
from graph_creation.cwl_parsing import get_cwl_from_repo from graph_creation.cwl_parsing import get_cwl_from_repo
from neo4j_dependency_queries.create_node_queries import ensure_in_parameter_node, ensure_out_parameter_node from neo4j_dependency_queries.create_node_queries import ensure_component_node, ensure_in_parameter_node, ensure_out_parameter_node
from neo4j_dependency_queries.create_edge_queries import create_control_relationship, create_data_relationship, create_in_param_relationship from neo4j_dependency_queries.create_edge_queries import create_control_relationship, create_data_relationship, create_in_param_relationship
from neo4j_dependency_queries.processing_queries import get_all_in_parameter_nodes_of_entity from neo4j_dependency_queries.processing_queries import get_all_in_parameter_nodes_of_entity
...@@ -47,9 +47,10 @@ def process_in_param(driver: Driver, param_id: str, component_id: str, param_typ ...@@ -47,9 +47,10 @@ def process_in_param(driver: Driver, param_id: str, component_id: str, param_typ
param_node = ensure_in_parameter_node(driver, param_id, component_id, param_type, entity_type) param_node = ensure_in_parameter_node(driver, param_id, component_id, param_type, entity_type)
if entity_type != "Workflow": if entity_type != "Workflow":
ensure_component_node(driver, component_id)
create_in_param_relationship(driver, component_id, param_node[0]) create_in_param_relationship(driver, component_id, param_node[0])
def process_parameter_source(driver: Driver, param_node_internal_id: int, source_id: str, workflow_id: str, step_lookup: dict) -> None: def process_parameter_source(driver: Driver, param_node_internal_id: int, source_id: str, workflow_id: str, step_lookup: dict, step_id: str = "") -> None:
""" """
Processes a parameter source by creating a data relationship between a parameter node and its source. Processes a parameter source by creating a data relationship between a parameter node and its source.
...@@ -67,7 +68,7 @@ def process_parameter_source(driver: Driver, param_node_internal_id: int, source ...@@ -67,7 +68,7 @@ def process_parameter_source(driver: Driver, param_node_internal_id: int, source
source_param_node = get_source_node(driver, source_id, workflow_id, step_lookup) source_param_node = get_source_node(driver, source_id, workflow_id, step_lookup)
# Create a relationship between the parameter node and its source # Create a relationship between the parameter node and its source
create_data_relationship(driver, source_param_node, param_node_internal_id,workflow_id, source_id) create_data_relationship(driver, source_param_node, param_node_internal_id,workflow_id, source_id, step_id)
def get_source_node(driver: Driver, source_id: str, workflow_id: str, step_lookup: dict) -> None: def get_source_node(driver: Driver, source_id: str, workflow_id: str, step_lookup: dict) -> None:
...@@ -100,7 +101,7 @@ def get_source_node(driver: Driver, source_id: str, workflow_id: str, step_looku ...@@ -100,7 +101,7 @@ def get_source_node(driver: Driver, source_id: str, workflow_id: str, step_looku
source_param_node = ensure_out_parameter_node(driver, source_parsed[1], sub_component_id)[0] source_param_node = ensure_out_parameter_node(driver, source_parsed[1], sub_component_id)[0]
return source_param_node return source_param_node
def process_control_dependencies(driver: Driver, source_id: str, workflow_id: str, component_id: str, step_lookup: dict): def process_control_dependencies(driver: Driver, source_id: str, workflow_id: str, component_id: str, step_lookup: dict, step_id: str):
""" """
Processes control dependencies by creating control relationships between the given source and Processes control dependencies by creating control relationships between the given source and
all in-parameters of the specified component. all in-parameters of the specified component.
...@@ -120,7 +121,7 @@ def process_control_dependencies(driver: Driver, source_id: str, workflow_id: st ...@@ -120,7 +121,7 @@ def process_control_dependencies(driver: Driver, source_id: str, workflow_id: st
in_parameters = get_all_in_parameter_nodes_of_entity(session, component_id) in_parameters = get_all_in_parameter_nodes_of_entity(session, component_id)
node_ids = [record["nodeId"] for record in in_parameters] node_ids = [record["nodeId"] for record in in_parameters]
for node_id in node_ids: for node_id in node_ids:
create_control_relationship(driver, node_id, source_param_node, workflow_id, source_id) create_control_relationship(driver, node_id, source_param_node, workflow_id, source_id, step_id)
def resolve_relative_path(path: Path)-> Path: def resolve_relative_path(path: Path)-> Path:
......
...@@ -77,9 +77,9 @@ class DependencyTraversalDFS: ...@@ -77,9 +77,9 @@ class DependencyTraversalDFS:
for node_id in start_nodes: for node_id in start_nodes:
workflow_set = {start_component_id} workflow_set = {start_component_id}
self._dfs_traverse_paths(session, start_component_id, node_id, workflow_set,deque([]), bookkeeping) self._dfs_traverse_paths(session, node_id, workflow_set,deque([]), bookkeeping)
def _dfs_traverse_paths(self, session: Session, workflow_id: str, node_id: int, workflow_set: set, entity_queue: deque, bookkeeping: dict[str, list]): def _dfs_traverse_paths(self, session: Session, node_id: int, workflow_set: set, entity_queue: deque, bookkeeping: dict[str, list]):
"""Recursively performs DFS traversal and saves the subgraph.""" """Recursively performs DFS traversal and saves the subgraph."""
component_id, current_node_labels, entity_type = get_node_details(session, node_id) component_id, current_node_labels, entity_type = get_node_details(session, node_id)
...@@ -107,9 +107,7 @@ class DependencyTraversalDFS: ...@@ -107,9 +107,7 @@ class DependencyTraversalDFS:
current_list = list(entity_queue) current_list = list(entity_queue)
if node_id in bookkeeping: if node_id in bookkeeping:
for existing_list in bookkeeping[node_id]: for existing_list in bookkeeping[node_id]:
if existing_list == current_list: if len(existing_list) >= len(current_list):
return
elif len(existing_list) > len(current_list):
if existing_list[-len(current_list):] == current_list: if existing_list[-len(current_list):] == current_list:
return return
else: else:
...@@ -132,7 +130,7 @@ class DependencyTraversalDFS: ...@@ -132,7 +130,7 @@ class DependencyTraversalDFS:
update_workflow_list_of_edge(session, edge_id, sorted(list(edge_workflow_set))) update_workflow_list_of_edge(session, edge_id, sorted(list(edge_workflow_set)))
new_queue = copy.deepcopy(entity_queue) new_queue = copy.deepcopy(entity_queue)
# Recursively continue DFS # Recursively continue DFS
self._dfs_traverse_paths(session, workflow_id, next_node_id, edge_workflow_set, new_queue, bookkeeping) self._dfs_traverse_paths(session, next_node_id, edge_workflow_set, new_queue, bookkeeping)
def traverse_graph_create_flows(self): def traverse_graph_create_flows(self):
with self.driver.session() as session: with self.driver.session() as session:
......
from neo4j import Driver from neo4j import Driver
from neo4j_dependency_queries.create_node_queries import ensure_component_node
from neo4j_dependency_queries.utils import clean_component_id from neo4j_dependency_queries.utils import clean_component_id
def create_in_param_relationship(driver: Driver, prefixed_component_id: str, parameter_internal_id: int) -> tuple[str,str]: def create_in_param_relationship(driver: Driver, prefixed_component_id: str, parameter_internal_id: int) -> tuple[str,str]:
...@@ -18,6 +19,7 @@ def create_in_param_relationship(driver: Driver, prefixed_component_id: str, par ...@@ -18,6 +19,7 @@ def create_in_param_relationship(driver: Driver, prefixed_component_id: str, par
tuple[str,str]: the component ID of the component, the parameter ID of the parameter tuple[str,str]: the component ID of the component, the parameter ID of the parameter
""" """
component_id = clean_component_id(prefixed_component_id) component_id = clean_component_id(prefixed_component_id)
ensure_component_node(driver, component_id)
query = """ query = """
MATCH (c:Component {component_id: $component_id}), (p:InParameter) MATCH (c:Component {component_id: $component_id}), (p:InParameter)
WHERE elementId(p) = $parameter_internal_id WHERE elementId(p) = $parameter_internal_id
...@@ -52,6 +54,7 @@ def create_out_param_relationship(driver: Driver, prefixed_component_id: str, pa ...@@ -52,6 +54,7 @@ def create_out_param_relationship(driver: Driver, prefixed_component_id: str, pa
tuple[str,str]: the component ID of the component, the parameter ID of the parameter tuple[str,str]: the component ID of the component, the parameter ID of the parameter
""" """
component_id = clean_component_id(prefixed_component_id) component_id = clean_component_id(prefixed_component_id)
ensure_component_node(driver, component_id)
query = """ query = """
MATCH (c:Component {component_id: $component_id}), (p: OutParameter) MATCH (c:Component {component_id: $component_id}), (p: OutParameter)
WHERE elementId(p) = $parameter_internal_id WHERE elementId(p) = $parameter_internal_id
...@@ -70,7 +73,7 @@ def create_out_param_relationship(driver: Driver, prefixed_component_id: str, pa ...@@ -70,7 +73,7 @@ def create_out_param_relationship(driver: Driver, prefixed_component_id: str, pa
return result return result
def create_data_relationship(driver: Driver, from_internal_node_id: int, to_internal_node_id: int, component_id: str, data_id: str) -> tuple[int,int]: def create_data_relationship(driver: Driver, from_internal_node_id: int, to_internal_node_id: int, component_id: str, data_id: str, step_id: str = "") -> tuple[int,int]:
""" """
Creates a data dependency relationship in Neo4j between the two nodes with Neo4j internal IDs given as parameters. Creates a data dependency relationship in Neo4j between the two nodes with Neo4j internal IDs given as parameters.
This relationship is an outgoing data edge from the node with internal ID from_internal_node_id This relationship is an outgoing data edge from the node with internal ID from_internal_node_id
...@@ -88,7 +91,7 @@ def create_data_relationship(driver: Driver, from_internal_node_id: int, to_inte ...@@ -88,7 +91,7 @@ def create_data_relationship(driver: Driver, from_internal_node_id: int, to_inte
query = """ query = """
MATCH (a), (b) MATCH (a), (b)
WHERE elementId(a) = $from_internal_node_id AND elementId(b) = $to_internal_node_id WHERE elementId(a) = $from_internal_node_id AND elementId(b) = $to_internal_node_id
MERGE (a)-[r:DATA_FLOW {component_id: $component_id}]->(b) MERGE (a)-[r:DATA_FLOW {component_id: $component_id, step_id: $step_id}]->(b)
SET r.data_ids = SET r.data_ids =
CASE CASE
WHEN r.data_ids IS NULL THEN [$data_id] WHEN r.data_ids IS NULL THEN [$data_id]
...@@ -99,12 +102,14 @@ def create_data_relationship(driver: Driver, from_internal_node_id: int, to_inte ...@@ -99,12 +102,14 @@ def create_data_relationship(driver: Driver, from_internal_node_id: int, to_inte
""" """
with driver.session() as session: with driver.session() as session:
result = session.run(query, from_internal_node_id=from_internal_node_id, result = session.run(query, from_internal_node_id=from_internal_node_id,
to_internal_node_id=to_internal_node_id, component_id= clean_id, data_id=data_id) to_internal_node_id=to_internal_node_id, component_id= clean_id, data_id=data_id,
step_id=step_id)
record = result.single() record = result.single()
return record["id_1"], record["id_2"] return record["id_1"], record["id_2"]
def create_control_relationship(driver: Driver, from_internal_node_id: int, to_internal_node_id: int, component_id: str, data_id: str) -> tuple[int,int]: def create_control_relationship(driver: Driver, from_internal_node_id: int, to_internal_node_id: int, component_id: str,
data_id: str, step_id: str) -> tuple[int,int]:
""" """
Creates a control dependency relationship in Neo4j between the two nodes with Neo4j internal IDs given as parameters. Creates a control dependency relationship in Neo4j between the two nodes with Neo4j internal IDs given as parameters.
This relationship is an outgoing control edge from the node with internal ID from_internal_node_id This relationship is an outgoing control edge from the node with internal ID from_internal_node_id
...@@ -122,7 +127,7 @@ def create_control_relationship(driver: Driver, from_internal_node_id: int, to_i ...@@ -122,7 +127,7 @@ def create_control_relationship(driver: Driver, from_internal_node_id: int, to_i
query = """ query = """
MATCH (a), (b) MATCH (a), (b)
WHERE elementId(a) = $from_internal_node_id AND elementId(b) = $to_internal_node_id WHERE elementId(a) = $from_internal_node_id AND elementId(b) = $to_internal_node_id
MERGE (a)-[r:CONTROL_DEPENDENCY {component_id: $component_id}]->(b) MERGE (a)-[r:CONTROL_DEPENDENCY {component_id: $component_id, step_id: $step_id}]->(b)
SET r.data_ids = SET r.data_ids =
CASE CASE
WHEN r.data_ids IS NULL THEN [$data_id] WHEN r.data_ids IS NULL THEN [$data_id]
...@@ -133,7 +138,8 @@ def create_control_relationship(driver: Driver, from_internal_node_id: int, to_i ...@@ -133,7 +138,8 @@ def create_control_relationship(driver: Driver, from_internal_node_id: int, to_i
""" """
with driver.session() as session: with driver.session() as session:
result = session.run(query, from_internal_node_id=from_internal_node_id, result = session.run(query, from_internal_node_id=from_internal_node_id,
to_internal_node_id=to_internal_node_id, component_id=clean_id, data_id=data_id) to_internal_node_id=to_internal_node_id, component_id=clean_id,
data_id=data_id, step_id=step_id)
record = result.single() record = result.single()
return record["id_1"], record["id_2"] return record["id_1"], record["id_2"]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment