Skip to content
Snippets Groups Projects
Commit 6f1c8163 authored by Chiara Liotta
Browse files

show two pipelines in separate graphs

parent f9c046d3
No related branches found
No related tags found
No related merge requests found
from neo4j import Driver from neo4j import Driver
from graph_creation.cst_processing import traverse_and_create, traverse_when_statement_extract_dependencies from graph_creation.cst_processing import traverse_and_create, traverse_when_statement_extract_dependencies
from graph_creation.utils import create_input_nodes_and_relationships, process_source_relationship, resolve_relative_path from graph_creation.utils import create_input_nodes_and_relationships, create_main_input_nodes_and_relationships, process_source_relationship, resolve_relative_path
from neo4j_queries.node_queries import ensure_component_node, ensure_data_node, ensure_in_parameter_node, ensure_out_parameter_node, get_wf_data_nodes_from_step_in_param from neo4j_queries.node_queries import ensure_component_node, ensure_data_node, ensure_in_parameter_node, ensure_main_out_parameter_node, ensure_out_parameter_node
from neo4j_queries.edge_queries import create_control_relationship, create_data_relationship, create_out_param_relationship from neo4j_queries.edge_queries import create_control_relationship, create_data_relationship, create_main_out_param_relationship, create_out_param_relationship
from pathlib import Path from pathlib import Path
from parsers.javascript_parsing import parse_javascript_expression_string, parse_javascript_string from parsers.javascript_parsing import parse_javascript_expression_string, parse_javascript_string
...@@ -35,6 +35,34 @@ def process_cwl_inputs(driver: Driver, cwl_entity: dict) -> None: ...@@ -35,6 +35,34 @@ def process_cwl_inputs(driver: Driver, cwl_entity: dict) -> None:
for key in cwl_entity['inputs'].keys(): for key in cwl_entity['inputs'].keys():
create_input_nodes_and_relationships(driver, key, component_id) create_input_nodes_and_relationships(driver, key, component_id)
def process_cwl_main_inputs(driver: Driver, cwl_entity: dict) -> None:
    """
    Processes the inputs of a main CWL component (Workflow, CommandLineTool, or ExpressionTool).
    Every input ID found is handed to create_main_input_nodes_and_relationships
    together with the path of the component, which creates:
        - an in-parameter node with the parameter ID as defined in the component and component ID equal to the path of the component
        - a data node with component ID of the component and data ID equal to the parameter ID
        - a data edge from the component node to the in-parameter node
        - a data edge from the data node to the in-parameter node

    Inputs of any shape other than a list or a dictionary are ignored.

    Parameters:
        driver (Driver): the driver used to connect to Neo4j
        cwl_entity (dict): the dictionary containing the parsed contents of the CWL component
    """
    component_id = cwl_entity['path']
    inputs = cwl_entity['inputs']

    # Inputs can be defined as a list or as a dictionary.
    # isinstance is used instead of an exact type comparison so that
    # list/dict subclasses are recognised as well.
    if isinstance(inputs, list):
        # List of dictionaries: each element is identifiable via the key 'id'.
        # Elements that are not dictionaries are skipped.
        for entry in inputs:
            if isinstance(entry, dict):
                create_main_input_nodes_and_relationships(driver, entry['id'], component_id)
    elif isinstance(inputs, dict):
        # Dictionary where each key is the ID of the input;
        # the values hold other properties and are not needed here.
        for input_id in inputs:
            create_main_input_nodes_and_relationships(driver, input_id, component_id)
