From fcb9826f7f55c199e05e980094f9a4426b83c3e0 Mon Sep 17 00:00:00 2001 From: Chiara Liotta <liotta@astron.nl> Date: Fri, 21 Mar 2025 10:33:34 +0100 Subject: [PATCH] some fixes in graph creation and flow calculation --- graph_creation/cwl_parsing.py | 2 +- graph_creation/cwl_processing.py | 77 ++++++++++++------- graph_creation/repo_processing.py | 3 +- graph_creation/utils.py | 45 ++++++++--- .../metric_calculations/FlowCalculation.py | 10 +-- .../SubgraphPreprocessing.py | 13 ++-- neo4j_graph_queries/create_edge_queries.py | 2 - neo4j_graph_queries/create_node_queries.py | 34 ++------ 8 files changed, 106 insertions(+), 80 deletions(-) diff --git a/graph_creation/cwl_parsing.py b/graph_creation/cwl_parsing.py index 792d07d..3d43db9 100644 --- a/graph_creation/cwl_parsing.py +++ b/graph_creation/cwl_parsing.py @@ -53,4 +53,4 @@ def process_cwl_file(path: str) -> dict: # Add the file path to the dictionary for reference yaml_dict['path'] = path - return yaml_dict \ No newline at end of file + return yaml_dict \ No newline at end of file diff --git a/graph_creation/cwl_processing.py b/graph_creation/cwl_processing.py index 1695f35..f7b5677 100644 --- a/graph_creation/cwl_processing.py +++ b/graph_creation/cwl_processing.py @@ -1,11 +1,22 @@ from pathlib import Path import re from neo4j import Driver -from graph_creation.utils import extract_js_expression_dependencies, get_input_source, process_control_dependencies, process_in_param, process_parameter_source +from graph_creation.utils import extract_js_expression_dependencies, get_input_source, process_control_dependencies, process_in_param, process_output, process_parameter_source, process_step_lookup from neo4j_graph_queries.create_node_queries import ensure_git_node, ensure_in_parameter_node, ensure_out_parameter_node from neo4j_graph_queries.create_edge_queries import create_out_param_relationship, create_references_relationship from neo4j_graph_queries.utils import get_is_workflow + +def process_cwl_entity(driver, entity): + is_workflow = get_is_workflow(entity) + steps = None + if is_workflow: + steps = process_step_lookup(entity) + process_cwl_inputs(driver, entity) + process_cwl_outputs(driver, entity, steps) + if steps: + process_cwl_steps(driver, entity, steps) + def process_cwl_inputs(driver: Driver, cwl_entity: dict) -> None: """ Processes the inputs of a CWL entity, either as a list or a dictionary of inputs, @@ -29,8 +40,11 @@ def process_cwl_inputs(driver: Driver, cwl_entity: dict) -> None: elif isinstance(cwl_entity['inputs'], dict): # If 'inputs' is a dictionary, iterate over the keys (which are the input IDs) input_dict = cwl_entity['inputs'] - for key in input_dict.keys(): - process_in_param(driver, key, component_id, input_dict[key]['type'], cwl_entity['class']) + for key, value in input_dict.items(): + if isinstance(value, dict): + process_in_param(driver, key, component_id, value['type'], cwl_entity['class']) + else: + process_in_param(driver, key, component_id, value, cwl_entity['class']) def process_cwl_outputs(driver: Driver, cwl_entity: dict, step_lookup: dict) -> None: """ @@ -58,28 +72,29 @@ def process_cwl_outputs(driver: Driver, cwl_entity: dict, step_lookup: dict) -> None """ component_id = cwl_entity['path'] - for output in cwl_entity['outputs']: - if isinstance(output, dict): - # Create out-parameter node with the parameter ID as defined in the component - # and component ID equal to the path of the componet - out_param_node = ensure_out_parameter_node(driver, output['id'], component_id, output["type"], cwl_entity['class']) - out_param_node_internal_id = out_param_node[0] - - # If it's not a workflow, create a relationship between the component and the output parameter - is_worflow = get_is_workflow(cwl_entity) - if not is_worflow: - create_out_param_relationship(driver, component_id, out_param_node_internal_id, output['id']) - - # If the output has an 'outputSource', process the relationship(s) to the source(s) + outputs = cwl_entity['outputs'] + + if isinstance(outputs, list): + for output in cwl_entity['outputs']: + output_id = output['id'] + output_source = None if 'outputSource' in output: - # The output source can be a singular ID or a list of IDs - if isinstance(output['outputSource'], str): - source_id = output['outputSource'] - process_parameter_source(driver, out_param_node_internal_id, source_id, component_id, step_lookup) - elif isinstance(output['outputSource'], list): - for source_id in output['outputSource']: - process_parameter_source(driver, out_param_node_internal_id, source_id, component_id, step_lookup) - + output_source = output['outputSource'] + process_output(driver, output_id, output['type'], component_id, cwl_entity['class'], step_lookup, output_source) + else: + process_output(driver, output_id, output['type'], component_id, cwl_entity['class'], step_lookup, output_source) + elif isinstance(outputs, dict): + for output_id, details in outputs.items(): + if isinstance(details, str): + process_output(driver, output_id, details, component_id, cwl_entity['class'], step_lookup) + else: + if 'outputSource' in details: + output_source = details['outputSource'] + process_output(driver, output_id, output['type'], component_id, cwl_entity['class'], step_lookup, output_source) + else: + process_output(driver, output_id, details['type'], component_id, cwl_entity['class'], step_lookup) + + def process_cwl_steps(driver: Driver, cwl_entity: dict, step_lookup) -> None: """ Processes the steps of a CWL entity, creating necessary nodes and relationships @@ -110,7 +125,14 @@ def process_cwl_steps(driver: Driver, cwl_entity: dict, step_lookup) -> None: for step in cwl_entity['steps']: # Get the resolved path of the step from the step_lookup - step_path = step_lookup[step['id']] + step_path: str = step_lookup[step['id']] + + if not isinstance(step['run'], str): + run_dict = step['run'] + run_dict['path'] = step_path + print(f"processing {step_path}") + process_cwl_entity(driver, run_dict) + continue # Process the list of inputs of the step for input in step['in']: @@ -137,8 +159,9 @@ def process_cwl_steps(driver: Driver, cwl_entity: dict, step_lookup) -> None: if ref[0] == "parameter": # Retrieve the source of the referenced input parameter source = get_input_source(step['in'], ref[1]) - else: - # The reference already mentions the source (output of a step) + + if not source: + # The reference already mentions the source (output of a step or workflow input) source = ref[1] if source: # Create control dependencies from the in-parameters of the step to the source of the reference diff --git a/graph_creation/repo_processing.py b/graph_creation/repo_processing.py index 8555ccd..ef8ead4 100644 --- a/graph_creation/repo_processing.py +++ b/graph_creation/repo_processing.py @@ -4,6 +4,7 @@ from graph_creation.docker_parsing import parse_all_dockerfiles from graph_creation.utils import process_step_lookup from graph_creation.cwl_processing import process_cwl_commandline, process_cwl_inputs, process_cwl_outputs, process_cwl_steps from neo4j_graph_queries.utils import get_is_workflow +import pprint def process_repos(repo_list: list[str], driver: Driver) -> None: """ @@ -40,4 +41,4 @@ def process_repos(repo_list: list[str], driver: Driver) -> None: # elif entity['class'] == 'ExpressionTool': # process_cwl_expression(driver, entity) # elif entity['class'] == 'CommandLineTool': - # process_cwl_commandline(driver, entity, links) + # process_cwl_commandline(driver, entity, links) \ No newline at end of file diff --git a/graph_creation/utils.py b/graph_creation/utils.py index c4d69e2..30d9c23 100644 --- a/graph_creation/utils.py +++ b/graph_creation/utils.py @@ -2,7 +2,7 @@ from pathlib import Path from neo4j import Driver import re from neo4j_graph_queries.create_node_queries import ensure_component_node, ensure_in_parameter_node, ensure_out_parameter_node -from neo4j_graph_queries.create_edge_queries import create_control_relationship, create_data_relationship, create_in_param_relationship +from neo4j_graph_queries.create_edge_queries import create_control_relationship, create_data_relationship, create_in_param_relationship, create_out_param_relationship from neo4j_graph_queries.processing_queries import get_all_in_parameter_nodes_of_entity GITLAB_ASTRON ='https://git.astron.nl' @@ -19,14 +19,20 @@ def process_step_lookup(cwl_entity: dict) -> dict[str, str]: dict: A dictionary where each key is the ID of the step in the context of the workflow, and the value is the resolved file path of the step """ step_lookup = {} + # Retrieve the directory containing the workflow file + workflow_folder = Path(cwl_entity['path']).parent + while ".cwl" in str(workflow_folder): + workflow_folder = workflow_folder.parent + for step in cwl_entity['steps']: - # Retrieve the directory containing the workflow file - workflow_folder = Path(cwl_entity['path']).parent # Resolve the full path of the step file by combining the workflow folder and the step's 'run' path - full_step_path = workflow_folder / Path(step['run']) + if isinstance(step['run'], str): + full_step_path = workflow_folder / Path(step['run']) + step_path = resolve_relative_path(full_step_path) + else: + step_path = Path(cwl_entity['path']) / step['id'] # Resolve the path (deal with "./" and "../") - step_path = str(resolve_relative_path(full_step_path)) - step_lookup[step['id']] = step_path + step_lookup[step['id']] = str(step_path) return step_lookup def process_in_param(driver: Driver, param_id: str, component_id: str, param_type: str, component_type: str) -> None: @@ -43,10 +49,8 @@ def process_in_param(driver: Driver, param_id: str, component_id: str, param_typ Returns: None """ - param_node = ensure_in_parameter_node(driver, param_id, component_id, param_type, component_type) if component_type != "Workflow": - ensure_component_node(driver, component_id) create_in_param_relationship(driver, component_id, param_node[0], param_node[1]) def process_parameter_source(driver: Driver, param_node_internal_id: int, source_id: str, workflow_id: str, step_lookup: dict, step_id: str = "") -> None: @@ -67,7 +71,7 @@ def process_parameter_source(driver: Driver, param_node_internal_id: int, source source_param_node = get_source_node(driver, source_id, workflow_id, step_lookup) # Create a relationship between the parameter node and its source - create_data_relationship(driver, source_param_node, param_node_internal_id,workflow_id, source_id, step_id) + create_data_relationship(driver, source_param_node, param_node_internal_id, workflow_id, source_id, step_id) def get_source_node(driver: Driver, source_id: str, workflow_id: str, step_lookup: dict) -> int: @@ -189,3 +193,26 @@ def get_input_source(inputs: list[dict], input_id: str) -> str | None: if inp["id"] == input_id: return inp.get("source") # returns None if 'source' doesn't exist return None # returns None if input_id is not found + +def process_output(driver, output_id, output_type, component_id, component_type, step_lookup, output_source = None): + # Create out-parameter node with the parameter ID as defined in the component + # and component ID equal to the path of the componet + out_param_node = ensure_out_parameter_node(driver, output_id, component_id, output_type, component_type) + out_param_node_internal_id = out_param_node[0] + + # If it's not a workflow, create a relationship between the component and the output parameter + if component_type != "Workflow": + create_out_param_relationship(driver, component_id, out_param_node_internal_id, output_id) + + # If the output has an 'outputSource', process the relationship(s) to the source(s) + if output_source: + # The output source can be a singular ID or a list of IDs + if isinstance(output_source, str): + source_id = output_source + print(source_id) + process_parameter_source(driver, out_param_node_internal_id, source_id, component_id, step_lookup) + elif isinstance(output_source, list): + for source_id in output_source: + print(source_id) + process_parameter_source(driver, out_param_node_internal_id, source_id, component_id, step_lookup) + diff --git a/graph_traversal/metric_calculations/FlowCalculation.py b/graph_traversal/metric_calculations/FlowCalculation.py index 58f7754..bcfa305 100644 --- a/graph_traversal/metric_calculations/FlowCalculation.py +++ b/graph_traversal/metric_calculations/FlowCalculation.py @@ -62,9 +62,8 @@ class FlowCalculation: paths: dict[str, dict[str, list]] = {} workflow_ids = sorted_components for workflow in workflow_ids: - if "example" not in workflow: - print(f"Preprocessing: {workflow}") - self.bsf_traverse_paths_change_impact(session, workflow, bookkeeping, paths) + print(f"Preprocessing: {workflow}") + self.bsf_traverse_paths_change_impact(session, workflow, bookkeeping, paths) with open("flow_paths.json", "w") as json_file: json.dump(paths, json_file, indent=4) @@ -201,8 +200,9 @@ class FlowCalculation: self.process_direct_indirect_flow_of_node_id(node_id, component_id, outer_workflows, component_stack, step_stack, bookkeeping, paths, direct=True) - # Increment depth as we move deeper into the traversal - depth = depth + 1 + # Increment depth as we move deeper into the traversal, unless we just entered a workflow + if component_type != "Workflow": + depth = depth + 1 # If the stack is empty, return early if not component_stack: continue diff --git a/graph_traversal/subgraph_preprocessing/SubgraphPreprocessing.py b/graph_traversal/subgraph_preprocessing/SubgraphPreprocessing.py index 8c3931d..a9fc4fc 100644 --- a/graph_traversal/subgraph_preprocessing/SubgraphPreprocessing.py +++ b/graph_traversal/subgraph_preprocessing/SubgraphPreprocessing.py @@ -29,14 +29,15 @@ class SubgraphPreprocessing: for workflow in outer_workflow_ids: print(f"Preprocessing: {workflow}") self.traverse_graph_process_paths(session, workflow, bookkeeping) - control_pairs = get_nodes_with_control_edges(session) - for pair in control_pairs: - target_id = pair["targetId"] - edge_component_id = pair["componentId"] - edge_id = pair["edgeId"] + control_tuples = get_nodes_with_control_edges(session) + for tuple in control_tuples: + target_id = tuple["targetId"] + edge_component_id = tuple["componentId"] + edge_id = tuple["edgeId"] result = get_workflow_list_of_data_edges_from_node(session, target_id, edge_component_id) workflow_lists = [record["workflow_list"] for record in result] - update_workflow_list_of_edge(session, edge_id, workflow_lists[0]) + if len(workflow_lists) > 0: + update_workflow_list_of_edge(session, edge_id, workflow_lists[0]) def traverse_graph_process_paths(self, session: Session, component_id: str, bookkeeping): diff --git a/neo4j_graph_queries/create_edge_queries.py b/neo4j_graph_queries/create_edge_queries.py index 4e8f000..05552a7 100644 --- a/neo4j_graph_queries/create_edge_queries.py +++ b/neo4j_graph_queries/create_edge_queries.py @@ -41,8 +41,6 @@ def create_out_param_relationship(driver: Driver, prefixed_component_id: str, pa """ component_id = clean_component_id(prefixed_component_id) component_node_id = ensure_component_node(driver, component_id)[0] - print(parameter_internal_id) - print(component_node_id) return create_data_relationship(driver, component_node_id, parameter_internal_id, component_id, parameter_id) diff --git a/neo4j_graph_queries/create_node_queries.py b/neo4j_graph_queries/create_node_queries.py index 7847346..c762122 100644 --- a/neo4j_graph_queries/create_node_queries.py +++ b/neo4j_graph_queries/create_node_queries.py @@ -1,4 +1,5 @@ import json +from pathlib import Path from neo4j import Driver from neo4j_graph_queries.utils import clean_component_id @@ -16,12 +17,14 @@ def ensure_component_node(driver: Driver, prefixed_component_id: str) -> tuple[i tuple[int,str]: the Neoj4 internal ID of the component node, the component ID of the component """ component_id = clean_component_id(prefixed_component_id) + nice_id = Path(component_id).stem query = """ MERGE (c:Component {component_id: $component_id}) + SET c.nice_id = $nice_id RETURN elementId(c) AS node_internal_id, c.component_id AS component_id """ with driver.session() as session: - result = session.run(query, component_id=component_id) + result = session.run(query, component_id=component_id, nice_id=nice_id) record = result.single() return record["node_internal_id"], record["component_id"] @@ -125,31 +128,4 @@ def ensure_out_parameter_node(driver: Driver, parameter_id: str, prefixed_compon new_param_type = " OR ".join(new_list) result = session.run(query, parameter_id=parameter_id, component_id=component_id, param_type=new_param_type, component_type=component_type) record = result.single() - return record["node_id"], record["parameter_id"], record["component_id"] - -def ensure_data_node(driver: Driver, node_id: str, prefixed_component_id: str) -> tuple[int,str,str]: - """ - Ensures that there exists a data node with ID node_id - associated with the component in the file with local path prefixed_component_id. - The ID of the component can be given based on the local relative path, so it is cleaned - before querying Neo4j. - - Parameters: - driver (Driver): the Neo4j driver - node_id (str): the ID of the data - prefixed_component_id (str): the local relative path of the component - - Returns: - tuple[int,str,str]: the Neoj4 internal ID of the data node, the data ID, the component ID - """ - component_id = clean_component_id(prefixed_component_id) - query = """ - MERGE (n:Data {data_id: $node_id, component_id: $component_id}) - RETURN elementId(n) AS node_internal_id, n.data_id AS id_property, n.component_id AS component_id_property - """ - with driver.session() as session: - result = session.run(query, node_id=node_id, component_id=component_id) - record = result.single() - return record["node_internal_id"], record["id_property"], record["component_id_property"] - - \ No newline at end of file + return record["node_id"], record["parameter_id"], record["component_id"] \ No newline at end of file -- GitLab