diff --git a/graph_creation/cwl_processing.py b/graph_creation/cwl_processing.py index 8c890b3de4f9e84dd77c317978d0e8b27fbd6c07..34c4f490f1cce09e5367b13118108168fb46ccc6 100644 --- a/graph_creation/cwl_processing.py +++ b/graph_creation/cwl_processing.py @@ -1,8 +1,8 @@ from neo4j import Driver from graph_creation.cst_processing import traverse_and_create, traverse_when_statement_extract_dependencies -from graph_creation.utils import create_input_nodes_and_relationships, create_main_input_nodes_and_relationships, process_source_relationship, resolve_relative_path -from neo4j_queries.node_queries import ensure_component_node, ensure_data_node, ensure_in_parameter_node, ensure_main_out_parameter_node, ensure_out_parameter_node -from neo4j_queries.edge_queries import create_control_relationship, create_data_relationship, create_main_out_param_relationship, create_out_param_relationship +from graph_creation.utils import create_input_nodes_and_relationships, create_main_input_nodes_and_relationships, create_right_control_relationship, create_right_data_relationship, process_source_relationship, resolve_relative_path +from neo4j_queries.node_queries import ensure_component_node, ensure_data_node, ensure_in_parameter_node, ensure_main_component_node, ensure_main_in_parameter_node, ensure_main_out_parameter_node, ensure_out_parameter_node +from neo4j_queries.edge_queries import create_data_relationship, create_main_out_param_relationship, create_out_param_relationship from pathlib import Path from parsers.javascript_parsing import parse_javascript_expression_string, parse_javascript_string @@ -119,7 +119,7 @@ def process_cwl_main_outputs(driver: Driver, cwl_entity: dict) -> None: param_node_internal_id = param_node[0] # Create out-parameter node with the parameter ID as defined in the component # and component ID equal to the path of the componet - create_main_out_param_relationship(driver, component_id, param_node_internal_id) + # create_main_out_param_relationship(driver, component_id, param_node_internal_id) # Create a data node with component ID of the component and data ID equal to output source defined in the component # and a data edge from the out-parameter node to the data node if 'outputSource' in output: @@ -169,16 +169,21 @@ def process_cwl_steps(driver: Driver, cwl_entity: dict) -> None: step_path = str(resolve_relative_path(full_step_path)) # Create the step component node with ID equal to the step - s_node = ensure_component_node(driver, step_path) - s_node_internal_id = s_node[0] + if "compress_pipeline.cwl" not in step_path: + s_node = ensure_component_node(driver, step_path) + s_node_internal_id = s_node[0] # Process the list of inputs of the step for input in step['in']: # Create in-parameter node with ID as defined in the component and component ID equal to the path of the step - param_node = ensure_in_parameter_node(driver, input['id'], step_path) + if "compress_pipeline.cwl" in step_path: + param_node = ensure_main_in_parameter_node(driver, input['id'], step_path) + else: + param_node = ensure_in_parameter_node(driver, input['id'], step_path) param_node_internal_id = param_node[0] # Create a data edge from the step component node to the in-parameter node - create_data_relationship(driver, s_node_internal_id, param_node_internal_id) + if "compress_pipeline.cwl" not in step_path: + create_data_relationship(driver, s_node_internal_id, param_node_internal_id) # Inputs can have one or multiple data sources (data nodes) # A data edge is drawn from the in-parameter node to the data node of the source @@ -200,14 +205,17 @@ def process_cwl_steps(driver: Driver, cwl_entity: dict) -> None: for ref in when_refs: ref_id = ref[1] if ref[0] == "parameter": - input_data = ensure_in_parameter_node(driver, ref_id, step_path)[0] + if "compress_pipeline.cwl" in step_path: + input_data = ensure_main_in_parameter_node(driver, ref_id, step_path)[0] + else: + input_data = ensure_in_parameter_node(driver, ref_id, step_path)[0] nodes.append(input_data) elif ref[0] == "step_output": step_output = ensure_data_node(driver, ref_id, cwl_entity['path'])[0] nodes.append(step_output) for node in nodes: - create_control_relationship(driver, s_node_internal_id, node, cwl_entity['path']) + create_right_control_relationship(driver, s_node_internal_id, node, cwl_entity['path']) # Process the list of outputs of the step for output in step['out']: @@ -217,16 +225,21 @@ def process_cwl_steps(driver: Driver, cwl_entity: dict) -> None: else: output_id = output # Create out-parameter node with ID as defined in the component and component ID equal to the path of the step - param_node = ensure_out_parameter_node(driver, output_id, step_path) + if "compress_pipeline.cwl" in step_path: + param_node = ensure_main_out_parameter_node(driver, output_id, step_path) + else: + param_node = ensure_out_parameter_node(driver, output_id, step_path) param_node_internal_id = param_node[0] # Create a data edge from out-parameter node to the step component node - create_data_relationship(driver, param_node_internal_id, s_node_internal_id) + if "compress_pipeline.cwl" not in step_path: + create_data_relationship(driver, param_node_internal_id, s_node_internal_id) # Create data node with id equal to step_id/output_id and component ID equal to the path of the outer workflow outer_output_id = f"{step['id']}/{output_id}" data_node = ensure_data_node(driver, outer_output_id, cwl_entity['path']) data_node_internal_id = data_node[0] + nice_id = Path(cwl_entity['path']).stem # Create a data edge from the data node to the out-parameter node - create_data_relationship(driver, data_node_internal_id, param_node_internal_id) + create_right_data_relationship(driver, data_node_internal_id, param_node_internal_id, nice_id) def process_cwl_expression(driver: Driver, entity: dict) -> None: expression = entity['expression'] diff --git a/graph_creation/repo_processing.py b/graph_creation/repo_processing.py index bfe8214afedbd0e898e174d5579cfaabaa2bd688..d430708c8ab1a9abbd1385932f97e48fa16960d9 100644 --- a/graph_creation/repo_processing.py +++ b/graph_creation/repo_processing.py @@ -21,7 +21,7 @@ def process_repos(repo_list: list[str], driver: Driver) -> None: pipelines = ["compress_pipeline.cwl"] if any(pipeline in entity['path'] for pipeline in pipelines): component_id = entity['path'] - ensure_main_component_node(driver, component_id) + # ensure_main_component_node(driver, component_id) process_cwl_main_inputs(driver, entity) process_cwl_main_outputs(driver, entity) if entity['class'] == 'Workflow': diff --git a/graph_creation/utils.py b/graph_creation/utils.py index 4c75442d7d6ec08b09695d3fb0dbe16156362502..f06046d03fd5cb72145c3cbbd205775d63a404f4 100644 --- a/graph_creation/utils.py +++ b/graph_creation/utils.py @@ -1,7 +1,26 @@ from pathlib import Path from neo4j import Driver from neo4j_queries.node_queries import ensure_data_node, ensure_in_parameter_node, ensure_main_in_parameter_node -from neo4j_queries.edge_queries import create_data_relationship, create_in_param_relationship, create_main_in_param_relationship +from neo4j_queries.edge_queries import create_compress_control_relationship, create_compress_data_relationship, create_control_relationship, create_data_relationship, create_download_control_relationship, create_download_data_relationship, create_in_param_relationship, create_main_in_param_relationship + + +def create_right_data_relationship(driver: Driver, node_internal_id_1: int, node_internal_id_2: int, nice_id: str) -> None: + if "compress_pipeline" == nice_id: + create_compress_data_relationship(driver, node_internal_id_1, node_internal_id_2) + elif "download_and_compress_pipeline" == nice_id: + create_download_data_relationship(driver, node_internal_id_1, node_internal_id_2) + else: + create_data_relationship(driver, node_internal_id_1, node_internal_id_2) + +def create_right_control_relationship(driver: Driver, node_internal_id_1: int, node_internal_id_2: int, component_id: str) -> None: + nice_id = Path(component_id).stem + if "compress_pipeline" == nice_id: + create_compress_control_relationship(driver, node_internal_id_1, node_internal_id_2, component_id) + elif "download_and_compress_pipeline" == nice_id: + create_download_control_relationship(driver, node_internal_id_1, node_internal_id_2, component_id) + else: + create_control_relationship(driver, node_internal_id_1, node_internal_id_2, component_id) + def create_input_nodes_and_relationships(driver: Driver, input_id: str, component_id: str) -> None: """ @@ -17,6 +36,7 @@ def create_input_nodes_and_relationships(driver: Driver, input_id: str, componen input_id (str): the ID of the input as defined in the CWL component component_id (str): the unique ID of the CWL component (its path) """ + nice_id = Path(component_id).stem # Create in-parameter with the parameter ID as defined in the component and component ID equal to the path of the componet param_node = ensure_in_parameter_node(driver, input_id, component_id) param_node_internal_id = param_node[0] @@ -26,7 +46,7 @@ def create_input_nodes_and_relationships(driver: Driver, input_id: str, componen data_node = ensure_data_node(driver, input_id, component_id) data_node_internal_id = data_node[0] # Create a data edge from the data node to the the in-parameter node - create_data_relationship(driver, data_node_internal_id, param_node_internal_id) + create_right_data_relationship(driver, data_node_internal_id, param_node_internal_id, nice_id) def create_main_input_nodes_and_relationships(driver: Driver, input_id: str, component_id: str) -> None: """ @@ -42,16 +62,17 @@ def create_main_input_nodes_and_relationships(driver: Driver, input_id: str, com input_id (str): the ID of the input as defined in the CWL component component_id (str): the unique ID of the CWL component (its path) """ + nice_id = Path(component_id).stem # Create in-parameter with the parameter ID as defined in the component and component ID equal to the path of the componet param_node = ensure_main_in_parameter_node(driver, input_id, component_id) param_node_internal_id = param_node[0] # Create a data edge from the component node to the in-parameter node - create_main_in_param_relationship(driver, component_id, param_node_internal_id) + # create_main_in_param_relationship(driver, component_id, param_node_internal_id) # Create a data node with component ID of the component and data ID equal to the parameter ID data_node = ensure_data_node(driver, input_id, component_id) data_node_internal_id = data_node[0] # Create a data edge from the data node to the the in-parameter node - create_data_relationship(driver, data_node_internal_id, param_node_internal_id) + create_right_data_relationship(driver, data_node_internal_id, param_node_internal_id, nice_id) def process_source_relationship(driver: Driver, source_id: str, component_id: str, param_node_internal_id: int) -> None: """ @@ -67,9 +88,11 @@ def process_source_relationship(driver: Driver, source_id: str, component_id: st component_id (str): the unique ID of the CWL component (its path) param_node_internal_id (int): the unique ID of the parameter node as defined internally by Neo4j """ + nice_id = Path(component_id).stem data_node = ensure_data_node(driver, source_id, component_id) data_node_internal_id = data_node[0] - create_data_relationship(driver, param_node_internal_id, data_node_internal_id) + create_right_data_relationship(driver, param_node_internal_id, data_node_internal_id, nice_id) + def resolve_relative_path(path: Path)-> Path: """ diff --git a/neo4j_queries/edge_queries.py b/neo4j_queries/edge_queries.py index 4ae474f659b9c6a380ed7b14ddfae8720e3bcc9e..cd38faad739bd5617e73f657cd859d4ee4b6e81e 100644 --- a/neo4j_queries/edge_queries.py +++ b/neo4j_queries/edge_queries.py @@ -143,6 +143,58 @@ def create_data_relationship(driver: Driver, from_internal_node_id: int, to_inte record = result.single() return record["id_1"], record["id_2"] +def create_compress_data_relationship(driver: Driver, from_internal_node_id: int, to_internal_node_id: int) -> tuple[int,int]: + """ + Creates a data dependency relationship in Neo4j between the two nodes with Neo4j internal IDs given as parameters. + This relationship is an outgoing data edge from the node with internal ID from_internal_node_id + to the node with internal ID to_internal_node_id. + + Parameters: + driver (Driver): the Neo4j driver + from_internal_node_id (int): the internal Neo4j ID of the first node + to_internal_node_id (int): the internal Neo4j ID of the second node + + Returns: + tuple[int,int]: from_internal_node_id, to_internal_node_id + """ + query = """ + MATCH (a), (b) + WHERE elementId(a) = $from_internal_node_id AND elementId(b) = $to_internal_node_id + MERGE (a)-[:DATA_COMPRESS]->(b) + RETURN elementId(a) AS id_1, elementId(b) AS id_2 + """ + with driver.session() as session: + result = session.run(query, from_internal_node_id=from_internal_node_id, + to_internal_node_id=to_internal_node_id) + record = result.single() + return record["id_1"], record["id_2"] + +def create_download_data_relationship(driver: Driver, from_internal_node_id: int, to_internal_node_id: int) -> tuple[int,int]: + """ + Creates a data dependency relationship in Neo4j between the two nodes with Neo4j internal IDs given as parameters. + This relationship is an outgoing data edge from the node with internal ID from_internal_node_id + to the node with internal ID to_internal_node_id. + + Parameters: + driver (Driver): the Neo4j driver + from_internal_node_id (int): the internal Neo4j ID of the first node + to_internal_node_id (int): the internal Neo4j ID of the second node + + Returns: + tuple[int,int]: from_internal_node_id, to_internal_node_id + """ + query = """ + MATCH (a), (b) + WHERE elementId(a) = $from_internal_node_id AND elementId(b) = $to_internal_node_id + MERGE (a)-[:DATA_DOWNLOAD]->(b) + RETURN elementId(a) AS id_1, elementId(b) AS id_2 + """ + with driver.session() as session: + result = session.run(query, from_internal_node_id=from_internal_node_id, + to_internal_node_id=to_internal_node_id) + record = result.single() + return record["id_1"], record["id_2"] + def create_control_relationship(driver: Driver, from_internal_node_id: int, to_internal_node_id: int, component_id: str) -> tuple[int,int]: """ @@ -170,6 +222,58 @@ def create_control_relationship(driver: Driver, from_internal_node_id: int, to_i record = result.single() return record["id_1"], record["id_2"] +def create_compress_control_relationship(driver: Driver, from_internal_node_id: int, to_internal_node_id: int, component_id: str) -> tuple[int,int]: + """ + Creates a control dependency relationship in Neo4j between the two nodes with Neo4j internal IDs given as parameters. + This relationship is an outgoing control edge from the node with internal ID from_internal_node_id + to the node with internal ID to_internal_node_id. + + Parameters: + driver (Driver): the Neo4j driver + from_internal_node_id (int): the internal Neo4j ID of the first node + to_internal_node_id (int): the internal Neo4j ID of the second node + + Returns: + tuple[int,int]: from_internal_node_id, to_internal_node_id + """ + query = """ + MATCH (a), (b) + WHERE elementId(a) = $from_internal_node_id AND elementId(b) = $to_internal_node_id + MERGE (a)-[:CONTROL_COMPRESS {component_id: $component_id}]->(b) + RETURN elementId(a) AS id_1, elementId(b) AS id_2 + """ + with driver.session() as session: + result = session.run(query, from_internal_node_id=from_internal_node_id, + to_internal_node_id=to_internal_node_id, component_id=component_id) + record = result.single() + return record["id_1"], record["id_2"] + +def create_download_control_relationship(driver: Driver, from_internal_node_id: int, to_internal_node_id: int, component_id: str) -> tuple[int,int]: + """ + Creates a control dependency relationship in Neo4j between the two nodes with Neo4j internal IDs given as parameters. + This relationship is an outgoing control edge from the node with internal ID from_internal_node_id + to the node with internal ID to_internal_node_id. + + Parameters: + driver (Driver): the Neo4j driver + from_internal_node_id (int): the internal Neo4j ID of the first node + to_internal_node_id (int): the internal Neo4j ID of the second node + + Returns: + tuple[int,int]: from_internal_node_id, to_internal_node_id + """ + query = """ + MATCH (a), (b) + WHERE elementId(a) = $from_internal_node_id AND elementId(b) = $to_internal_node_id + MERGE (a)-[:CONTROL_DOWNLOAD {component_id: $component_id}]->(b) + RETURN elementId(a) AS id_1, elementId(b) AS id_2 + """ + with driver.session() as session: + result = session.run(query, from_internal_node_id=from_internal_node_id, + to_internal_node_id=to_internal_node_id, component_id=component_id) + record = result.single() + return record["id_1"], record["id_2"] + def create_has_child_relationship(driver: Driver, parent_internal_node_id: int, child_internal_node_id: int) -> tuple[int,int]: """ Creates a "has child" relationship in Neo4j between the two nodes with Neo4j internal IDs given as parameters. @@ -204,6 +308,20 @@ def simplify_data_and_control_edges(driver: Driver): """ session.run(create_data_edges_query) + create_data_edges_query = """ + MATCH (n1)-[:DATA_COMPRESS]->(n:Data), (n)-[:DATA_COMPRESS]->(n2) + WITH n, n1, n2, n.component_id AS component_id, n.data_id AS data_id + MERGE (n1)-[:DATA_COMPRESS {component_id: component_id, data_id: data_id}]->(n2) + """ + session.run(create_data_edges_query) + + create_data_edges_query = """ + MATCH (n1)-[:DATA_DOWNLOAD]->(n:Data), (n)-[:DATA_DOWNLOAD]->(n2) + WITH n, n1, n2, n.component_id AS component_id, n.data_id AS data_id + MERGE (n1)-[:DATA_DOWNLOAD {component_id: component_id, data_id: data_id}]->(n2) + """ + session.run(create_data_edges_query) + create_control_edges_query = """ MATCH (n1)-[:CONTROL]->(n:Data), (n)-[:DATA]->(n2) WITH n, n1, n2, n.component_id AS component_id, n.data_id AS data_id diff --git a/neo4j_queries/node_queries.py b/neo4j_queries/node_queries.py index fb2d02699daa51f151f69a7905229ab1c6d3a3f9..919f7f87fe4793bdefdbc11afe87e2441047846d 100644 --- a/neo4j_queries/node_queries.py +++ b/neo4j_queries/node_queries.py @@ -94,10 +94,17 @@ def ensure_main_in_parameter_node(driver: Driver, node_id: str, prefixed_compone tuple[int,str,str]: the Neoj4 internal ID of the parameter node, the parameter ID, the component ID """ component_id = clean_component_id(prefixed_component_id) - query = """ - MERGE (n:MainInParameter {parameter_id: $node_id, component_id: $component_id}) - RETURN elementId(n) AS node_internal_id, n.parameter_id AS id_property, n.component_id AS component_id_property - """ + nice_id = Path(component_id).stem + if nice_id == "compress_pipeline": + query = """ + MERGE (n:CompressInParameter {parameter_id: $node_id, component_id: $component_id}) + RETURN elementId(n) AS node_internal_id, n.parameter_id AS id_property, n.component_id AS component_id_property + """ + else: + query = """ + MERGE (n:DownloadInParameter {parameter_id: $node_id, component_id: $component_id}) + RETURN elementId(n) AS node_internal_id, n.parameter_id AS id_property, n.component_id AS component_id_property + """ with driver.session() as session: result = session.run(query, node_id=node_id, component_id=component_id) record = result.single() @@ -146,10 +153,17 @@ def ensure_main_out_parameter_node(driver: Driver, node_id: str, prefixed_compon tuple[int,str,str]: the Neoj4 internal ID of the parameter node, the parameter ID, the component ID """ component_id = clean_component_id(prefixed_component_id) - query = """ - MERGE (n:MainOutParameter {parameter_id: $node_id, component_id: $component_id}) - RETURN elementId(n) AS node_internal_id, n.parameter_id AS id_property, n.component_id AS component_id_property - """ + nice_id = Path(component_id).stem + if nice_id == "compress_pipeline": + query = """ + MERGE (n:CompressOutParameter {parameter_id: $node_id, component_id: $component_id}) + RETURN elementId(n) AS node_internal_id, n.parameter_id AS id_property, n.component_id AS component_id_property + """ + else: + query = """ + MERGE (n:DownloadOutParameter {parameter_id: $node_id, component_id: $component_id}) + RETURN elementId(n) AS node_internal_id, n.parameter_id AS id_property, n.component_id AS component_id_property + """ with driver.session() as session: result = session.run(query, node_id=node_id, component_id=component_id) record = result.single()