# TODO: deal with outputBindings # TODO: deal with outputBindings
def process_cwl_outputs(driver: Driver, cwl_entity: dict) -> None: def process_cwl_outputs(driver: Driver, cwl_entity: dict) -> None:
""" """
...@@ -69,6 +97,39 @@ def process_cwl_outputs(driver: Driver, cwl_entity: dict) -> None: ...@@ -69,6 +97,39 @@ def process_cwl_outputs(driver: Driver, cwl_entity: dict) -> None:
for source_id in output['outputSource']: for source_id in output['outputSource']:
process_source_relationship(driver, source_id, component_id, param_node_internal_id) process_source_relationship(driver, source_id, component_id, param_node_internal_id)
def process_cwl_main_outputs(driver: Driver, cwl_entity: dict) -> None:
    """
    Processes the outputs of a main CWL component (Workflow, CommandLineTool, or ExpressionTool).
    For each output that is given as a dictionary:
        - an out-parameter node is ensured, with the parameter ID as defined in the component
          and component ID equal to the path of the component
        - a data edge from the out-parameter node to the component node is created
        - every output source (if any) is passed to process_source_relationship
          together with the out-parameter node

    Entries of cwl_entity['outputs'] that are not dictionaries are skipped. This means
    that outputs written as a dictionary keyed by ID are not processed, since iterating
    over a dictionary yields its keys.

    Parameters:
        driver (Driver): the driver used to connect to Neo4j
        cwl_entity (dict): the dictionary containing the parsed contents of the CWL component
    """
    component_id = cwl_entity['path']
    for output in cwl_entity['outputs']:
        if not isinstance(output, dict):
            continue

        # Create out-parameter node with the parameter ID as defined in the component
        # and component ID equal to the path of the component
        param_node = ensure_main_out_parameter_node(driver, output['id'], component_id)
        param_node_internal_id = param_node[0]

        # Create a data edge from the out-parameter node to the component node
        create_main_out_param_relationship(driver, component_id, param_node_internal_id)

        if 'outputSource' not in output:
            continue

        # The output source can be a single ID or a list of IDs;
        # any other type is ignored.
        output_source = output['outputSource']
        if isinstance(output_source, str):
            process_source_relationship(driver, output_source, component_id, param_node_internal_id)
        elif isinstance(output_source, list):
            for source_id in output_source:
                process_source_relationship(driver, source_id, component_id, param_node_internal_id)
