From b218400b5edbd2e3a63ce05ddf5738d633bfcf40 Mon Sep 17 00:00:00 2001 From: Chiara Liotta <liotta@astron.nl> Date: Thu, 28 Nov 2024 12:53:52 +0100 Subject: [PATCH] add comments and docs --- graph_creation/cwl_parsing.py | 16 +++- graph_creation/cwl_processing.py | 126 ++++++++++++++++++++++-------- graph_creation/repo_processing.py | 11 ++- graph_creation/utils.py | 36 +++++++-- main.py | 13 ++- neo4j_queries/edge_queries.py | 43 ++++++++++ neo4j_queries/node_queries.py | 52 ++++++++++-- neo4j_queries/utils.py | 9 +++ 8 files changed, 257 insertions(+), 49 deletions(-) diff --git a/graph_creation/cwl_parsing.py b/graph_creation/cwl_parsing.py index 9417087..53dc061 100644 --- a/graph_creation/cwl_parsing.py +++ b/graph_creation/cwl_parsing.py @@ -3,12 +3,26 @@ from cwl_utils.parser import save from cwl_utils.parser.cwl_v1_2_utils import load_inputfile def get_cwl_from_repo(repo_path: str) -> list[dict]: + """ + Given the path of a local repository, it processes all the CWL files in the repository. + Each CWL file is parsed into a dictionary using the cwl_utils library. + The path is saved using the key 'path' with value equal to the relative path of the CWL file. 
+ + Parameters: + repo_path (str): the path of the local repository + + Returns: + list[dict]: a list of dictionaries, each dictionary is a parsed CWL file + """ cwl_entities = [] pathlist = Path(repo_path).glob('**/*.cwl') for path in pathlist: - path_in_str = str(path) + path_in_str = str(path) + # Parse CWL file cwl_obj = load_inputfile(path_in_str) + # Save parsed file into a dictionary saved_obj = save(cwl_obj, relative_uris=True) + # Save the path of the CWL file saved_obj['path'] = path_in_str cwl_entities.append(saved_obj) diff --git a/graph_creation/cwl_processing.py b/graph_creation/cwl_processing.py index fe231ab..2446396 100644 --- a/graph_creation/cwl_processing.py +++ b/graph_creation/cwl_processing.py @@ -4,70 +4,134 @@ from neo4j_queries.node_queries import ensure_component_node, ensure_data_node, from neo4j_queries.edge_queries import create_data_relationship, create_out_param_relationship from pathlib import Path +# TODO: deal with inputBindings def process_cwl_inputs(driver: Driver, cwl_entity: dict) -> None: + """ + Processes the inputs of a CWL component (Workflow, CommandLineTool, or ExpressionTool) + For each input the following nodes and edges are created: + - an in-parameter node with the parameter ID as defined in the component and component ID equal to the path of the component + - a data node with component ID of the component and data ID equal to the parameter ID + - a data edge from the component node to the in-parameter node + - a data edge from the data node to the in-parameter node + + Parameters: + driver (Driver): the driver used to connect to Neo4j + cwl_entity (dict): the dictionary containing the parsed contents of the CWL component + """ component_id = cwl_entity['path'] + # Inputs can be defined as a list or a dictionary if type(cwl_entity['inputs']) == list: + # List of dictionaries + # each element is identifiable via the key 'id' for input in cwl_entity['inputs']: if type(input) == dict: 
create_input_nodes_and_relationships(driver, input['id'], component_id) elif type(cwl_entity['inputs']) == dict: + # Dictionary where each key is the ID of the input + # the value is a dictionary containing other properties for key in cwl_entity['inputs'].keys(): create_input_nodes_and_relationships(driver, key, component_id) +# TODO: deal with outputBindings def process_cwl_outputs(driver: Driver, cwl_entity: dict) -> None: + """ + Processes the outputs of a CWL component (Workflow, CommandLineTool, or ExpressionTool) + For each output the following nodes and edges are created: + - an out-parameter node with the parameter ID as defined in the component and component ID equal to the path of the component + - a data node with component ID of the component and data ID equal to the output source defined in the component + - a data edge from the out-parameter node to the component node + - a data edge from the out-parameter node to the data node + + Parameters: + driver (Driver): the driver used to connect to Neo4j + cwl_entity (dict): the dictionary containing the parsed contents of the CWL component + """ component_id = cwl_entity['path'] for output in cwl_entity['outputs']: if type(output) == dict: - # Create out-parameter node o_node with id = o.id and component_id = c_node.id + # Create out-parameter node with the parameter ID as defined in the component + # and component ID equal to the path of the component param_node = ensure_parameter_node(driver, output['id'], component_id, 'out') - # Create a directed data edge from o_node to c_node param_node_internal_id = param_node[0] + # Create a data edge from the out-parameter node + # to the component node identified by the path of the component create_out_param_relationship(driver, component_id, param_node_internal_id) + # Create a data node with component ID of the component and data ID equal to the output source defined in the component + # and a data edge from the out-parameter node to the data node if 
'outputSource' in output: + # the output source can be a singular ID or a list of IDs if type(output['outputSource']) == str: process_source_relationship(driver, output['outputSource'], component_id, param_node_internal_id) elif type(output['outputSource']) == list: - for o in output['outputSource']: - process_source_relationship(driver, o, component_id, param_node_internal_id) - -def process_cwl_steps(driver: Driver, cwl_entity: dict, repo: str) -> None: + for source_id in output['outputSource']: + process_source_relationship(driver, source_id, component_id, param_node_internal_id) + +def process_cwl_steps(driver: Driver, cwl_entity: dict, repo_path: str) -> None: + """ + Processes the steps of a CWL Workflow component( which we will refer to as outer workflow component). + A step can be a Workflow, CommandLineTool or ExpressionTool. + For each step, a component node is created with component ID equal to the path of the step. + Then, the lists of inputs and outputs are processed. + For each input, the following nodes and edges are created: + - in-parameter node with ID as defined in the component and component ID equal to the path of the step + - a data edge from the step component node to the in-parameter node + - potentially a data node corresponding to the source of the input, with ID equal to the source ID defined in the outer workflow + and component ID equal to the path of the outer workflow + - potentially a data edge from the in-parameter node to the data node of the source + + For each output, the following nodes and edges are created: + - out-parameter node with ID as defined in the component and component ID equal to the path of the step + - a data edge from the out-parameter node to the step component node + - a data node representing the outer-workflow-level output, with ID equal to [step id]/[output id as defined in workflow] + and component ID equal to the path of the outer workflow + - a data edge from the out-parameter node to the data node + + 
Parameters: + driver (Driver): the driver used to connect to Neo4j + cwl_entity (dict): the dictionary containing the parsed contents of the CWL component + repo_path (str): the path of the repository that contains the CWL component + """ for step in cwl_entity['steps']: - combined_path = Path(repo) / step['run'] + # Retrieve path of the step + combined_path = Path(repo_path) / step['run'] step_path = str(combined_path) - # if a component node with the same path (run) as s does not exist then - # Create component node s_node unique to s with id equal to run + # Create the step component node with ID equal to the path of the step s_node = ensure_component_node(driver, step_path) s_node_internal_id = s_node[0] - for i in step['in']: - # Create in-parameter node i_node with id = i.id and component_id = s.run - param_node = ensure_parameter_node(driver, i['id'], step_path, 'in') + + # Process the list of inputs of the step + for input in step['in']: + # Create in-parameter node with ID as defined in the component and component ID equal to the path of the step + param_node = ensure_parameter_node(driver, input['id'], step_path, 'in') param_node_internal_id = param_node[0] - # Create a data edge from s_node to i_node + # Create a data edge from the step component node to the in-parameter node create_data_relationship(driver, s_node_internal_id, param_node_internal_id) - if 'source' in i: - if type(i['source']) == str: - source_id = i['source'] + # Inputs can have one or multiple data sources (data nodes) + # A data edge is drawn from the in-parameter node to the data node of the source + if 'source' in input: + if type(input['source']) == str: + source_id = input['source'] process_source_relationship(driver, source_id, cwl_entity['path'], param_node_internal_id) - elif type(i['source']) == list: - for source_id in i['source']: + elif type(input['source']) == list: + for source_id in input['source']: process_source_relationship(driver, source_id, cwl_entity['path'], 
param_node_internal_id) - for o in step['out']: - if type(o) == dict: - o_id = o['id'] + # Process the list of outputs of the step + for output in step['out']: + # An output can be defined as a dictionary or simply as a string (ID only) + if type(output) == dict: + output_id = output['id'] else: - o_id = o - # Create out-parameter node o_node with id = o.id and component_id = s.run - param_node = ensure_parameter_node(driver, o_id, step_path, 'out') + output_id = output + # Create out-parameter node with ID as defined in the component and component ID equal to the path of the step + param_node = ensure_parameter_node(driver, output_id, step_path, 'out') param_node_internal_id = param_node[0] - # Create a data edge from o_node to s_node + # Create a data edge from out-parameter node to the step component node create_data_relationship(driver, param_node_internal_id, s_node_internal_id) - # Workflow-level outputs of a step have \texttt{id} corresponding to \texttt{[[step ID]/[output ID as defined in workflow]]} - # and a \texttt{component\_id} property equal to the ID of the workflow - # Create data node o_data_node with id = step_id/output_id and component_id = c_node.id - output_id = f"{step['id']}/{o_id}" - data_node = ensure_data_node(driver, output_id, cwl_entity['path']) + # Create data node with id equal to step_id/output_id and component ID equal to the path of the outer workflow + outer_output_id = f"{step['id']}/{output_id}" + data_node = ensure_data_node(driver, outer_output_id, cwl_entity['path']) data_node_internal_id = data_node[0] - # Create a data edge from o_node to o_data_node + # Create a data edge from the out-parameter node to the data node create_data_relationship(driver, param_node_internal_id, data_node_internal_id) \ No newline at end of file diff --git a/graph_creation/repo_processing.py b/graph_creation/repo_processing.py index e36dd5f..9e53fbb 100644 --- a/graph_creation/repo_processing.py +++ b/graph_creation/repo_processing.py @@ -4,12 
+4,19 @@ from graph_creation.cwl_processing import process_cwl_inputs, process_cwl_output from neo4j_queries.node_queries import ensure_component_node def process_repos(repo_list: list[str], driver: Driver) -> None: + """ + Given a list of paths to local repositories and a Neo4j driver, + the function parses the CWL files and turns them into a Neo4j dependency graph. + + Parameters: + repo_list (list[str]): a list of paths to local repositories + driver (Driver): a Neo4j driver + """ cwl_entities = {} for repo in repo_list: + # Parse CWL files cwl_entities[repo]= get_cwl_from_repo(repo) for entity in cwl_entities[repo]: - # if a component node with the same path as c does not exist then - # create component node c_node unique to c with id equal to path and alias equal to a empty dictionary component_id = entity['path'] ensure_component_node(driver, component_id) process_cwl_inputs(driver, entity) diff --git a/graph_creation/utils.py b/graph_creation/utils.py index de3dabd..86e0018 100644 --- a/graph_creation/utils.py +++ b/graph_creation/utils.py @@ -3,18 +3,44 @@ from neo4j_queries.node_queries import ensure_data_node, ensure_parameter_node from neo4j_queries.edge_queries import create_data_relationship, create_in_param_relationship def create_input_nodes_and_relationships(driver: Driver, input_id: str, component_id: str) -> None: - # Create in-parameter node i_node with id = i.id and component_id = c_node.id + """ + Processes a single input tied to a specific CWL component. 
+ The following nodes and edges are created: + - an in-parameter node with the parameter ID as defined in the component and component ID equal to the path of the component + - a data node with component ID of the component and data ID equal to the parameter ID + - a data edge from the component node to the in-parameter node + - a data edge from the data node to the in-parameter node + + Parameters: + driver (Driver): the driver used to connect to Neo4j + input_id (str): the ID of the input as defined in the CWL component + component_id (str): the unique ID of the CWL component (its path) + """ + # Create in-parameter node with the parameter ID as defined in the component and component ID equal to the path of the component param_node = ensure_parameter_node(driver, input_id, component_id, 'in') param_node_internal_id = param_node[0] - # Create a directed data edge from c_node to i_node + # Create a data edge from the component node to the in-parameter node create_in_param_relationship(driver, component_id, param_node_internal_id) - # Create a data node i_data_node with id = i.id and component_id = c_node.id + # Create a data node with component ID of the component and data ID equal to the parameter ID data_node = ensure_data_node(driver, input_id, component_id) data_node_internal_id = data_node[0] - # Create a data edge from i_data_node to i_node + # Create a data edge from the data node to the in-parameter node create_data_relationship(driver, data_node_internal_id, param_node_internal_id) -def process_source_relationship(driver: Driver, source_id: str, component_id: str, param_node_internal_id: str) -> None: +def process_source_relationship(driver: Driver, source_id: str, component_id: str, param_node_internal_id: int) -> None: + """ + Processes a source relationship between a data node and a parameter node. + The data node does not need to exist already, while the parameter node must have already been created. 
+ The following nodes and edges are created: + - a data node with ID equal to source_id and component ID equal to the path of the component it belongs to + - a data edge from the parameter node to the data node + + Parameters: + driver (Driver): the driver used to connect to Neo4j + source_id (str): the ID of the data that functions as a source for the parameter + component_id (str): the unique ID of the CWL component (its path) + param_node_internal_id (int): the unique ID of the parameter node as defined internally by Neo4j + """ data_node = ensure_data_node(driver, source_id, component_id) data_node_internal_id = data_node[0] create_data_relationship(driver, param_node_internal_id, data_node_internal_id) \ No newline at end of file diff --git a/main.py b/main.py index a961ca2..81e6ccf 100644 --- a/main.py +++ b/main.py @@ -5,7 +5,15 @@ import os import gitlab import subprocess -def clone_repos(repo_list: list[str], folder_name: str): +def clone_repos(repo_list: list[str], folder_name: str) -> None: + """ + Given a list of relative paths to ASTRON GitLab repositories and the name of a folder, + the mentioned repositories are cloned into the mentioned folder. 
+ + Parameters: + repo_list (list[str]): list of relative paths to ASTRON GitLab repositories + folder_name (str): the name of the folder to clone the repos into + """ gl = gitlab.Gitlab('https://git.astron.nl') projects = gl.projects.list(iterator=True, get_all=True) for project in projects: @@ -19,6 +27,7 @@ if __name__ == '__main__': folder = 'repos' clone_repos(relevant_repos, folder) + # Get the authentication details for Neo4j instance load_status = dotenv.load_dotenv("Neo4j-25ebc0db-Created-2024-11-17.txt") if load_status is False: raise RuntimeError('Environment variables not loaded.') @@ -27,7 +36,7 @@ if __name__ == '__main__': AUTH = (os.getenv("NEO4J_USERNAME"), os.getenv("NEO4J_PASSWORD")) repo_paths = [f'{folder}/{path}' for path in relevant_repos] - print(repo_paths) + with GraphDatabase.driver(URI, auth=AUTH) as driver: driver.verify_connectivity() print("Connection established.") diff --git a/neo4j_queries/edge_queries.py b/neo4j_queries/edge_queries.py index ff48f4c..02d8128 100644 --- a/neo4j_queries/edge_queries.py +++ b/neo4j_queries/edge_queries.py @@ -2,6 +2,21 @@ from neo4j import Driver from neo4j_queries.utils import clean_component_id def create_in_param_relationship(driver: Driver, prefixed_component_id: str, parameter_internal_id: int) -> tuple[str,str]: + """ + Creates a data dependency relationship in Neo4j between a component node with path prefixed_component_id + and an in-parameter node with Neo4j internal ID parameter_internal_id. + This relationship is an outgoing data edge from the component to the in-parameter node. + The ID of the component can be given based on the local relative path, so it needs to be cleaned + before querying Neo4j. 
+ + Parameters: + driver (Driver): the Neo4j driver + prefixed_component_id (str): the local relative path of the component + parameter_internal_id (int): the internal Neo4j ID of the in-parameter node + + Returns: + tuple[str,str]: the component ID of the component, the parameter ID of the parameter + """ component_id = clean_component_id(prefixed_component_id) query = """ MATCH (c:Component {component_id: $component_id}), (p) @@ -16,6 +31,21 @@ def create_in_param_relationship(driver: Driver, prefixed_component_id: str, par return record["component_id"], record["parameter_id"] def create_out_param_relationship(driver: Driver, prefixed_component_id: str, parameter_internal_id: int) -> tuple[str,str]: + """ + Creates a data dependency relationship in Neo4j between a component node with path prefixed_component_id + and an out-parameter node with Neo4j internal ID parameter_internal_id. + This relationship is an outgoing data edge from the out-parameter to the component node. + The ID of the component can be given based on the local relative path, so it needs to be cleaned + before querying Neo4j. + + Parameters: + driver (Driver): the Neo4j driver + prefixed_component_id (str): the local relative path of the component + parameter_internal_id (int): the internal Neo4j ID of the out-parameter node + + Returns: + tuple[str,str]: the component ID of the component, the parameter ID of the parameter + """ component_id = clean_component_id(prefixed_component_id) query = """ MATCH (c:Component {component_id: $component_id}), (p) @@ -30,6 +60,19 @@ def create_out_param_relationship(driver: Driver, prefixed_component_id: str, pa return record["component_id"], record["parameter_id"] def create_data_relationship(driver: Driver, from_internal_node_id: int, to_internal_node_id: int) -> tuple[int,int]: + """ + Creates a data dependency relationship in Neo4j between the two nodes with Neo4j internal IDs given as parameters. 
+ This relationship is an outgoing data edge from the node with internal ID from_internal_node_id + to the node with internal ID to_internal_node_id. + + Parameters: + driver (Driver): the Neo4j driver + from_internal_node_id (int): the internal Neo4j ID of the first node + to_internal_node_id (int): the internal Neo4j ID of the second node + + Returns: + tuple[int,int]: from_internal_node_id, to_internal_node_id + """ query = """ MATCH (a), (b) WHERE id(a) = $from_internal_node_id AND id(b) = $to_internal_node_id diff --git a/neo4j_queries/node_queries.py b/neo4j_queries/node_queries.py index 7bdd951..b2dbb9f 100644 --- a/neo4j_queries/node_queries.py +++ b/neo4j_queries/node_queries.py @@ -3,24 +3,48 @@ from neo4j import Driver from neo4j_queries.utils import clean_component_id def ensure_component_node(driver: Driver, prefixed_component_id: str) -> tuple[int,str]: + """ + Ensures that there exists a component node corresponding to the file with local path prefixed_component_id. + The ID of the component can be given based on the local relative path, so it is cleaned + before querying Neo4j. 
+ + Parameters: + driver (Driver): the Neo4j driver + prefixed_component_id (str): the local relative path of the component + + Returns: + tuple[int,str]: the Neo4j internal ID of the component node, the component ID of the component + """ component_id = clean_component_id(prefixed_component_id) query = """ MERGE (c:Component {component_id: $component_id}) - RETURN id(c) AS node_internal_id, c.id AS id_property + RETURN id(c) AS node_internal_id, c.component_id AS component_id """ with driver.session() as session: result = session.run(query, component_id=component_id) record = result.single() - return record["node_internal_id"], record["id_property"] + return record["node_internal_id"], record["component_id"] def ensure_parameter_node(driver: Driver, node_id: str, prefixed_component_id: str, param_type: str) \ -> tuple[int,str,str,str]: + """ + Ensures that there exists a parameter node with ID node_id and type param_type + associated with the component in the file with local path prefixed_component_id. + The ID of the component can be given based on the local relative path, so it is cleaned + before querying Neo4j. 
+ + Parameters: + driver (Driver): the Neo4j driver + node_id (str): the ID of the parameter + prefixed_component_id (str): the local relative path of the component + param_type (str): the type of the parameter ('in' or 'out') + + Returns: + tuple[int,str,str,str]: the Neo4j internal ID of the parameter node, the parameter ID, the component ID, the parameter type + """ component_id = clean_component_id(prefixed_component_id) query = """ - MERGE (n:Parameter {parameter_id: $node_id, component_id: $component_id}) - ON CREATE SET - n.component_id = $component_id, - n.parameter_type = $param_type + MERGE (n:Parameter {parameter_id: $node_id, component_id: $component_id, parameter_type: $param_type}) RETURN id(n) AS node_internal_id, n.parameter_id AS id_property, n.component_id AS component_id_property, n.parameter_type AS parameter_type_property """ @@ -30,11 +54,23 @@ def ensure_parameter_node(driver: Driver, node_id: str, prefixed_component_id: s return record["node_internal_id"], record["id_property"], record["component_id_property"], record['parameter_type_property'] def ensure_data_node(driver: Driver, node_id: str, prefixed_component_id: str) -> tuple[int,str,str]: + """ + Ensures that there exists a data node with ID node_id + associated with the component in the file with local path prefixed_component_id. + The ID of the component can be given based on the local relative path, so it is cleaned + before querying Neo4j. 
+ + Parameters: + driver (Driver): the Neo4j driver + node_id (str): the ID of the data + prefixed_component_id (str): the local relative path of the component + + Returns: + tuple[int,str,str]: the Neo4j internal ID of the data node, the data ID, the component ID + """ component_id = clean_component_id(prefixed_component_id) query = """ MERGE (n:Data {data_id: $node_id, component_id: $component_id}) - ON CREATE SET - n.component_id = $component_id RETURN id(n) AS node_internal_id, n.data_id AS id_property, n.component_id AS component_id_property """ with driver.session() as session: diff --git a/neo4j_queries/utils.py b/neo4j_queries/utils.py index c00f1b5..90785a7 100644 --- a/neo4j_queries/utils.py +++ b/neo4j_queries/utils.py @@ -1,3 +1,12 @@ def clean_component_id(prefixed_component_id: str) -> str: + """ + Cleans the local folder name (repos) from the repository path. + + Parameters: + prefixed_component_id (str): the local relative path of a file in a repository located in the "repos" folder + + Returns: + str: the cleaned relative path of a file + """ component_id = prefixed_component_id.removeprefix("repos\\") return component_id \ No newline at end of file -- GitLab