from neo4j import Driver, GraphDatabase, Session
from collections import deque
import json
import copy
from graph_traversal.utils import append_paths_entry, current_stack_structure_processed, perform_topological_sort
from neo4j_graph_queries.processing_queries import get_all_in_parameter_nodes_of_entity, get_node_details, get_valid_connections
from neo4j_graph_queries.utils import clean_component_id
from queue import Queue
class FlowCalculation:
"""
This class calculates all possible flow paths between two components and stores them in a JSON file named `flow_paths.json`.
To execute the calculation, call the `perform_flow_path_calculation` method.
The generated JSON is structured as a nested dictionary, where `dictionary[id1][id2]` contains a list of all paths
from the component with ID `id1` to the component with ID `id2`.
    Each path is represented as a tuple `(id3, flow_type, distance)`, where:
    - `id3` is the ID of the component in whose context the path was identified.
    - `flow_type` is the kind of flow: `"Sequential"`, `"Transitive"`, `"Direct"`, or `"Indirect"`.
    - `distance` is the number of edges in the path from `id1` to `id2`.
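
    For example, a hypothetical workflow `wfA` that directly contains a step `step1` could yield:
        paths["wfA"]["step1"] == [("wfA", "Direct", 1)]
        paths["step1"]["wfA"] == [("wfA", "Indirect", 1)]
    (JSON serialization turns these tuples into lists.)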
"""
    def __init__(self, driver: Driver):
        self.driver = driver

    @classmethod
    def from_uri(cls, uri, auth):
        """Creates an instance by opening a new Neo4j driver for the given URI and auth."""
        return cls(GraphDatabase.driver(uri, auth=auth))
def close(self):
"""Closes the Neo4j database connection."""
self.driver.close()
def perform_flow_path_calculation(self):
"""
This method performs a topological sort of components, then traverses the graph
to compute flow paths, storing the results in a JSON file (`flow_paths.json`).
### Data Structures:
- **`paths: dict[str, dict[str, list]]`**
- A nested dictionary where:
- The first key (`str`) represents the source component ID.
- The second key (`str`) represents the target component ID.
- The value (`list`) contains all possible paths from the source to the target.
          - This dictionary is serialized to the `flow_paths.json` file.
- **`bookkeeping: dict[int, list[tuple[list, list]]]`**
- A dictionary where:
- The key (`int`) is the **Neo4j internal node ID**.
- The value (`list[tuple[list, list]]`) stores tuples of:
- **`component_stack (list)`**: The workflows being traversed when the node was encountered.
- The **leftmost element** is the **outermost workflow**.
- **`step_stack (list)`**: The workflow steps being traversed when the node was encountered.
          - This dictionary is used to avoid redundant recomputation of already-explored traversal states.
"""
with self.driver.session() as session:
# Perform topological sorting to determine traversal order
sorted_components = perform_topological_sort(session)
bookkeeping = {}
paths: dict[str, dict[str, list]] = {}
            for workflow in sorted_components:
                print(f"Preprocessing: {workflow}")
                self.bfs_traverse_paths_change_impact(session, workflow, bookkeeping, paths)
with open("flow_paths.json", "w") as json_file:
json.dump(paths, json_file, indent=4)
def process_sequential_flows_to_component(self, component_id: str, depth: int, last_seen: dict[str, int], outer_workflows: list,
paths: dict[str, dict[str, list]]):
"""
Processes sequential flow paths leading to the specified component and updates the paths dictionary.
This method iterates through previously seen components (`last_seen`), where the keys represent encountered
component IDs and the values indicate the depth at which they were encountered. It calculates the distance
from each of these components to `component_id`.
If a seen component was encountered at a greater depth than an outer workflow component, a new path entry
from the seen component to the current component is added in the context of the outer workflow.
Parameters:
component_id (str): The target component for which flow paths are being processed.
depth (int): The current depth in the traversal.
last_seen (dict): A dictionary mapping component IDs to the depth at which they were last encountered.
outer_workflows (list): A list of outer workflow component IDs.
paths (dict): A nested dictionary storing discovered flow paths.
Updates:
        - Appends entries of the form `(outer_component_id, flow_type, distance)` to the list at `paths[seen_id][component_id]`.
"""
# Iterate through previously seen components and their recorded depths
for seen_id, depth_seen in last_seen.items():
# Skip the target component itself and outer workflows
if seen_id != component_id and seen_id not in outer_workflows:
                # Calculate the distance from the seen component to the target component
distance = depth - depth_seen
# Iterate through outer workflow components to determine the right context(s) of the path
for outer_component_id in outer_workflows:
# Ensure the seen component was encountered at a greater depth than the outer workflow component
if depth_seen > last_seen[outer_component_id]:
if distance == 1:
flow_type = "Sequential"
else:
flow_type = "Transitive"
                        append_paths_entry(seen_id, component_id, (outer_component_id, flow_type, distance), paths)
def process_direct_indirect_flow_of_node_id(self, node_id: int, component_id: str, outer_workflows: list,
component_stack: deque, step_stack: deque,
                                                 bookkeeping: dict[int, list[tuple[list, list]]],
paths: dict[str, dict[str, list]], direct: bool):
"""
Processes the direct or indirect flow of a given node within the outer workflows.
This function iterates through the outer workflows and establishes a direct flow
from the outer workflow to the component or an indirect flow from the component to
the outer workflow.
If the node has already been processed as a member of an outer workflow in the context of the same step(s),
it skips redundant processing.
Parameters:
        node_id (int): The Neo4j internal ID of the node being processed.
component_id (str): The identifier of the component currently being processed.
outer_workflows (list): A list of component IDs representing outer workflows.
component_stack (deque): A stack maintaining the sequence of outer components encountered.
step_stack (deque): A stack maintaining the sequence of outer steps taken.
bookkeeping (dict): A record of previously processed nodes to prevent redundant computations.
paths (dict): A dictionary storing established connections between components.
direct (bool): Whether to create a direct or indirect flow.
Updates:
        - Appends `(outer_workflow_id, "Direct", 1)` to `paths[outer_workflow_id][component_id]` for direct flows,
          or `(outer_workflow_id, "Indirect", 1)` to `paths[component_id][outer_workflow_id]` for indirect flows.
"""
for index, outer_workflow_id in enumerate(outer_workflows):
# Skip if the outer component is the same as the current component
if component_id != outer_workflow_id:
# Check if the node has already been processed
if node_id in bookkeeping:
# Extract the nested components and steps relevant to the current workflow depth
                    nested_components = list(component_stack)[-len(outer_workflows) + index:]
                    nested_steps = list(step_stack)[-len(outer_workflows) + index:]
                    # Skip processing if the current stack structure has already been handled.
                    # This avoids, e.g., a workflow A that sends one data item to step Y
                    # being wrongly reported as having multiple outgoing flows to Y because of nesting.
if current_stack_structure_processed(bookkeeping, node_id, nested_components, nested_steps):
continue
if direct:
entry = (outer_workflow_id, "Direct", 1)
append_paths_entry(outer_workflow_id, component_id, entry, paths)
else:
entry = (outer_workflow_id, "Indirect", 1)
append_paths_entry(component_id, outer_workflow_id, entry, paths)
    def bfs_traverse_paths_change_impact(self, session: Session, component_id: str,
                                         bookkeeping: dict[int, list[tuple[list, list]]],
                                         paths: dict[str, dict[str, list]]):
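        """
        Performs a breadth-first traversal starting from the "InParameter" nodes of the
        given component, recording discovered flow paths in `paths` and already-processed
        traversal states in `bookkeeping` (see `perform_flow_path_calculation` for the
        structure of both dictionaries).
        """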
start_component_id = clean_component_id(component_id)
# Find all "InParameter" nodes associated with the component
result = get_all_in_parameter_nodes_of_entity(session, start_component_id)
start_nodes = [record["nodeId"] for record in result]
bfs_queue = Queue()
for node in start_nodes:
node_details = {
"node_id": node,
"component_stack": deque([]),
"step_stack": deque([]),
"depth": 0,
"last_seen": {}
}
bfs_queue.put(node_details)
while not bfs_queue.empty():
node_details: dict = bfs_queue.get()
node_id: int = node_details["node_id"]
component_stack: deque = node_details["component_stack"]
step_stack: deque = node_details["step_stack"]
depth: int = node_details["depth"]
last_seen: dict = node_details["last_seen"]
component_id, current_node_labels, component_type = get_node_details(session, str(node_id))
# If an InParameter node has a new component_id, enter this component
if "InParameter" in current_node_labels:
component_stack.append((component_id, component_type))
print(f"entering {component_id}")
# The first In-Parameter belongs to the outer workflow and gets depth 0
last_seen[component_id] = depth
# Extract list of outer workflows (leftmost = outermost)
outer_workflows = [workflow[0] for workflow in component_stack if workflow[1] == "Workflow"]
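                # E.g., a hypothetical stack [("wfA", "Workflow"), ("step1", "Task"), ("wfB", "Workflow")]
                # yields outer_workflows = ["wfA", "wfB"]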
# Process sequential and direct flows
self.process_sequential_flows_to_component(component_id, depth, last_seen, outer_workflows, paths)
self.process_direct_indirect_flow_of_node_id(node_id, component_id, outer_workflows, component_stack,
step_stack, bookkeeping, paths, direct=True)
# Increment depth as we move deeper into the traversal, unless we just entered a workflow
if component_type != "Workflow":
depth = depth + 1
            # If the stack is empty, skip to the next queued node
            if not component_stack: continue
# Exit component when an OutParameter is found
if "OutParameter" in current_node_labels:
component_stack.pop()
# Process indirect flows
outer_workflows = [workflow[0] for workflow in component_stack if workflow[1] == "Workflow"]
self.process_direct_indirect_flow_of_node_id(node_id, component_id, outer_workflows, component_stack, step_stack,
bookkeeping, paths, direct=False)
if component_type == "Workflow":
# When we exit a workflow, the workflow needs to be at
# the same depth as its last step
last_seen[component_id] = depth - 1
            # If the stack is empty after popping, skip to the next queued node
            if not component_stack: continue
# Convert current stacks into list representations for bookkeeping
current_cs = list(component_stack)
current_ss = list(step_stack)
# Check if the current node has been encountered before
if node_id in bookkeeping:
# If the (sub)path structure has already been processed under the same conditions, exit early
if current_stack_structure_processed(bookkeeping, node_id, current_cs, current_ss):
continue
# Otherwise, update bookkeeping with the new state
bookkeeping[node_id].append((current_cs, current_ss))
else:
# Initialize a new entry in bookkeeping for this node
bookkeeping[node_id] = [(current_cs, current_ss)]
            # Find valid connections based on component type
if component_stack[-1][1] == "Workflow" and step_stack and "InParameter" not in current_node_labels:
                # If inside a workflow and transitioning between steps, use both the top component_id and top step_id in the stacks
results = get_valid_connections(session, node_id, component_stack[-1][0], step_stack[-1])
step_stack.pop()
else:
# Otherwise, retrieve valid connections only based on the top component_id in the component_stack
results = get_valid_connections(session, node_id, component_stack[-1][0])
# Extract next node IDs and step IDs from query results
            records = [(record["nextNodeId"], record["stepId"]) for record in results]
            # Enqueue each valid connection for further traversal
            for next_node_id, step_id in records:
# Create deep copies to ensure traversal states are independent
new_component_stack = copy.deepcopy(component_stack)
new_step_stack = copy.deepcopy(step_stack)
new_last_seen = copy.deepcopy(last_seen)
                new_depth = depth  # ints are immutable, so no deep copy is needed
# If a step ID exists, push it onto the step stack
if step_id != "":
new_step_stack.append(step_id)
next_node_details = {
"node_id": next_node_id,
"component_stack": new_component_stack,
"step_stack": new_step_stack,
"depth": new_depth,
"last_seen": new_last_seen
}
bfs_queue.put(next_node_details)
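

# A minimal usage sketch; the URI and credentials below are placeholders (not values
# prescribed by this module) and assume a reachable Neo4j instance.
if __name__ == "__main__":
    driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
    flow_calculation = FlowCalculation(driver)
    try:
        # Writes all discovered flow paths to flow_paths.json
        flow_calculation.perform_flow_path_calculation()
    finally:
        flow_calculation.close()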