def process_cwl_steps(driver: Driver, cwl_entity: dict) -> None: def process_cwl_steps(driver: Driver, cwl_entity: dict) -> None:
""" """
Processes the steps of a CWL Workflow component (which we will refer to as outer workflow component). Processes the steps of a CWL Workflow component (which we will refer to as outer workflow component).
...@@ -169,5 +230,5 @@ def process_cwl_steps(driver: Driver, cwl_entity: dict) -> None: ...@@ -169,5 +230,5 @@ def process_cwl_steps(driver: Driver, cwl_entity: dict) -> None:
def process_cwl_expression(driver: Driver, entity: dict) -> None: def process_cwl_expression(driver: Driver, entity: dict) -> None:
expression = entity['expression'] expression = entity['expression']
expr_tree = parse_javascript_string(expression) # expr_tree = parse_javascript_string(expression)
# traverse_and_create(driver, entity['path'], expr_tree) # traverse_and_create(driver, entity['path'], expr_tree)
\ No newline at end of file
from neo4j import Driver from neo4j import Driver
from graph_creation.cwl_parsing import get_cwl_from_repo from graph_creation.cwl_parsing import get_cwl_from_repo
from graph_creation.cwl_processing import process_cwl_expression, process_cwl_inputs, process_cwl_outputs, process_cwl_steps from graph_creation.cwl_processing import process_cwl_expression, process_cwl_inputs, process_cwl_main_inputs, process_cwl_main_outputs, process_cwl_outputs, process_cwl_steps
from neo4j_queries.edge_queries import simplify_data_and_control_edges from neo4j_queries.edge_queries import simplify_data_and_control_edges
from neo4j_queries.node_queries import ensure_component_node from neo4j_queries.node_queries import ensure_component_node, ensure_main_component_node
def process_repos(repo_list: list[str], driver: Driver) -> None: def process_repos(repo_list: list[str], driver: Driver) -> None:
""" """
...@@ -18,10 +18,12 @@ def process_repos(repo_list: list[str], driver: Driver) -> None: ...@@ -18,10 +18,12 @@ def process_repos(repo_list: list[str], driver: Driver) -> None:
# Parse CWL files # Parse CWL files
cwl_entities[repo]= get_cwl_from_repo(repo) cwl_entities[repo]= get_cwl_from_repo(repo)
for entity in cwl_entities[repo]: for entity in cwl_entities[repo]:
pipelines = ["compress_pipeline.cwl"]
if any(pipeline in entity['path'] for pipeline in pipelines):
component_id = entity['path'] component_id = entity['path']
ensure_component_node(driver, component_id) ensure_main_component_node(driver, component_id)
process_cwl_inputs(driver, entity) process_cwl_main_inputs(driver, entity)
process_cwl_outputs(driver, entity) process_cwl_main_outputs(driver, entity)
if entity['class'] == 'Workflow': if entity['class'] == 'Workflow':
process_cwl_steps(driver, entity) process_cwl_steps(driver, entity)
# elif entity['class'] == 'ExpressionTool': # elif entity['class'] == 'ExpressionTool':
......
from pathlib import Path from pathlib import Path
from neo4j import Driver from neo4j import Driver
from neo4j_queries.node_queries import ensure_data_node, ensure_in_parameter_node from neo4j_queries.node_queries import ensure_data_node, ensure_in_parameter_node, ensure_main_in_parameter_node
from neo4j_queries.edge_queries import create_data_relationship, create_in_param_relationship from neo4j_queries.edge_queries import create_data_relationship, create_in_param_relationship, create_main_in_param_relationship
def create_input_nodes_and_relationships(driver: Driver, input_id: str, component_id: str) -> None: def create_input_nodes_and_relationships(driver: Driver, input_id: str, component_id: str) -> None:
""" """
...@@ -28,6 +28,31 @@ def create_input_nodes_and_relationships(driver: Driver, input_id: str, componen ...@@ -28,6 +28,31 @@ def create_input_nodes_and_relationships(driver: Driver, input_id: str, componen
# Create a data edge from the data node to the the in-parameter node # Create a data edge from the data node to the the in-parameter node
create_data_relationship(driver, data_node_internal_id, param_node_internal_id) create_data_relationship(driver, data_node_internal_id, param_node_internal_id)
def create_main_input_nodes_and_relationships(driver: Driver, input_id: str, component_id: str) -> None:
    """
    Registers one input of a main CWL component in the graph.

    Steps, in order:
        1. ensure the main in-parameter node for (input_id, component_id)
        2. link the main component node to that in-parameter node with a data edge
        3. ensure the data node for (input_id, component_id)
        4. link the data node to the in-parameter node with a data edge

    Parameters:
        driver (Driver): the driver used to connect to Neo4j
        input_id (str): the ID of the input as defined in the CWL component
        component_id (str): the unique ID of the CWL component (its path)
    """
    # The first element of the returned tuple is the Neo4j internal ID of the node
    in_param_internal_id = ensure_main_in_parameter_node(driver, input_id, component_id)[0]
    create_main_in_param_relationship(driver, component_id, in_param_internal_id)

    data_internal_id = ensure_data_node(driver, input_id, component_id)[0]
    create_data_relationship(driver, data_internal_id, in_param_internal_id)
def process_source_relationship(driver: Driver, source_id: str, component_id: str, param_node_internal_id: int) -> None: def process_source_relationship(driver: Driver, source_id: str, component_id: str, param_node_internal_id: int) -> None:
""" """
Processes a source relationship between a data node and a parameter node. Processes a source relationship between a data node and a parameter node.
......
from graph_creation.cst_processing import traverse_and_create, traverse_when_statement_extract_dependencies
from graph_creation.repo_processing import process_repos from graph_creation.repo_processing import process_repos
from dockerfile_parse import DockerfileParser
from pprint import pprint
from neo4j import GraphDatabase from neo4j import GraphDatabase
import dotenv import dotenv
import os import os
import gitlab import gitlab
import subprocess import subprocess
from parsers.javascript_parsing import parse_javascript_expression_string, parse_javascript_string
def clone_repos(repo_list: list[str], folder_name: str) -> None: def clone_repos(repo_list: list[str], folder_name: str) -> None:
""" """
...@@ -25,7 +29,7 @@ def clone_repos(repo_list: list[str], folder_name: str) -> None: ...@@ -25,7 +29,7 @@ def clone_repos(repo_list: list[str], folder_name: str) -> None:
if __name__ == '__main__': if __name__ == '__main__':
relevant_repos = ['ldv/imaging_compress_pipeline'] relevant_repos = ['ldv/imaging_compress_pipeline']
folder = 'repos' folder = 'repos'
clone_repos(relevant_repos, folder) # clone_repos(relevant_repos, folder)
# Get the authentication details for Neo4j instance # Get the authentication details for Neo4j instance
load_status = dotenv.load_dotenv("Neo4j-25ebc0db-Created-2024-11-17.txt") load_status = dotenv.load_dotenv("Neo4j-25ebc0db-Created-2024-11-17.txt")
...@@ -41,6 +45,14 @@ if __name__ == '__main__': ...@@ -41,6 +45,14 @@ if __name__ == '__main__':
driver.verify_connectivity() driver.verify_connectivity()
print("Connection established.") print("Connection established.")
driver = GraphDatabase.driver(URI, auth=AUTH) driver = GraphDatabase.driver(URI, auth=AUTH)
# tree = parse_javascript_expression_string("steps.step_a.outputs.output_param")
# dfp = DockerfileParser(path="repos\ldv\imaging_compress_pipeline\docker\Dockerfile")
# Print the parsed structure:
# pprint(dfp.json)
# traverse_and_create(driver, tree)
process_repos(repo_paths, driver) process_repos(repo_paths, driver)
driver.close() driver.close()
...@@ -59,6 +59,64 @@ def create_out_param_relationship(driver: Driver, prefixed_component_id: str, pa ...@@ -59,6 +59,64 @@ def create_out_param_relationship(driver: Driver, prefixed_component_id: str, pa
record = result.single() record = result.single()
return record["component_id"], record["parameter_id"] return record["component_id"], record["parameter_id"]
def create_main_in_param_relationship(driver: Driver, prefixed_component_id: str, parameter_internal_id: int) -> tuple[str,str]:
    """
    Merges a DATA edge going from the MainComponent node identified by prefixed_component_id
    to the node whose Neo4j element ID equals parameter_internal_id (the in-parameter node).
    The component path is passed through clean_component_id before it is used in the query.

    NOTE(review): when the MATCH finds no such pair of nodes the query yields no row;
    result.single() presumably returns None in that case and the subscripts below fail - confirm.

    Parameters:
        driver (Driver): the Neo4j driver
        prefixed_component_id (str): the local relative path of the component
        parameter_internal_id (int): the internal Neo4j ID of the in-parameter node

    Returns:
        tuple[str,str]: the component ID of the component, the parameter ID of the parameter
    """
    query = """
    MATCH (c:MainComponent {component_id: $component_id}), (p)
    WHERE elementId(p) = $parameter_internal_id
    MERGE (c)-[:DATA]->(p)
    RETURN c.component_id AS component_id, p.parameter_id AS parameter_id
    """
    cleaned_id = clean_component_id(prefixed_component_id)
    with driver.session() as session:
        row = session.run(
            query,
            component_id=cleaned_id,
            parameter_internal_id=parameter_internal_id,
        ).single()
        return row["component_id"], row["parameter_id"]
def create_main_out_param_relationship(driver: Driver, prefixed_component_id: str, parameter_internal_id: int) -> tuple[str,str]:
    """
    Merges a DATA edge going from the node whose Neo4j element ID equals parameter_internal_id
    (the out-parameter node) to the MainComponent node identified by prefixed_component_id.
    The component path is passed through clean_component_id before it is used in the query.

    NOTE(review): when the MATCH finds no such pair of nodes the query yields no row;
    result.single() presumably returns None in that case and the subscripts below fail - confirm.

    Parameters:
        driver (Driver): the Neo4j driver
        prefixed_component_id (str): the local relative path of the component
        parameter_internal_id (int): the internal Neo4j ID of the out-parameter node

    Returns:
        tuple[str,str]: the component ID of the component, the parameter ID of the parameter
    """
    query = """
    MATCH (c:MainComponent {component_id: $component_id}), (p)
    WHERE elementId(p) = $parameter_internal_id
    MERGE (c)<-[:DATA]-(p)
    RETURN c.component_id AS component_id, p.parameter_id AS parameter_id
    """
    cleaned_id = clean_component_id(prefixed_component_id)
    with driver.session() as session:
        row = session.run(
            query,
            component_id=cleaned_id,
            parameter_internal_id=parameter_internal_id,
        ).single()
        return row["component_id"], row["parameter_id"]
def create_data_relationship(driver: Driver, from_internal_node_id: int, to_internal_node_id: int) -> tuple[int,int]: def create_data_relationship(driver: Driver, from_internal_node_id: int, to_internal_node_id: int) -> tuple[int,int]:
""" """
Creates a data dependency relationship in Neo4j between the two nodes with Neo4j internal IDs given as parameters. Creates a data dependency relationship in Neo4j between the two nodes with Neo4j internal IDs given as parameters.
......
from pathlib import Path
from neo4j import Driver from neo4j import Driver
from neo4j_queries.utils import clean_component_id from neo4j_queries.utils import clean_component_id
...@@ -16,12 +17,37 @@ def ensure_component_node(driver: Driver, prefixed_component_id: str) -> tuple[i ...@@ -16,12 +17,37 @@ def ensure_component_node(driver: Driver, prefixed_component_id: str) -> tuple[i
tuple[int,str]: the Neoj4 internal ID of the component node, the component ID of the component tuple[int,str]: the Neoj4 internal ID of the component node, the component ID of the component
""" """
component_id = clean_component_id(prefixed_component_id) component_id = clean_component_id(prefixed_component_id)
nice_id = Path(component_id).stem
query = """ query = """
MERGE (c:Component {component_id: $component_id}) MERGE (c:Component {component_id: $component_id, nice_id: $nice_id})
RETURN elementId(c) AS node_internal_id, c.component_id AS component_id RETURN elementId(c) AS node_internal_id, c.component_id AS component_id
""" """
with driver.session() as session: with driver.session() as session:
result = session.run(query, component_id=component_id) result = session.run(query, component_id=component_id, nice_id=nice_id)
record = result.single()
return record["node_internal_id"], record["component_id"]
def ensure_main_component_node(driver: Driver, prefixed_component_id: str) -> tuple[int,str]:
    """
    Merges a MainComponent node for the file with local path prefixed_component_id.
    The path is cleaned with clean_component_id first; the node is merged on both
    the cleaned component ID and a short "nice" ID, which is the file name of the
    cleaned ID without its final suffix.

    Parameters:
        driver (Driver): the Neo4j driver
        prefixed_component_id (str): the local relative path of the component

    Returns:
        tuple[int,str]: the Neo4j internal ID of the component node, the component ID of the component
    """
    query = """
    MERGE (c:MainComponent {component_id: $component_id, nice_id: $nice_id})
    RETURN elementId(c) AS node_internal_id, c.component_id AS component_id
    """
    cleaned_id = clean_component_id(prefixed_component_id)
    short_name = Path(cleaned_id).stem
    with driver.session() as session:
        row = session.run(query, component_id=cleaned_id, nice_id=short_name).single()
        return row["node_internal_id"], row["component_id"]
...@@ -51,6 +77,32 @@ def ensure_in_parameter_node(driver: Driver, node_id: str, prefixed_component_id ...@@ -51,6 +77,32 @@ def ensure_in_parameter_node(driver: Driver, node_id: str, prefixed_component_id
record = result.single() record = result.single()
return record["node_internal_id"], record["id_property"], record["component_id_property"] return record["node_internal_id"], record["id_property"], record["component_id_property"]
def ensure_main_in_parameter_node(driver: Driver, node_id: str, prefixed_component_id: str) \
        -> tuple[int,str,str]:
    """
    Ensures that there exists a main in-parameter node (label MainInParameter) with ID node_id
    associated with the component in the file with local path prefixed_component_id.
    The ID of the component can be given based on the local relative path, so it is cleaned
    before querying Neo4j.

    NOTE(review): the first returned element comes from Cypher's elementId(); the int
    annotation follows the convention used in the rest of the file - confirm that it
    matches the value the database actually returns.

    Parameters:
        driver (Driver): the Neo4j driver
        node_id (str): the ID of the parameter
        prefixed_component_id (str): the local relative path of the component

    Returns:
        tuple[int,str,str]: the Neo4j internal ID of the parameter node, the parameter ID, the component ID
    """
    component_id = clean_component_id(prefixed_component_id)
    query = """
    MERGE (n:MainInParameter {parameter_id: $node_id, component_id: $component_id})
    RETURN elementId(n) AS node_internal_id, n.parameter_id AS id_property, n.component_id AS component_id_property
    """
    with driver.session() as session:
        result = session.run(query, node_id=node_id, component_id=component_id)
        record = result.single()
        # Exactly three values are returned, matching the return annotation
        return record["node_internal_id"], record["id_property"], record["component_id_property"]
def ensure_out_parameter_node(driver: Driver, node_id: str, prefixed_component_id: str) \ def ensure_out_parameter_node(driver: Driver, node_id: str, prefixed_component_id: str) \
-> tuple[int,str,str,str]: -> tuple[int,str,str,str]:
""" """
...@@ -77,6 +129,32 @@ def ensure_out_parameter_node(driver: Driver, node_id: str, prefixed_component_i ...@@ -77,6 +129,32 @@ def ensure_out_parameter_node(driver: Driver, node_id: str, prefixed_component_i
record = result.single() record = result.single()
return record["node_internal_id"], record["id_property"], record["component_id_property"] return record["node_internal_id"], record["id_property"], record["component_id_property"]
def ensure_main_out_parameter_node(driver: Driver, node_id: str, prefixed_component_id: str) \
        -> tuple[int,str,str]:
    """
    Ensures that there exists a main out-parameter node (label MainOutParameter) with ID node_id
    associated with the component in the file with local path prefixed_component_id.
    The ID of the component can be given based on the local relative path, so it is cleaned
    before querying Neo4j.

    NOTE(review): the first returned element comes from Cypher's elementId(); the int
    annotation follows the convention used in the rest of the file - confirm that it
    matches the value the database actually returns.

    Parameters:
        driver (Driver): the Neo4j driver
        node_id (str): the ID of the parameter
        prefixed_component_id (str): the local relative path of the component

    Returns:
        tuple[int,str,str]: the Neo4j internal ID of the parameter node, the parameter ID, the component ID
    """
    component_id = clean_component_id(prefixed_component_id)
    query = """
    MERGE (n:MainOutParameter {parameter_id: $node_id, component_id: $component_id})
    RETURN elementId(n) AS node_internal_id, n.parameter_id AS id_property, n.component_id AS component_id_property
    """
    with driver.session() as session:
        result = session.run(query, node_id=node_id, component_id=component_id)
        record = result.single()
        # Exactly three values are returned, matching the return annotation
        return record["node_internal_id"], record["id_property"], record["component_id_property"]
def ensure_data_node(driver: Driver, node_id: str, prefixed_component_id: str) -> tuple[int,str,str]: def ensure_data_node(driver: Driver, node_id: str, prefixed_component_id: str) -> tuple[int,str,str]:
""" """
Ensures that there exists a data node with ID node_id Ensures that there exists a data node with ID node_id
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment