Skip to content
Snippets Groups Projects
Commit 1e83a20f authored by Chiara Liotta's avatar Chiara Liotta
Browse files

delete workflow nodes, different data edge type and in/out param types for color coding

parent 6f1c8163
No related branches found
No related tags found
No related merge requests found
from neo4j import Driver
from graph_creation.cst_processing import traverse_and_create, traverse_when_statement_extract_dependencies
from graph_creation.utils import create_input_nodes_and_relationships, create_main_input_nodes_and_relationships, process_source_relationship, resolve_relative_path
from neo4j_queries.node_queries import ensure_component_node, ensure_data_node, ensure_in_parameter_node, ensure_main_out_parameter_node, ensure_out_parameter_node
from neo4j_queries.edge_queries import create_control_relationship, create_data_relationship, create_main_out_param_relationship, create_out_param_relationship
from graph_creation.utils import create_input_nodes_and_relationships, create_main_input_nodes_and_relationships, create_right_control_relationship, create_right_data_relationship, process_source_relationship, resolve_relative_path
from neo4j_queries.node_queries import ensure_component_node, ensure_data_node, ensure_in_parameter_node, ensure_main_component_node, ensure_main_in_parameter_node, ensure_main_out_parameter_node, ensure_out_parameter_node
from neo4j_queries.edge_queries import create_data_relationship, create_main_out_param_relationship, create_out_param_relationship
from pathlib import Path
from parsers.javascript_parsing import parse_javascript_expression_string, parse_javascript_string
......@@ -119,7 +119,7 @@ def process_cwl_main_outputs(driver: Driver, cwl_entity: dict) -> None:
param_node_internal_id = param_node[0]
# Create out-parameter node with the parameter ID as defined in the component
# and component ID equal to the path of the componet
create_main_out_param_relationship(driver, component_id, param_node_internal_id)
# create_main_out_param_relationship(driver, component_id, param_node_internal_id)
# Create a data node with component ID of the component and data ID equal to output source defined in the component
# and a data edge from the out-parameter node to the data node
if 'outputSource' in output:
......@@ -169,15 +169,20 @@ def process_cwl_steps(driver: Driver, cwl_entity: dict) -> None:
step_path = str(resolve_relative_path(full_step_path))
# Create the step component node with ID equal to the step
if "compress_pipeline.cwl" not in step_path:
s_node = ensure_component_node(driver, step_path)
s_node_internal_id = s_node[0]
# Process the list of inputs of the step
for input in step['in']:
# Create in-parameter node with ID as defined in the component and component ID equal to the path of the step
if "compress_pipeline.cwl" in step_path:
param_node = ensure_main_in_parameter_node(driver, input['id'], step_path)
else:
param_node = ensure_in_parameter_node(driver, input['id'], step_path)
param_node_internal_id = param_node[0]
# Create a data edge from the step component node to the in-parameter node
if "compress_pipeline.cwl" not in step_path:
create_data_relationship(driver, s_node_internal_id, param_node_internal_id)
# Inputs can have one or multiple data sources (data nodes)
......@@ -200,6 +205,9 @@ def process_cwl_steps(driver: Driver, cwl_entity: dict) -> None:
for ref in when_refs:
ref_id = ref[1]
if ref[0] == "parameter":
if "compress_pipeline.cwl" in step_path:
input_data = ensure_main_in_parameter_node(driver, ref_id, step_path)[0]
else:
input_data = ensure_in_parameter_node(driver, ref_id, step_path)[0]
nodes.append(input_data)
elif ref[0] == "step_output":
......@@ -207,7 +215,7 @@ def process_cwl_steps(driver: Driver, cwl_entity: dict) -> None:
nodes.append(step_output)
for node in nodes:
create_control_relationship(driver, s_node_internal_id, node, cwl_entity['path'])
create_right_control_relationship(driver, s_node_internal_id, node, cwl_entity['path'])
# Process the list of outputs of the step
for output in step['out']:
......@@ -217,16 +225,21 @@ def process_cwl_steps(driver: Driver, cwl_entity: dict) -> None:
else:
output_id = output
# Create out-parameter node with ID as defined in the component and component ID equal to the path of the step
if "compress_pipeline.cwl" in step_path:
param_node = ensure_main_out_parameter_node(driver, output_id, step_path)
else:
param_node = ensure_out_parameter_node(driver, output_id, step_path)
param_node_internal_id = param_node[0]
# Create a data edge from out-parameter node to the step component node
if "compress_pipeline.cwl" not in step_path:
create_data_relationship(driver, param_node_internal_id, s_node_internal_id)
# Create data node with id equal to step_id/output_id and component ID equal to the path of the outer workflow
outer_output_id = f"{step['id']}/{output_id}"
data_node = ensure_data_node(driver, outer_output_id, cwl_entity['path'])
data_node_internal_id = data_node[0]
nice_id = Path(cwl_entity['path']).stem
# Create a data edge from the data node to the out-parameter node
create_data_relationship(driver, data_node_internal_id, param_node_internal_id)
create_right_data_relationship(driver, data_node_internal_id, param_node_internal_id, nice_id)
def process_cwl_expression(driver: Driver, entity: dict) -> None:
expression = entity['expression']
......
......@@ -21,7 +21,7 @@ def process_repos(repo_list: list[str], driver: Driver) -> None:
pipelines = ["compress_pipeline.cwl"]
if any(pipeline in entity['path'] for pipeline in pipelines):
component_id = entity['path']
ensure_main_component_node(driver, component_id)
# ensure_main_component_node(driver, component_id)
process_cwl_main_inputs(driver, entity)
process_cwl_main_outputs(driver, entity)
if entity['class'] == 'Workflow':
......
from pathlib import Path
from neo4j import Driver
from neo4j_queries.node_queries import ensure_data_node, ensure_in_parameter_node, ensure_main_in_parameter_node
from neo4j_queries.edge_queries import create_data_relationship, create_in_param_relationship, create_main_in_param_relationship
from neo4j_queries.edge_queries import create_compress_control_relationship, create_compress_data_relationship, create_control_relationship, create_data_relationship, create_download_control_relationship, create_download_data_relationship, create_in_param_relationship, create_main_in_param_relationship
def create_right_data_relationship(driver: Driver, node_internal_id_1: int, node_internal_id_2: int, nice_id: str) -> None:
if "compress_pipeline" == nice_id:
create_compress_data_relationship(driver, node_internal_id_1, node_internal_id_2)
elif "download_and_compress_pipeline" == nice_id:
create_download_data_relationship(driver, node_internal_id_1, node_internal_id_2)
else:
create_data_relationship(driver, node_internal_id_1, node_internal_id_2)
def create_right_control_relationship(driver: Driver, node_internal_id_1: int, node_internal_id_2: int, component_id: str) -> None:
nice_id = Path(component_id).stem
if "compress_pipeline" == nice_id:
create_compress_control_relationship(driver, node_internal_id_1, node_internal_id_2, component_id)
elif "download_and_compress_pipeline" == nice_id:
create_download_control_relationship(driver, node_internal_id_1, node_internal_id_2, component_id)
else:
create_control_relationship(driver, node_internal_id_1, node_internal_id_2, component_id)
def create_input_nodes_and_relationships(driver: Driver, input_id: str, component_id: str) -> None:
"""
......@@ -17,6 +36,7 @@ def create_input_nodes_and_relationships(driver: Driver, input_id: str, componen
input_id (str): the ID of the input as defined in the CWL component
component_id (str): the unique ID of the CWL component (its path)
"""
nice_id = Path(component_id).stem
# Create in-parameter with the parameter ID as defined in the component and component ID equal to the path of the componet
param_node = ensure_in_parameter_node(driver, input_id, component_id)
param_node_internal_id = param_node[0]
......@@ -26,7 +46,7 @@ def create_input_nodes_and_relationships(driver: Driver, input_id: str, componen
data_node = ensure_data_node(driver, input_id, component_id)
data_node_internal_id = data_node[0]
# Create a data edge from the data node to the the in-parameter node
create_data_relationship(driver, data_node_internal_id, param_node_internal_id)
create_right_data_relationship(driver, data_node_internal_id, param_node_internal_id, nice_id)
def create_main_input_nodes_and_relationships(driver: Driver, input_id: str, component_id: str) -> None:
"""
......@@ -42,16 +62,17 @@ def create_main_input_nodes_and_relationships(driver: Driver, input_id: str, com
input_id (str): the ID of the input as defined in the CWL component
component_id (str): the unique ID of the CWL component (its path)
"""
nice_id = Path(component_id).stem
# Create in-parameter with the parameter ID as defined in the component and component ID equal to the path of the componet
param_node = ensure_main_in_parameter_node(driver, input_id, component_id)
param_node_internal_id = param_node[0]
# Create a data edge from the component node to the in-parameter node
create_main_in_param_relationship(driver, component_id, param_node_internal_id)
# create_main_in_param_relationship(driver, component_id, param_node_internal_id)
# Create a data node with component ID of the component and data ID equal to the parameter ID
data_node = ensure_data_node(driver, input_id, component_id)
data_node_internal_id = data_node[0]
# Create a data edge from the data node to the the in-parameter node
create_data_relationship(driver, data_node_internal_id, param_node_internal_id)
create_right_data_relationship(driver, data_node_internal_id, param_node_internal_id, nice_id)
def process_source_relationship(driver: Driver, source_id: str, component_id: str, param_node_internal_id: int) -> None:
"""
......@@ -67,9 +88,11 @@ def process_source_relationship(driver: Driver, source_id: str, component_id: st
component_id (str): the unique ID of the CWL component (its path)
param_node_internal_id (int): the unique ID of the parameter node as defined internally by Neo4j
"""
nice_id = Path(component_id).stem
data_node = ensure_data_node(driver, source_id, component_id)
data_node_internal_id = data_node[0]
create_data_relationship(driver, param_node_internal_id, data_node_internal_id)
create_right_data_relationship(driver, param_node_internal_id, data_node_internal_id, nice_id)
def resolve_relative_path(path: Path)-> Path:
"""
......
......@@ -143,6 +143,58 @@ def create_data_relationship(driver: Driver, from_internal_node_id: int, to_inte
record = result.single()
return record["id_1"], record["id_2"]
def create_compress_data_relationship(driver: Driver, from_internal_node_id: int, to_internal_node_id: int) -> tuple[int,int]:
"""
Creates a data dependency relationship in Neo4j between the two nodes with Neo4j internal IDs given as parameters.
This relationship is an outgoing data edge from the node with internal ID from_internal_node_id
to the node with internal ID to_internal_node_id.
Parameters:
driver (Driver): the Neo4j driver
from_internal_node_id (int): the internal Neo4j ID of the first node
to_internal_node_id (int): the internal Neo4j ID of the second node
Returns:
tuple[int,int]: from_internal_node_id, to_internal_node_id
"""
query = """
MATCH (a), (b)
WHERE elementId(a) = $from_internal_node_id AND elementId(b) = $to_internal_node_id
MERGE (a)-[:DATA_COMPRESS]->(b)
RETURN elementId(a) AS id_1, elementId(b) AS id_2
"""
with driver.session() as session:
result = session.run(query, from_internal_node_id=from_internal_node_id,
to_internal_node_id=to_internal_node_id)
record = result.single()
return record["id_1"], record["id_2"]
def create_download_data_relationship(driver: Driver, from_internal_node_id: int, to_internal_node_id: int) -> tuple[int,int]:
"""
Creates a data dependency relationship in Neo4j between the two nodes with Neo4j internal IDs given as parameters.
This relationship is an outgoing data edge from the node with internal ID from_internal_node_id
to the node with internal ID to_internal_node_id.
Parameters:
driver (Driver): the Neo4j driver
from_internal_node_id (int): the internal Neo4j ID of the first node
to_internal_node_id (int): the internal Neo4j ID of the second node
Returns:
tuple[int,int]: from_internal_node_id, to_internal_node_id
"""
query = """
MATCH (a), (b)
WHERE elementId(a) = $from_internal_node_id AND elementId(b) = $to_internal_node_id
MERGE (a)-[:DATA_DOWNLOAD]->(b)
RETURN elementId(a) AS id_1, elementId(b) AS id_2
"""
with driver.session() as session:
result = session.run(query, from_internal_node_id=from_internal_node_id,
to_internal_node_id=to_internal_node_id)
record = result.single()
return record["id_1"], record["id_2"]
def create_control_relationship(driver: Driver, from_internal_node_id: int, to_internal_node_id: int, component_id: str) -> tuple[int,int]:
"""
......@@ -170,6 +222,58 @@ def create_control_relationship(driver: Driver, from_internal_node_id: int, to_i
record = result.single()
return record["id_1"], record["id_2"]
def create_compress_control_relationship(driver: Driver, from_internal_node_id: int, to_internal_node_id: int, component_id: str) -> tuple[int,int]:
"""
Creates a control dependency relationship in Neo4j between the two nodes with Neo4j internal IDs given as parameters.
This relationship is an outgoing control edge from the node with internal ID from_internal_node_id
to the node with internal ID to_internal_node_id.
Parameters:
driver (Driver): the Neo4j driver
from_internal_node_id (int): the internal Neo4j ID of the first node
to_internal_node_id (int): the internal Neo4j ID of the second node
Returns:
tuple[int,int]: from_internal_node_id, to_internal_node_id
"""
query = """
MATCH (a), (b)
WHERE elementId(a) = $from_internal_node_id AND elementId(b) = $to_internal_node_id
MERGE (a)-[:CONTROL_COMPRESS {component_id: $component_id}]->(b)
RETURN elementId(a) AS id_1, elementId(b) AS id_2
"""
with driver.session() as session:
result = session.run(query, from_internal_node_id=from_internal_node_id,
to_internal_node_id=to_internal_node_id, component_id=component_id)
record = result.single()
return record["id_1"], record["id_2"]
def create_download_control_relationship(driver: Driver, from_internal_node_id: int, to_internal_node_id: int, component_id: str) -> tuple[int,int]:
"""
Creates a control dependency relationship in Neo4j between the two nodes with Neo4j internal IDs given as parameters.
This relationship is an outgoing control edge from the node with internal ID from_internal_node_id
to the node with internal ID to_internal_node_id.
Parameters:
driver (Driver): the Neo4j driver
from_internal_node_id (int): the internal Neo4j ID of the first node
to_internal_node_id (int): the internal Neo4j ID of the second node
Returns:
tuple[int,int]: from_internal_node_id, to_internal_node_id
"""
query = """
MATCH (a), (b)
WHERE elementId(a) = $from_internal_node_id AND elementId(b) = $to_internal_node_id
MERGE (a)-[:CONTROL_DOWNLOAD {component_id: $component_id}]->(b)
RETURN elementId(a) AS id_1, elementId(b) AS id_2
"""
with driver.session() as session:
result = session.run(query, from_internal_node_id=from_internal_node_id,
to_internal_node_id=to_internal_node_id, component_id=component_id)
record = result.single()
return record["id_1"], record["id_2"]
def create_has_child_relationship(driver: Driver, parent_internal_node_id: int, child_internal_node_id: int) -> tuple[int,int]:
"""
Creates a "has child" relationship in Neo4j between the two nodes with Neo4j internal IDs given as parameters.
......@@ -204,6 +308,20 @@ def simplify_data_and_control_edges(driver: Driver):
"""
session.run(create_data_edges_query)
create_data_edges_query = """
MATCH (n1)-[:DATA_COMPRESS]->(n:Data), (n)-[:DATA_COMPRESS]->(n2)
WITH n, n1, n2, n.component_id AS component_id, n.data_id AS data_id
MERGE (n1)-[:DATA_COMPRESS {component_id: component_id, data_id: data_id}]->(n2)
"""
session.run(create_data_edges_query)
create_data_edges_query = """
MATCH (n1)-[:DATA_DOWNLOAD]->(n:Data), (n)-[:DATA_DOWNLOAD]->(n2)
WITH n, n1, n2, n.component_id AS component_id, n.data_id AS data_id
MERGE (n1)-[:DATA_DOWNLOAD {component_id: component_id, data_id: data_id}]->(n2)
"""
session.run(create_data_edges_query)
create_control_edges_query = """
MATCH (n1)-[:CONTROL]->(n:Data), (n)-[:DATA]->(n2)
WITH n, n1, n2, n.component_id AS component_id, n.data_id AS data_id
......
......@@ -94,8 +94,15 @@ def ensure_main_in_parameter_node(driver: Driver, node_id: str, prefixed_compone
tuple[int,str,str]: the Neoj4 internal ID of the parameter node, the parameter ID, the component ID
"""
component_id = clean_component_id(prefixed_component_id)
nice_id = Path(component_id).stem
if nice_id == "compress_pipeline":
query = """
MERGE (n:CompressInParameter {parameter_id: $node_id, component_id: $component_id})
RETURN elementId(n) AS node_internal_id, n.parameter_id AS id_property, n.component_id AS component_id_property
"""
else:
query = """
MERGE (n:MainInParameter {parameter_id: $node_id, component_id: $component_id})
MERGE (n:DownloadInParameter {parameter_id: $node_id, component_id: $component_id})
RETURN elementId(n) AS node_internal_id, n.parameter_id AS id_property, n.component_id AS component_id_property
"""
with driver.session() as session:
......@@ -146,8 +153,15 @@ def ensure_main_out_parameter_node(driver: Driver, node_id: str, prefixed_compon
tuple[int,str,str]: the Neoj4 internal ID of the parameter node, the parameter ID, the component ID
"""
component_id = clean_component_id(prefixed_component_id)
nice_id = Path(component_id).stem
if nice_id == "compress_pipeline":
query = """
MERGE (n:CompressOutParameter {parameter_id: $node_id, component_id: $component_id})
RETURN elementId(n) AS node_internal_id, n.parameter_id AS id_property, n.component_id AS component_id_property
"""
else:
query = """
MERGE (n:MainOutParameter {parameter_id: $node_id, component_id: $component_id})
MERGE (n:DownloadOutParameter {parameter_id: $node_id, component_id: $component_id})
RETURN elementId(n) AS node_internal_id, n.parameter_id AS id_property, n.component_id AS component_id_property
"""
with driver.session() as session:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment