Skip to content
Snippets Groups Projects
Commit 0c800e78 authored by Chiara Liotta's avatar Chiara Liotta
Browse files

change naming from paths to info flows

parent f0002530
No related branches found
No related tags found
No related merge requests found
...@@ -41,22 +41,22 @@ class ChangeImpact: ...@@ -41,22 +41,22 @@ class ChangeImpact:
return parts1[:2] == parts2[:2] return parts1[:2] == parts2[:2]
def complete_path_analysis(self, paths: dict[str, dict[str, list]]): def complete_info_flow_analysis(self, info_flows: dict[str, dict[str, list]]):
""" """
Analyzes the change impact between components based on the flow paths and calculates the coupling score. Analyzes the change impact between components based on the information flows and calculates the coupling score.
This method calculates the coupling score between components based on the distances between them This method calculates the coupling score between components based on the distances between them
in the flow paths, considering both direct and indirect connections. in the information flows, considering both direct and indirect connections.
The method generates a matrix where each cell (i, j) represents the coupling score between The method generates a matrix where each cell (i, j) represents the coupling score between
components i and j. The matrix is then saved to a CSV file named `change_impact_analysis.csv`. components i and j. The matrix is then saved to a CSV file named `change_impact_analysis.csv`.
Parameters: Parameters:
paths (dict[str, dict[str, list]]): A nested dictionary where the first level keys represent info_flows (dict[str, dict[str, list]]): A nested dictionary where the first level keys represent
component IDs, and the second level keys represent target component IDs. component IDs, and the second level keys represent target component IDs.
The values are lists of paths from the source to the target. The values are lists of info flows from the source to the target.
Each path in `paths[source_id][target_id]` is represented as a tuple `(context_id, distance)`, where: Each path in `info_flows[source_id][target_id]` is represented as a tuple `(context_id, distance)`, where:
- `context_id` is the ID of the component in whose context the path was identified. - `context_id` is the ID of the component in whose context the information flow was identified.
- `distance` is the number of edges in the path from source to target. - `distance` is 1 for direct/indirect/sequential flows, and 1 + # of intermediate steps from source to target.
Returns: Returns:
pd.DataFrame: A DataFrame representing the coupling score matrix between all components. pd.DataFrame: A DataFrame representing the coupling score matrix between all components.
...@@ -67,11 +67,12 @@ class ChangeImpact: ...@@ -67,11 +67,12 @@ class ChangeImpact:
`coupling_score = Σ (N_l /l)` for all distinct path distances `l > 0`, where: `coupling_score = Σ (N_l /l)` for all distinct path distances `l > 0`, where:
- `N_l` is the frequency (count) of paths with distance `l`. - `N_l` is the frequency (count) of paths with distance `l`.
- `l` is the distance (number of edges) between the two components in the flow paths. - `l` is 1 + number of intermediate steps before the data exchange happens.
In other words, the coupling score is a weighted sum of path frequencies, where the weights are the inverses In other words, the coupling score is a weighted sum of path frequencies, where the weights are the inverses
of the distances raised to the power of the penalty. This formula gives more importance to shorter paths, while of the distances raised to the power of the penalty. This formula gives more importance to direct data exchanges between
longer paths are penalized according to the specified penalty. components (which happen in direct, indirect, and sequential flows), while data exchanges that have intermediate steps
are penalized according to the specified penalty.
""" """
component_ids = get_all_component_ids(self.driver.session()) component_ids = get_all_component_ids(self.driver.session())
sorted_component_ids = sorted([ id for id in component_ids if "example" not in id]) sorted_component_ids = sorted([ id for id in component_ids if "example" not in id])
...@@ -87,8 +88,8 @@ class ChangeImpact: ...@@ -87,8 +88,8 @@ class ChangeImpact:
if not self.have_same_repo_prefix(component_id_1, component_id_2): continue if not self.have_same_repo_prefix(component_id_1, component_id_2): continue
# Check if paths exist from each component # Check if paths exist from each component
paths_from_1 = component_id_1 in paths paths_from_1 = component_id_1 in info_flows
paths_from_2 = component_id_2 in paths paths_from_2 = component_id_2 in info_flows
# Skip if no paths are available for either component # Skip if no paths are available for either component
if not paths_from_1 and not paths_from_2: continue if not paths_from_1 and not paths_from_2: continue
...@@ -97,12 +98,12 @@ class ChangeImpact: ...@@ -97,12 +98,12 @@ class ChangeImpact:
all_paths = list() all_paths = list()
# Add paths from component_id_1 to component_id_2 # Add paths from component_id_1 to component_id_2
if paths_from_1 and component_id_2 in paths[component_id_1]: if paths_from_1 and component_id_2 in info_flows[component_id_1]:
all_paths.extend(paths[component_id_1][component_id_2]) all_paths.extend(info_flows[component_id_1][component_id_2])
# Add paths from component_id_2 to component_id_1 # Add paths from component_id_2 to component_id_1
if paths_from_2 and component_id_1 in paths[component_id_2]: if paths_from_2 and component_id_1 in info_flows[component_id_2]:
all_paths.extend(paths[component_id_2][component_id_1]) all_paths.extend(info_flows[component_id_2][component_id_1])
# Extract distances from the paths # Extract distances from the paths
distances = [path[2] for path in all_paths] distances = [path[2] for path in all_paths]
......
...@@ -3,7 +3,7 @@ from neo4j import Driver, GraphDatabase, Session ...@@ -3,7 +3,7 @@ from neo4j import Driver, GraphDatabase, Session
from collections import deque from collections import deque
import json import json
import copy import copy
from graph_analysis.utils import append_paths_entry, current_stack_structure_processed, perform_topological_sort from graph_analysis.utils import append_info_flow_entry, current_stack_structure_processed, perform_topological_sort
from neo4j_graph_queries.processing_queries import get_all_in_parameter_nodes_of_entity, get_node_details, get_valid_connections from neo4j_graph_queries.processing_queries import get_all_in_parameter_nodes_of_entity, get_node_details, get_valid_connections
from neo4j_graph_queries.utils import clean_component_id, get_is_workflow_class from neo4j_graph_queries.utils import clean_component_id, get_is_workflow_class
from queue import Queue from queue import Queue
...@@ -38,11 +38,11 @@ class FlowCalculation: ...@@ -38,11 +38,11 @@ class FlowCalculation:
to compute flow paths, storing the results in a JSON file (`flow_paths.json`). to compute flow paths, storing the results in a JSON file (`flow_paths.json`).
### Data Structures: ### Data Structures:
- **`paths: dict[str, dict[str, list]]`** - **`info_flows: dict[str, dict[str, list]]`**
- A nested dictionary where: - A nested dictionary where:
- The first key (`str`) represents the source component ID. - The first key (`str`) represents the source component ID.
- The second key (`str`) represents the target component ID. - The second key (`str`) represents the target component ID.
- The value (`list`) contains all possible paths from the source to the target. - The value (`list`) contains all possible information flows from the source to the target.
- This dictionary is converted into the JSON file - This dictionary is converted into the JSON file
- **`bookkeeping: dict[int, list[tuple[list, list]]]`** - **`bookkeeping: dict[int, list[tuple[list, list]]]`**
...@@ -59,18 +59,18 @@ class FlowCalculation: ...@@ -59,18 +59,18 @@ class FlowCalculation:
sorted_components = perform_topological_sort(session) sorted_components = perform_topological_sort(session)
bookkeeping = {} bookkeeping = {}
paths: dict[str, dict[str, list]] = {} info_flows: dict[str, dict[str, list]] = {}
workflow_ids = sorted_components workflow_ids = sorted_components
for workflow in workflow_ids: for workflow in workflow_ids:
print(f"Preprocessing: {workflow}") print(f"Preprocessing: {workflow}")
self.bsf_traverse_paths_change_impact(session, workflow, bookkeeping, paths) self.bsf_traverse_paths_change_impact(session, workflow, bookkeeping, info_flows)
with open("flow_paths.json", "w") as json_file: with open("flow_paths.json", "w") as json_file:
json.dump(paths, json_file, indent=4) json.dump(info_flows, json_file, indent=4)
def process_sequential_flows_to_component(self, component_id: str, depth: int, last_seen: dict[str, int], outer_workflows: list, def process_sequential_flows_to_component(self, component_id: str, depth: int, last_seen: dict[str, int], outer_workflows: list,
paths: dict[str, dict[str, list]]): info_flows: dict[str, dict[str, list]]):
""" """
Processes sequential flow paths leading to the specified component and updates the paths dictionary. Processes sequential and transitive flows leading to the specified component and updates the info_flows dictionary.
This method iterates through previously seen components (`last_seen`), where the keys represent encountered This method iterates through previously seen components (`last_seen`), where the keys represent encountered
component IDs and the values indicate the depth at which they were encountered. It calculates the distance component IDs and the values indicate the depth at which they were encountered. It calculates the distance
...@@ -80,11 +80,11 @@ class FlowCalculation: ...@@ -80,11 +80,11 @@ class FlowCalculation:
from the seen component to the current component is added in the context of the outer workflow. from the seen component to the current component is added in the context of the outer workflow.
Parameters: Parameters:
component_id (str): The target component for which flow paths are being processed. component_id (str): The target component for which flows are being processed.
depth (int): The current depth in the traversal. depth (int): The current depth in the traversal.
last_seen (dict): A dictionary mapping component IDs to the depth at which they were last encountered. last_seen (dict): A dictionary mapping component IDs to the depth at which they were last encountered.
outer_workflows (list): A list of outer workflow component IDs. outer_workflows (list): A list of outer workflow component IDs.
paths (dict): A nested dictionary storing discovered flow paths. info_flows (dict): A nested dictionary storing discovered information flows.
Updates: Updates:
- Adds new entries to `paths` in the format: `paths[seen_id][component_id] = (outer_component_id, flow_type, distance)`. - Adds new entries to `info_flows` in the format: `info_flows[seen_id][component_id] = (outer_component_id, flow_type, distance)`.
...@@ -104,12 +104,12 @@ class FlowCalculation: ...@@ -104,12 +104,12 @@ class FlowCalculation:
flow_type = "Sequential" flow_type = "Sequential"
else: else:
flow_type = "Transitive" flow_type = "Transitive"
append_paths_entry(seen_id, component_id, tuple([outer_component_id, flow_type, distance]), paths) append_info_flow_entry(seen_id, component_id, tuple([outer_component_id, flow_type, distance]), info_flows)
def process_direct_indirect_flow_of_node_id(self, node_id: int, component_id: str, outer_workflows: list, def process_direct_indirect_flow_of_node_id(self, node_id: int, component_id: str, outer_workflows: list,
component_stack: deque, step_stack: deque, component_stack: deque, step_stack: deque,
bookkeeping: dict[str, list[tuple[list, list]]], bookkeeping: dict[str, list[tuple[list, list]]],
paths: dict[str, dict[str, list]], direct: bool): info_flows: dict[str, dict[str, list]], direct: bool):
""" """
Processes the direct or indirect flow of a given node within the outer workflows. Processes the direct or indirect flow of a given node within the outer workflows.
...@@ -126,11 +126,11 @@ class FlowCalculation: ...@@ -126,11 +126,11 @@ class FlowCalculation:
component_stack (deque): A stack maintaining the sequence of outer components encountered. component_stack (deque): A stack maintaining the sequence of outer components encountered.
step_stack (deque): A stack maintaining the sequence of outer steps taken. step_stack (deque): A stack maintaining the sequence of outer steps taken.
bookkeeping (dict): A record of previously processed nodes to prevent redundant computations. bookkeeping (dict): A record of previously processed nodes to prevent redundant computations.
paths (dict): A dictionary storing established connections between components. info_flows (dict): A dictionary storing established connections between components.
direct (bool): Whether to create a direct or indirect flow. direct (bool): Whether to create a direct or indirect flow.
Updates: Updates:
- Adds new entries to `paths` in the format: `paths[seen_id][component_id] = (outer_component_id, flow_type, distance)`. - Adds new entries to `info_flows` in the format: `info_flows[seen_id][component_id] = (outer_component_id, flow_type, distance)`.
""" """
for index, outer_workflow_id in enumerate(outer_workflows): for index, outer_workflow_id in enumerate(outer_workflows):
...@@ -150,13 +150,13 @@ class FlowCalculation: ...@@ -150,13 +150,13 @@ class FlowCalculation:
if direct: if direct:
entry = (outer_workflow_id, "Direct", 1) entry = (outer_workflow_id, "Direct", 1)
append_paths_entry(outer_workflow_id, component_id, entry, paths) append_info_flow_entry(outer_workflow_id, component_id, entry, info_flows)
else: else:
entry = (outer_workflow_id, "Indirect", 1) entry = (outer_workflow_id, "Indirect", 1)
append_paths_entry(component_id, outer_workflow_id, entry, paths) append_info_flow_entry(component_id, outer_workflow_id, entry, info_flows)
def bsf_traverse_paths_change_impact(self, session: Session, component_id, bookkeeping, paths): def bsf_traverse_paths_change_impact(self, session: Session, component_id, bookkeeping, info_flows):
start_component_id = clean_component_id(component_id) start_component_id = clean_component_id(component_id)
# Find all "InParameter" nodes associated with the component # Find all "InParameter" nodes associated with the component
...@@ -196,9 +196,9 @@ class FlowCalculation: ...@@ -196,9 +196,9 @@ class FlowCalculation:
# Extract list of outer workflows (leftmost = outermost) # Extract list of outer workflows (leftmost = outermost)
outer_workflows = [workflow[0] for workflow in component_stack if get_is_workflow_class(workflow[1])] outer_workflows = [workflow[0] for workflow in component_stack if get_is_workflow_class(workflow[1])]
# Process sequential and direct flows # Process sequential and direct flows
self.process_sequential_flows_to_component(component_id, depth, last_seen, outer_workflows, paths) self.process_sequential_flows_to_component(component_id, depth, last_seen, outer_workflows, info_flows)
self.process_direct_indirect_flow_of_node_id(node_id, component_id, outer_workflows, component_stack, self.process_direct_indirect_flow_of_node_id(node_id, component_id, outer_workflows, component_stack,
step_stack, bookkeeping, paths, direct=True) step_stack, bookkeeping, info_flows, direct=True)
# Increment depth as we move deeper into the traversal, unless we just entered a workflow # Increment depth as we move deeper into the traversal, unless we just entered a workflow
if not get_is_workflow_class(component_type): if not get_is_workflow_class(component_type):
...@@ -213,7 +213,7 @@ class FlowCalculation: ...@@ -213,7 +213,7 @@ class FlowCalculation:
# Process indirect flows # Process indirect flows
outer_workflows = [workflow[0] for workflow in component_stack if get_is_workflow_class(workflow[1])] outer_workflows = [workflow[0] for workflow in component_stack if get_is_workflow_class(workflow[1])]
self.process_direct_indirect_flow_of_node_id(node_id, component_id, outer_workflows, component_stack, step_stack, self.process_direct_indirect_flow_of_node_id(node_id, component_id, outer_workflows, component_stack, step_stack,
bookkeeping, paths, direct=False) bookkeeping, info_flows, direct=False)
if get_is_workflow_class(component_type): if get_is_workflow_class(component_type):
# When we exit a workflow, the workflow needs to be at # When we exit a workflow, the workflow needs to be at
# the same depth as its last step # the same depth as its last step
......
from pathlib import Path
from neo4j import Session from neo4j import Session
from neo4j_graph_queries.processing_queries import count_nodes_and_edges, get_all_workflow_ids, get_data_flow_relationships_for_sorting from neo4j_graph_queries.processing_queries import get_all_workflow_ids, get_data_flow_relationships_for_sorting
import networkx as nx import networkx as nx
from neo4j_graph_queries.utils import clean_component_id def append_info_flow_entry(id1: str, id2: str, entry: tuple[str, int], info_flows: dict[str, dict[str, list]]) -> None:
def append_paths_entry(id1: str, id2: str, entry: tuple[str, int], paths: dict[str, dict[str, list]]) -> None:
""" """
Adds an entry to the paths dictionary, ensuring necessary keys exist. Adds an entry to the info_flows dictionary, ensuring necessary keys exist.
...@@ -15,11 +12,11 @@ def append_paths_entry(id1: str, id2: str, entry: tuple[str, int], paths: dict[s ...@@ -15,11 +12,11 @@ def append_paths_entry(id1: str, id2: str, entry: tuple[str, int], paths: dict[s
entry (tuple[str, int]): The entry to append, consisting of a string and an integer. entry (tuple[str, int]): The entry to append, consisting of a string and an integer.
paths (dict[str, dict[str, list]]): The dictionary storing path entries. info_flows (dict[str, dict[str, list]]): The dictionary storing information flow entries.
""" """
if id1 not in paths: if id1 not in info_flows:
paths[id1] = dict() info_flows[id1] = dict()
if id2 not in paths[id1]: if id2 not in info_flows[id1]:
paths[id1][id2] = list() info_flows[id1][id2] = list()
paths[id1][id2].append(entry) info_flows[id1][id2].append(entry)
def is_substack(inner_stack: list, outer_stack: list) -> bool: def is_substack(inner_stack: list, outer_stack: list) -> bool:
""" """
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment