Skip to content
Snippets Groups Projects
Commit a409397c authored by Chiara Liotta's avatar Chiara Liotta
Browse files

create fan-in fan-out graph

parent e7fea0fc
No related branches found
No related tags found
No related merge requests found
......@@ -4,7 +4,7 @@ from neo4j import Driver
from graph_creation.cst_processing import traverse_when_statement_extract_dependencies
from graph_creation.utils import process_in_param, process_parameter_source
from neo4j_queries.node_queries import ensure_component_node, ensure_git_node, ensure_in_parameter_node, ensure_out_parameter_node
from neo4j_queries.edge_queries import create_control_relationship, create_data_relationship_with_id, create_out_param_relationship, create_references_relationship
from neo4j_queries.edge_queries import create_control_relationship, create_data_relationship, create_out_param_relationship, create_references_relationship
from neo4j_queries.utils import get_is_workflow
from parsers.javascript_parsing import parse_javascript_expression_string, parse_javascript_string
......@@ -140,7 +140,7 @@ def process_cwl_steps(driver: Driver, cwl_entity: dict, tool_paths: list[str], s
param_node_internal_id = param_node[0]
if is_tool:
# Create a data edge from the step component node to the in-parameter node
create_data_relationship_with_id(driver, s_node_internal_id, param_node_internal_id, step_path)
create_data_relationship(driver, s_node_internal_id, param_node_internal_id, step_path, input['id'])
# Inputs can have one or multiple data sources (data nodes)
if 'source' in input:
......@@ -182,7 +182,7 @@ def process_cwl_steps(driver: Driver, cwl_entity: dict, tool_paths: list[str], s
param_node_internal_id = param_node[0]
if is_tool:
# Create a data edge from out-parameter node to the step component node
create_data_relationship_with_id(driver, param_node_internal_id, s_node_internal_id, step_path)
create_data_relationship(driver, param_node_internal_id, s_node_internal_id, step_path, output_id)
def process_cwl_base_commands(driver, entity, links):
......
from neo4j import Driver
from metric_calculations.Neo4jTraversalDFS import Neo4jTraversalDFS
from graph_creation.cwl_parsing import get_cwl_from_repo
from graph_creation.docker_parsing import parse_all_dockerfiles
from graph_creation.utils import process_step_lookup
......@@ -7,7 +8,7 @@ from neo4j_queries.edge_queries import clean_relationship
from neo4j_queries.node_queries import ensure_component_node
from neo4j_queries.utils import get_is_workflow
def process_repos(repo_list: list[str], driver: Driver) -> None:
def process_repos(repo_list: list[str], driver: Driver, build = True, calculate = False) -> None:
"""
Processes a list of local repository paths containing CWL (Common Workflow Language) files,
parsing each CWL file and creating the corresponding nodes and relationships in a Neo4j graph.
......@@ -25,16 +26,17 @@ def process_repos(repo_list: list[str], driver: Driver) -> None:
None
"""
for repo in repo_list:
links = parse_all_dockerfiles(repo)
# Parse CWL files of current repo
workflows, tools = get_cwl_from_repo(repo)
# Extract tool paths for step processing later
tool_paths = [item["path"] for item in tools]
clean_relationship(driver)
# Combine workflows and tools into one list of entities to process
all_entities = workflows + tools
if build:
links = parse_all_dockerfiles(repo)
clean_relationship(driver)
for entity in all_entities:
print(f'Processing: {entity["path"]}')
is_workflow = get_is_workflow(entity)
......@@ -51,3 +53,10 @@ def process_repos(repo_list: list[str], driver: Driver) -> None:
# process_cwl_expression(driver, entity)
elif entity['class'] == 'CommandLineTool':
process_cwl_commandline(driver, entity, links)
if calculate:
neo4j_traversal = Neo4jTraversalDFS(driver)
for entity in all_entities:
print(f'Processing: {entity["path"]}')
is_workflow = get_is_workflow(entity)
if is_workflow:
neo4j_traversal.traverse_subgraph(entity['path'])
\ No newline at end of file
from pathlib import Path
from neo4j import Driver
from neo4j_queries.node_queries import ensure_in_parameter_node, ensure_out_parameter_node
from neo4j_queries.edge_queries import create_data_relationship_with_id, create_in_param_relationship
from neo4j_queries.edge_queries import create_data_relationship, create_in_param_relationship
GITLAB_ASTRON ='https://git.astron.nl'
......@@ -72,7 +72,7 @@ def process_parameter_source(driver: Driver, param_node_internal_id: int, source
# Ensure the source exists in the output parameter node for the subcomponent
source_param_node = ensure_out_parameter_node(driver, source_parsed[1], sub_component_id)[0]
# Create a relationship between the parameter node and its source
create_data_relationship_with_id(driver, param_node_internal_id, source_param_node, component_id)
create_data_relationship(driver, param_node_internal_id, source_param_node, component_id, source_id)
def resolve_relative_path(path: Path)-> Path:
"""
......
......@@ -45,6 +45,8 @@ if __name__ == '__main__':
driver.verify_connectivity()
print("Connection established.")
driver = GraphDatabase.driver(URI, auth=AUTH)
process_repos(repo_paths, driver)
# with driver.session() as session:
# session.run("MATCH ()-[r:DATA]-() DELETE r")
process_repos(repo_paths, driver, build=False, calculate=True)
driver.close()
from neo4j import Driver, GraphDatabase
import os, dotenv, pathlib
from neo4j_queries.utils import clean_component_id
class Neo4jTraversalDFS:
"""Class to perform DFS traversal and save the resulting subgraph."""
def __init__(self, uri, auth):
self.driver = GraphDatabase.driver(uri, auth=auth)
def __init__(self, driver: Driver):
# Keep a reference to the driver supplied by the caller; no connection is opened here.
self.driver = driver
def close(self):
"""Closes the Neo4j database connection."""
# self.driver is the same object that was handed to __init__, so this closes
# the caller's driver as well.
self.driver.close()
def _create_calculation_component(self, session, component_id):
"""Creates a CalculationComponent node for a given component_id if it does not exist."""
nice_id = pathlib.Path(component_id).stem
query = """
MERGE (cc:CalculationComponent {component_id: $component_id})
SET cc.nice_id = $nice_id
RETURN cc
"""
session.run(query, component_id=component_id, nice_id=nice_id)
def _create_direct_local_flow(self, session, from_component_id, to_component_id, workflow_id):
"""
Creates a DIRECT_LOCAL_FLOW relationship between two CalculationComponent nodes.
A -(DIRECT_LOCAL_FLOW)-> B if A calls B
This happens when a workflow A calls component B
"""
query = """
MATCH (cc_from:CalculationComponent {component_id: $from_component_id})
MATCH (cc_to:CalculationComponent {component_id: $to_component_id})
MERGE (cc_from)-[:DIRECT_LOCAL_FLOW {workflow_id: $workflow_id}]->(cc_to)
"""
session.run(query, from_component_id=from_component_id, to_component_id=to_component_id, workflow_id=workflow_id)
def _create_indirect_local_flow(self, session, from_component_id, to_component_id, workflow_id):
"""
Creates a DIRECT_LOCAL_FLOW relationship between two CalculationComponent nodes.
B -(INDIRECT_LOCAL_FLOW)-> A
if B calls A and A returns a value to B, which B subsequenty uses
This happens when a workflow B calls component A
"""
query = """
MATCH (cc_from:CalculationComponent {component_id: $from_component_id})
MATCH (cc_to:CalculationComponent {component_id: $to_component_id})
MERGE (cc_from)-[:INDIRECT_LOCAL_FLOW {workflow_id: $workflow_id}]->(cc_to)
"""
session.run(query, from_component_id=from_component_id, to_component_id=to_component_id, workflow_id=workflow_id)
def traverse_subgraph(self, component_id, calculation=True, visualization=False):
    """
    Runs the DFS traversal for one component, starting from its out-parameters.

    The component_id is normalised with clean_component_id, and every
    OutParameter node carrying that id is used as a DFS root. All roots share
    one set of allowed component ids, one set of visited nodes and one set of
    visited relationships.

    Parameters:
        component_id (str): the identifier of the component to start from
        calculation (bool): when true, a CalculationComponent node is ensured for
            the start component; the flag is also forwarded to the DFS
        visualization (bool): forwarded to the DFS unchanged
    """
    root_component_id = clean_component_id(component_id)
    roots_query = """
        MATCH (n:OutParameter {component_id: $component_id})
        RETURN elementId(n) AS nodeId
    """

    with self.driver.session() as session:
        # Collect all root ids up front so this result is fully read before
        # any further query is issued on the session
        root_node_ids = []
        for row in session.run(roots_query, component_id=root_component_id):
            root_node_ids.append(row["nodeId"])

        # Traversal state shared by every root
        allowed_component_ids = {root_component_id}
        visited_nodes = set()
        visited_edges = set()

        if calculation:
            self._create_calculation_component(session, root_component_id)

        for root_node_id in root_node_ids:
            self._dfs_traverse(
                session,
                root_component_id,
                root_node_id,
                allowed_component_ids,
                visited_nodes,
                visited_edges,
                calculation,
                visualization,
            )
def _dfs_traverse(self, session, workflow_id, node_id, allowed_component_ids, visited_nodes, visited_edges, calculation, visualization):
"""Recursively performs DFS traversal and saves the subgraph."""
# Parameters, as used in this body:
#   session               - object whose run() executes the Cypher queries below
#   workflow_id           - passed unchanged to every recursive call; used as an endpoint and as the
#                           workflow_id property of the flow relationships written for a newly found component
#   node_id               - elementId of the node being visited
#   allowed_component_ids - set, mutated in place; a relationship is only followed when its
#                           component_id property is in this set
#   visited_nodes         - set of node elementIds, mutated in place
#   visited_edges         - set of relationship elementIds, mutated in place
#   calculation           - when true, CalculationComponent nodes and flow relationships are written
#   visualization         - when true, SUBGRAPH relationships are merged between traversed nodes
# Returns None.
if node_id in visited_nodes:
return # Stop if already visited
visited_nodes.add(node_id) # Mark node as visited
print(f"Visited Node ID: {node_id}")
if calculation:
# Look up the component_id and labels of the current node
# (the query matches on elementId only, it does not filter on a label)
query = """
MATCH (n) WHERE elementId(n) = $node_id
RETURN n.component_id AS component_id, labels(n) AS nodeLabels
"""
result = session.run(query, node_id=node_id)
# NOTE(review): record is subscripted without a None check - assumes the node exists
record = result.single()
component_id = record["component_id"]
starting_node_labels = record["nodeLabels"]
# Find valid connections: the pattern is undirected, so relationships in either
# direction are returned, restricted to those whose component_id is currently allowed
query = """
MATCH (n)-[r]-(m)
WHERE elementId(n) = $node_id AND r.component_id IN $allowed_component_ids
RETURN elementId(m) AS nextNodeId, elementId(r) AS relId, m.component_id AS nextComponentId, labels(m) AS nodeLabels, type(r) AS relType, r.component_id AS relComponentId
"""
# The allowed set is handed to the query as a list
results = session.run(query, node_id=node_id, allowed_component_ids=list(allowed_component_ids))
for record in results:
next_node_id = record["nextNodeId"]
rel_id = record["relId"]
next_component_id = record["nextComponentId"]
node_labels = record["nodeLabels"]
rel_type = record["relType"]
rel_component_id = record["relComponentId"]
if visualization:
# The pattern here is directed: a SUBGRAPH edge is only merged when some
# relationship points from the current node to the next node
session.run("""
MATCH (n)-[r]->(m)
WHERE elementId(n) = $node_id AND elementId(m) = $next_node_id
MERGE (n)-[:SUBGRAPH {original_type: $rel_type}]->(m)
""", node_id=node_id, next_node_id=next_node_id, rel_type=rel_type)
# Save the relationship if not already visited
if rel_id not in visited_edges:
visited_edges.add(rel_id)
# If an OutParameter node has a new component_id, expand allowed list
if "OutParameter" in node_labels and next_component_id not in allowed_component_ids:
allowed_component_ids.add(next_component_id)
if calculation:
# Register the newly found component and link it with the start workflow:
# DIRECT_LOCAL_FLOW workflow -> component, INDIRECT_LOCAL_FLOW component -> workflow
self._create_calculation_component(session, next_component_id)
self._create_direct_local_flow(session, workflow_id, next_component_id, workflow_id)
self._create_indirect_local_flow(session, next_component_id, workflow_id, workflow_id)
print(f"New component_id added: {next_component_id}")
if calculation:
# If the relation is B -> A where B is InParameter and A is OutParameter
if "InParameter" in starting_node_labels and "OutParameter" in node_labels:
# Then there is a direct flow from A to B
# (the relationship's own component_id is stored as the workflow_id property)
self._create_direct_local_flow(session, next_component_id, component_id, rel_component_id)
# Recursively continue DFS
# NOTE(review): one Python stack frame per traversal step - confirm the expected
# path lengths stay below the interpreter's recursion limit
self._dfs_traverse(session, workflow_id, next_node_id, allowed_component_ids, visited_nodes, visited_edges, calculation, visualization)
def clean_up_flow(self):
with self.driver.session() as session:
session.run("""
MATCH ()-[r:DIRECT_LOCAL_FLOW]-()
DELETE r
""")
session.run("""
MATCH ()-[r:INDIRECT_LOCAL_FLOW]-()
DELETE r
""")
session.run("""
MATCH ()-[r:LOCAL_FLOW]-()
DELETE r
""")
def clean_up_subgraph(self):
with self.driver.session() as session:
session.run("""
MATCH ()-[r:SUBGRAPH]-()
DELETE r
""")
# if __name__ == "__main__":
# # Get the authentication details for Neo4j instance
# load_status = dotenv.load_dotenv("Neo4j-25ebc0db-Created-2024-11-17.txt")
# if load_status is False:
# raise RuntimeError('Environment variables not loaded.')
# URI = os.getenv("NEO4J_URI")
# AUTH = (os.getenv("NEO4J_USERNAME"), os.getenv("NEO4J_PASSWORD"))
# neo4j_traversal = Neo4jTraversalDFS(GraphDatabase.driver(URI, auth=AUTH))
# try:
# start_component_id = r"ldv\imaging_compress_pipeline\download_and_compress_pipeline.cwl" # Start DFS from component_id 'x'
# neo4j_traversal.clean_up_flow()
# neo4j_traversal.traverse_subgraph(start_component_id)
# finally:
# neo4j_traversal.close()
......@@ -21,7 +21,7 @@ def create_in_param_relationship(driver: Driver, prefixed_component_id: str, par
query = """
MATCH (c:Component {component_id: $component_id}), (p:InParameter)
WHERE elementId(p) = $parameter_internal_id
MERGE (c)-[:DATA {component_id: $component_id}]->(p)
MERGE (c)-[:DATA {component_id: $component_id, data_id: p.parameter_id}]->(p)
RETURN c.component_id AS component_id, p.parameter_id AS parameter_id
"""
with driver.session() as session:
......@@ -48,7 +48,7 @@ def create_out_param_relationship(driver: Driver, prefixed_component_id: str, pa
query = """
MATCH (c:Component {component_id: $component_id}), (p: OutParameter)
WHERE elementId(p) = $parameter_internal_id
MERGE (c)<-[:DATA {component_id: $component_id}]-(p)
MERGE (c)<-[:DATA {component_id: $component_id, data_id: p.parameter_id}]-(p)
RETURN c.component_id AS component_id, p.parameter_id AS parameter_id
"""
with driver.session() as session:
......@@ -56,7 +56,7 @@ def create_out_param_relationship(driver: Driver, prefixed_component_id: str, pa
parameter_internal_id=parameter_internal_id)
def create_data_relationship_with_id(driver: Driver, from_internal_node_id: int, to_internal_node_id: int, id: str) -> tuple[int,int]:
def create_data_relationship(driver: Driver, from_internal_node_id: int, to_internal_node_id: int, component_id: str, data_id: str) -> tuple[int,int]:
"""
Creates a data dependency relationship in Neo4j between the two nodes with Neo4j internal IDs given as parameters.
This relationship is an outgoing data edge from the node with internal ID from_internal_node_id
......@@ -70,16 +70,16 @@ def create_data_relationship_with_id(driver: Driver, from_internal_node_id: int,
Returns:
tuple[int,int]: from_internal_node_id, to_internal_node_id
"""
clean_id = clean_component_id(id)
clean_id = clean_component_id(component_id)
query = """
MATCH (a), (b)
WHERE elementId(a) = $from_internal_node_id AND elementId(b) = $to_internal_node_id
MERGE (a)-[:DATA {component_id: $component_id}]->(b)
MERGE (a)-[:DATA {component_id: $component_id, data_id: $data_id}]->(b)
RETURN elementId(a) AS id_1, elementId(b) AS id_2
"""
with driver.session() as session:
result = session.run(query, from_internal_node_id=from_internal_node_id,
to_internal_node_id=to_internal_node_id, component_id= clean_id)
to_internal_node_id=to_internal_node_id, component_id= clean_id, data_id=data_id)
record = result.single()
return record["id_1"], record["id_2"]
......@@ -134,26 +134,3 @@ def clean_relationship(driver: Driver) -> tuple[int,int]:
"""
with driver.session() as session:
session.run(query)
\ No newline at end of file
def simplify_data_and_control_edges(driver: Driver):
    """
    Replaces every intermediate Data node by direct edges and then removes the Data nodes.

    Three queries are run in order within one session:
      1. for each path (n1)-[:DATA]->(:Data)-[:DATA]->(n2), merge a DATA edge from n1 to n2;
      2. for each path (n1)-[:CONTROL]->(:Data)-[:DATA]->(n2), merge a CONTROL edge from n1 to n2;
      3. detach-delete all Data nodes.
    The merged edges take their component_id and data_id from the Data node.

    Parameters:
        driver (Driver): the Neo4j driver used to open the session
    """
    bridge_data_edges = """
        MATCH (n1)-[:DATA]->(n:Data), (n)-[:DATA]->(n2)
        WITH n, n1, n2, n.component_id AS component_id, n.data_id AS data_id
        MERGE (n1)-[:DATA {component_id: component_id, data_id: data_id}]->(n2)
    """
    bridge_control_edges = """
        MATCH (n1)-[:CONTROL]->(n:Data), (n)-[:DATA]->(n2)
        WITH n, n1, n2, n.component_id AS component_id, n.data_id AS data_id
        MERGE (n1)-[:CONTROL {component_id: component_id, data_id: data_id}]->(n2)
    """
    remove_data_nodes = """
        MATCH (n:Data)
        DETACH DELETE n
    """
    with driver.session() as session:
        # Order matters: the Data nodes must still exist while the bridging edges are merged
        for statement in (bridge_data_edges, bridge_control_edges, remove_data_nodes):
            session.run(statement)
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment