Skip to content
Snippets Groups Projects
Commit c9aae592 authored by Chiara Liotta's avatar Chiara Liotta
Browse files

rename neo4j query module

parent 268591a5
No related branches found
No related tags found
No related merge requests found
...@@ -2,9 +2,9 @@ from pathlib import Path ...@@ -2,9 +2,9 @@ from pathlib import Path
import re import re
from neo4j import Driver from neo4j import Driver
from graph_creation.utils import extract_js_expression_dependencies, get_input_source, process_control_dependencies, process_in_param, process_parameter_source from graph_creation.utils import extract_js_expression_dependencies, get_input_source, process_control_dependencies, process_in_param, process_parameter_source
from neo4j_dependency_queries.create_node_queries import ensure_git_node, ensure_in_parameter_node, ensure_out_parameter_node from neo4j_graph_queries.create_node_queries import ensure_git_node, ensure_in_parameter_node, ensure_out_parameter_node
from neo4j_dependency_queries.create_edge_queries import create_out_param_relationship, create_references_relationship from neo4j_graph_queries.create_edge_queries import create_out_param_relationship, create_references_relationship
from neo4j_dependency_queries.utils import get_is_workflow from neo4j_graph_queries.utils import get_is_workflow
def process_cwl_inputs(driver: Driver, cwl_entity: dict) -> None: def process_cwl_inputs(driver: Driver, cwl_entity: dict) -> None:
""" """
......
...@@ -3,7 +3,7 @@ from graph_creation.cwl_parsing import get_cwl_from_repo ...@@ -3,7 +3,7 @@ from graph_creation.cwl_parsing import get_cwl_from_repo
from graph_creation.docker_parsing import parse_all_dockerfiles from graph_creation.docker_parsing import parse_all_dockerfiles
from graph_creation.utils import process_step_lookup from graph_creation.utils import process_step_lookup
from graph_creation.cwl_processing import process_cwl_commandline, process_cwl_inputs, process_cwl_outputs, process_cwl_steps from graph_creation.cwl_processing import process_cwl_commandline, process_cwl_inputs, process_cwl_outputs, process_cwl_steps
from neo4j_dependency_queries.utils import get_is_workflow from neo4j_graph_queries.utils import get_is_workflow
def process_repos(repo_list: list[str], driver: Driver) -> None: def process_repos(repo_list: list[str], driver: Driver) -> None:
""" """
......
from pathlib import Path from pathlib import Path
from neo4j import Driver from neo4j import Driver
import re import re
from graph_creation.cwl_parsing import get_cwl_from_repo from neo4j_graph_queries.create_node_queries import ensure_component_node, ensure_in_parameter_node, ensure_out_parameter_node
from neo4j_dependency_queries.create_node_queries import ensure_component_node, ensure_in_parameter_node, ensure_out_parameter_node from neo4j_graph_queries.create_edge_queries import create_control_relationship, create_data_relationship, create_in_param_relationship
from neo4j_dependency_queries.create_edge_queries import create_control_relationship, create_data_relationship, create_in_param_relationship from neo4j_graph_queries.processing_queries import get_all_in_parameter_nodes_of_entity
from neo4j_dependency_queries.processing_queries import get_all_in_parameter_nodes_of_entity
GITLAB_ASTRON ='https://git.astron.nl' GITLAB_ASTRON ='https://git.astron.nl'
......
...@@ -2,8 +2,8 @@ from neo4j import Driver, GraphDatabase, Session ...@@ -2,8 +2,8 @@ from neo4j import Driver, GraphDatabase, Session
from collections import deque from collections import deque
import copy import copy
from metric_calculations.utils import current_stack_structure_processed, perform_topological_sort from metric_calculations.utils import current_stack_structure_processed, perform_topological_sort
from neo4j_dependency_queries.processing_queries import get_all_in_parameter_nodes_of_entity, get_node_details, get_nodes_with_control_edges, get_valid_connections, get_workflow_list_of_data_edges_from_node, initiate_workflow_list, update_workflow_list_of_edge from neo4j_graph_queries.processing_queries import get_all_in_parameter_nodes_of_entity, get_node_details, get_nodes_with_control_edges, get_valid_connections, get_workflow_list_of_data_edges_from_node, initiate_workflow_list, update_workflow_list_of_edge
from neo4j_dependency_queries.utils import clean_component_id from neo4j_graph_queries.utils import clean_component_id
class DependencyTraversalDFS: class DependencyTraversalDFS:
"""Class to perform DFS traversal and save the resulting subgraph.""" """Class to perform DFS traversal and save the resulting subgraph."""
......
from neo4j import Session from neo4j import Session
from neo4j_dependency_queries.processing_queries import get_data_flow_relationships_for_sorting from neo4j_graph_queries.processing_queries import get_data_flow_relationships_for_sorting
import networkx as nx import networkx as nx
def append_paths_entry(id1: str, id2: str, entry: tuple[str, int], paths: dict[str, dict[str, list]]) -> None: def append_paths_entry(id1: str, id2: str, entry: tuple[str, int], paths: dict[str, dict[str, list]]) -> None:
......
import pathlib
from neo4j import Session
def create_calculation_component_node(session: Session, component_id: str, component_type: str):
"""Creates a CalculationComponent node for a given component_id if it does not exist."""
nice_id = pathlib.Path(component_id).stem
query = """
MERGE (cc:CalculationComponent {component_id: $component_id})
SET cc.nice_id = $nice_id
SET cc.component_type = $component_type
RETURN cc
"""
session.run(query, component_id=component_id, nice_id=nice_id, component_type=component_type)
def create_direct_flow(session: Session, from_component_id: str, to_component_id: str,
workflow_id: str, data_ids: str, workflow_list: list):
"""
Creates a DIRECT_FLOW relationship between two CalculationComponent nodes.
A -(DIRECT_FLOW)-> B if A calls B
This happens when a workflow A calls component B as a step
"""
description = "Workflow (source) calls step (target)"
query = """
MATCH (cc_from:CalculationComponent {component_id: $from_component_id})
MATCH (cc_to:CalculationComponent {component_id: $to_component_id})
MERGE (cc_from)-[r:DIRECT_FLOW
{workflow_id: $workflow_id, description: $description, workflow_list:apoc.coll.sort($workflow_list)}]->(cc_to)
ON MATCH SET r.data_ids = apoc.coll.toSet(r.data_ids + $data_ids)
ON CREATE SET r.data_ids = $data_ids
"""
session.run(query, from_component_id=from_component_id, to_component_id=to_component_id, workflow_id=workflow_id,
description=description, data_ids=data_ids, workflow_list=workflow_list)
def create_indirect_flow(session: Session, from_component_id: str, to_component_id: str,
workflow_id: str, data_ids: str, workflow_list: list):
"""
Creates an INDIRECT_FLOW relationship between two CalculationComponent nodes.
A -(INDIRECT_FLOW)-> B
- Case: if B calls A and A returns a value to B, which B subsequenty uses
This happens when a workflow B calls component A
"""
description = "Step (source) called by workflow (target)"
query = """
MATCH (cc_from:CalculationComponent {component_id: $from_component_id})
MATCH (cc_to:CalculationComponent {component_id: $to_component_id})
MERGE (cc_from)-[r:INDIRECT_FLOW
{workflow_id: $workflow_id, description: $description, workflow_list:apoc.coll.sort($workflow_list)}]->(cc_to)
ON MATCH SET r.data_ids = apoc.coll.toSet(r.data_ids + $data_ids)
ON CREATE SET r.data_ids = $data_ids
"""
session.run(query, from_component_id=from_component_id, to_component_id=to_component_id, workflow_id=workflow_id,
description=description, data_ids=data_ids, workflow_list=workflow_list)
def create_sequential_indirect_flow(session: Session, from_component_id: str, to_component_id: str,
workflow_id: str, data_ids: str, workflow_list: list):
"""
Creates an SEQUENTIAL_INDIRECT_FLOW relationship between two CalculationComponent nodes.
A -(SEQUENTIAL_INDIRECT_FLOW)-> B
- Case: if C calls both and B passing an output value from A to B
This happens when workflow C calls both A and B and an output of A is used as an input parameter of B
"""
description = "Output of step (source) used as input for step (target)"
query = """
MATCH (cc_from:CalculationComponent {component_id: $from_component_id})
MATCH (cc_to:CalculationComponent {component_id: $to_component_id})
MERGE (cc_from)-[r:SEQUENTIAL_INDIRECT_FLOW
{workflow_id: $workflow_id, description: $description, workflow_list:apoc.coll.sort($workflow_list)}]->(cc_to)
ON MATCH SET r.data_ids = apoc.coll.toSet(r.data_ids + $data_ids)
ON CREATE SET r.data_ids = $data_ids
"""
session.run(query, from_component_id=from_component_id, to_component_id=to_component_id, workflow_id=workflow_id,
description=description, data_ids=data_ids, workflow_list=workflow_list)
\ No newline at end of file
from neo4j import Session
def get_outer_workflow_ids(session: Session, component_id: str):
query = """
MATCH (c:CalculationComponent {component_id: $component_id})-[e]-()
RETURN collect(distinct e.workflow_id) AS workflows
"""
workflows = set(session.run(query, component_id=component_id).single()["workflows"])
return workflows
def get_all_component_ids(session: Session):
"""Fetches all component_ids of CalculationComponent nodes."""
query = """
MATCH (c:CalculationComponent)
RETURN c.component_id AS component_id
"""
result = session.run(query)
return [record["component_id"] for record in result]
def get_all_workflow_ids(session: Session):
"""Fetches all component_ids of CalculationComponent nodes."""
query = """
MATCH (c:CalculationComponent)
WHERE c.e = "Workflow"
RETURN c.component_id AS component_id
"""
result = session.run(query)
return [record["component_id"] for record in result]
def get_indirect_flow_connections(session: Session, component_id: str, workflow_id: str):
query = """
MATCH (c1:CalculationComponent2 {component_id: $component_id})-[r:INDIRECT_FLOW {workflow_id: $workflow_id}]->(c2:CalculationComponent2)
RETURN c2.component_id AS next_component_id, elementId(r) AS edge_id
"""
result = session.run(query, component_id=component_id, workflow_id=workflow_id)
return result
def get_sequential_indirect_flow_connections(session: Session, component_id: str, workflow_id: str):
query = """
MATCH (c1:CalculationComponent2 {component_id: $component_id})-[r:SEQUENTIAL_INDIRECT_FLOW {workflow_id: $workflow_id}]->(c2:CalculationComponent2)
RETURN c2.component_id AS next_component_id, elementId(r) AS edge_id
"""
result = session.run(query, component_id=component_id, workflow_id=workflow_id)
return result
def fetch_calculation_component_fan_data(session: Session):
"""Queries Neo4j for CalculationComponent fan-in, fan-out, and fan-out set."""
query = """
MATCH (cc:CalculationComponent)
OPTIONAL MATCH (cc)<-[r_in]-(target_in) // Count all incoming relationships
OPTIONAL MATCH (cc)-[r_out]->(target_out) // Count all outgoing relationships & get targets
RETURN cc.component_id AS component_id,
COUNT(DISTINCT r_in) AS fan_in,
COUNT(DISTINCT r_out) AS fan_out,
COLLECT(DISTINCT target_in.component_id) AS fan_in_set,
COLLECT(DISTINCT target_out.component_id) AS fan_out_set
"""
result = session.run(query)
return [{"component_id": record["component_id"],
"fan_in": record["fan_in"],
"fan_out": record["fan_out"],
"fan_in_set": record["fan_in_set"],
"fan_out_set": record["fan_out_set"]}
for record in result]
\ No newline at end of file
from neo4j import Driver from neo4j import Driver
from neo4j_dependency_queries.create_node_queries import ensure_component_node from neo4j_graph_queries.create_node_queries import ensure_component_node
from neo4j_dependency_queries.utils import clean_component_id from neo4j_graph_queries.utils import clean_component_id
def create_in_param_relationship(driver: Driver, prefixed_component_id: str, parameter_internal_id: int, parameter_id: str) -> tuple[str,str]: def create_in_param_relationship(driver: Driver, prefixed_component_id: str, parameter_internal_id: int, parameter_id: str) -> tuple[str,str]:
""" """
......
import json import json
from neo4j import Driver from neo4j import Driver
from neo4j_dependency_queries.utils import clean_component_id from neo4j_graph_queries.utils import clean_component_id
def ensure_component_node(driver: Driver, prefixed_component_id: str) -> tuple[int,str]: def ensure_component_node(driver: Driver, prefixed_component_id: str) -> tuple[int,str]:
""" """
......
from neo4j import Session from neo4j import Session
from neo4j_dependency_queries.utils import clean_component_id from neo4j_graph_queries.utils import clean_component_id
def get_node_details(session: Session, node_id: str): def get_node_details(session: Session, node_id: str):
......
File moved
...@@ -4,7 +4,7 @@ from collections import defaultdict ...@@ -4,7 +4,7 @@ from collections import defaultdict
import pandas as pd import pandas as pd
from graph_creation.utils import resolve_relative_path from graph_creation.utils import resolve_relative_path
from neo4j_dependency_queries.utils import clean_component_id from neo4j_graph_queries.utils import clean_component_id
def get_cwl_change_history(repo_path): def get_cwl_change_history(repo_path):
""" """
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment