diff --git a/graph_analysis/metric_calculations/ChangeImpact.py b/graph_analysis/metric_calculations/ChangeImpact.py index 48763066ac9b649ad7d81379dd0fc62f329e3d66..0a44e6e638152ee60d8685f171e7c51949823802 100644 --- a/graph_analysis/metric_calculations/ChangeImpact.py +++ b/graph_analysis/metric_calculations/ChangeImpact.py @@ -41,22 +41,22 @@ class ChangeImpact: return parts1[:2] == parts2[:2] - def complete_path_analysis(self, paths: dict[str, dict[str, list]]): + def complete_info_flow_analysis(self, info_flows: dict[str, dict[str, list]]): """ - Analyzes the change impact between components based on the flow paths and calculates the coupling score. + Analyzes the change impact between components based on the information flows and calculates the coupling score. This method calculates the coupling score between components based on the distances between them - in the flow paths, considering both direct and indirect connections. + in the information flows, considering both direct and indirect connections. The method generates a matrix where each cell (i, j) represents the coupling score between components i and j. The matrix is then saved to a CSV file named `change_impact_analysis.csv`. Parameters: - paths (dict[str, dict[str, list]]): A nested dictionary where the first level keys represent + info_flows (dict[str, dict[str, list]]): A nested dictionary where the first level keys represent component IDs, and the second level keys represent target component IDs. - The values are lists of paths from the source to the target. - Each path in `paths[source_id][target_id]` is represented as a tuple `(context_id, distance)`, where: - - `context_id` is the ID of the component in whose context the path was identified. - - `distance` is the number of edges in the path from source to target. + The values are lists of info flows from the source to the target. 
+ Each path in `info_flows[source_id][target_id]` is represented as a tuple `(context_id, flow_type, distance)`, where: + - `context_id` is the ID of the component in whose context the information flow was identified. + - `distance` is 1 for direct/indirect/sequential flows, and 1 + the number of intermediate steps from source to target. Returns: pd.DataFrame: A DataFrame representing the coupling score matrix between all components. @@ -67,11 +67,12 @@ class ChangeImpact: `coupling_score = Σ (N_l /l)` for all distinct path distances `l > 0`, where: - `N_l` is the frequency (count) of paths with distance `l`. - - `l` is the distance (number of edges) between the two components in the flow paths. + - `l` is 1 + number of intermediate steps before the data exchange happens. In other words, the coupling score is a weighted sum of path frequencies, where the weights are the inverses - of the distances raised to the power of the penalty. This formula gives more importance to shorter paths, while - longer paths are penalized according to the specified penalty. + of the distances raised to the power of the penalty. This formula gives more importance to direct data exchanges between + components (which happen in direct, indirect, and sequential flows), while data exchanges that have intermediate steps + are penalized according to the specified penalty. 
""" component_ids = get_all_component_ids(self.driver.session()) sorted_component_ids = sorted([ id for id in component_ids if "example" not in id]) @@ -87,8 +88,8 @@ class ChangeImpact: if not self.have_same_repo_prefix(component_id_1, component_id_2): continue # Check if paths exist from each component - paths_from_1 = component_id_1 in paths - paths_from_2 = component_id_2 in paths + paths_from_1 = component_id_1 in info_flows + paths_from_2 = component_id_2 in info_flows # Skip if no paths are available for either component if not paths_from_1 and not paths_from_2: continue @@ -97,12 +98,12 @@ class ChangeImpact: all_paths = list() # Add paths from component_id_1 to component_id_2 - if paths_from_1 and component_id_2 in paths[component_id_1]: - all_paths.extend(paths[component_id_1][component_id_2]) + if paths_from_1 and component_id_2 in info_flows[component_id_1]: + all_paths.extend(info_flows[component_id_1][component_id_2]) # Add paths from component_id_2 to component_id_1 - if paths_from_2 and component_id_1 in paths[component_id_2]: - all_paths.extend(paths[component_id_2][component_id_1]) + if paths_from_2 and component_id_1 in info_flows[component_id_2]: + all_paths.extend(info_flows[component_id_2][component_id_1]) # Extract distances from the paths distances = [path[2] for path in all_paths] diff --git a/graph_analysis/metric_calculations/FlowCalculation.py b/graph_analysis/metric_calculations/FlowCalculation.py index adcd700ba8b2154eb3c2acc433bec42f6b8377d7..babbc48d41cc88cb5f0d9b11114eb04c9096e711 100644 --- a/graph_analysis/metric_calculations/FlowCalculation.py +++ b/graph_analysis/metric_calculations/FlowCalculation.py @@ -3,7 +3,7 @@ from neo4j import Driver, GraphDatabase, Session from collections import deque import json import copy -from graph_analysis.utils import append_paths_entry, current_stack_structure_processed, perform_topological_sort +from graph_analysis.utils import append_info_flow_entry, current_stack_structure_processed, 
perform_topological_sort from neo4j_graph_queries.processing_queries import get_all_in_parameter_nodes_of_entity, get_node_details, get_valid_connections from neo4j_graph_queries.utils import clean_component_id, get_is_workflow_class from queue import Queue @@ -38,11 +38,11 @@ class FlowCalculation: to compute flow paths, storing the results in a JSON file (`flow_paths.json`). ### Data Structures: - - **`paths: dict[str, dict[str, list]]`** + - **`info_flows: dict[str, dict[str, list]]`** - A nested dictionary where: - The first key (`str`) represents the source component ID. - The second key (`str`) represents the target component ID. - - The value (`list`) contains all possible paths from the source to the target. + - The value (`list`) contains all possible information flows from the source to the target. - This dictionary is converted into the JSON file - **`bookkeeping: dict[int, list[tuple[list, list]]]`** @@ -59,18 +59,18 @@ class FlowCalculation: sorted_components = perform_topological_sort(session) bookkeeping = {} - paths: dict[str, dict[str, list]] = {} + info_flows: dict[str, dict[str, list]] = {} workflow_ids = sorted_components for workflow in workflow_ids: print(f"Preprocessing: {workflow}") - self.bsf_traverse_paths_change_impact(session, workflow, bookkeeping, paths) + self.bsf_traverse_paths_change_impact(session, workflow, bookkeeping, info_flows) with open("flow_paths.json", "w") as json_file: - json.dump(paths, json_file, indent=4) + json.dump(info_flows, json_file, indent=4) def process_sequential_flows_to_component(self, component_id: str, depth: int, last_seen: dict[str, int], outer_workflows: list, - paths: dict[str, dict[str, list]]): + info_flows: dict[str, dict[str, list]]): """ - Processes sequential flow paths leading to the specified component and updates the paths dictionary. + Processes sequential and transitive flows leading to the specified component and updates the info_flows dictionary. 
This method iterates through previously seen components (`last_seen`), where the keys represent encountered component IDs and the values indicate the depth at which they were encountered. It calculates the distance @@ -80,11 +80,11 @@ class FlowCalculation: from the seen component to the current component is added in the context of the outer workflow. Parameters: - component_id (str): The target component for which flow paths are being processed. + component_id (str): The target component for which flows are being processed. depth (int): The current depth in the traversal. last_seen (dict): A dictionary mapping component IDs to the depth at which they were last encountered. outer_workflows (list): A list of outer workflow component IDs. - paths (dict): A nested dictionary storing discovered flow paths. + info_flows (dict): A nested dictionary storing discovered information flows. Updates: - Adds new entries to `paths` in the format: `paths[seen_id][component_id] = (outer_component_id, flow_type, distance)`. @@ -104,12 +104,12 @@ class FlowCalculation: flow_type = "Sequential" else: flow_type = "Transitive" - append_paths_entry(seen_id, component_id, tuple([outer_component_id, flow_type, distance]), paths) + append_info_flow_entry(seen_id, component_id, tuple([outer_component_id, flow_type, distance]), info_flows) def process_direct_indirect_flow_of_node_id(self, node_id: int, component_id: str, outer_workflows: list, component_stack: deque, step_stack: deque, bookkeeping: dict[str, list[tuple[list, list]]], - paths: dict[str, dict[str, list]], direct: bool): + info_flows: dict[str, dict[str, list]], direct: bool): """ Processes the direct or indirect flow of a given node within the outer workflows. @@ -126,11 +126,11 @@ class FlowCalculation: component_stack (deque): A stack maintaining the sequence of outer components encountered. step_stack (deque): A stack maintaining the sequence of outer steps taken. 
bookkeeping (dict): A record of previously processed nodes to prevent redundant computations. - paths (dict): A dictionary storing established connections between components. + info_flows (dict): A dictionary storing established connections between components. direct (bool): Whether to create a direct or indirect flow. Updates: - - Adds new entries to `paths` in the format: `paths[seen_id][component_id] = (outer_component_id, flow_type, distance)`. + - Adds new entries to `info_flows` in the format: `info_flows[seen_id][component_id] = (outer_component_id, flow_type, distance)`. """ for index, outer_workflow_id in enumerate(outer_workflows): @@ -150,13 +150,13 @@ class FlowCalculation: if direct: entry = (outer_workflow_id, "Direct", 1) - append_paths_entry(outer_workflow_id, component_id, entry, paths) + append_info_flow_entry(outer_workflow_id, component_id, entry, info_flows) else: entry = (outer_workflow_id, "Indirect", 1) - append_paths_entry(component_id, outer_workflow_id, entry, paths) + append_info_flow_entry(component_id, outer_workflow_id, entry, info_flows) - def bsf_traverse_paths_change_impact(self, session: Session, component_id, bookkeeping, paths): + def bsf_traverse_paths_change_impact(self, session: Session, component_id, bookkeeping, info_flows): start_component_id = clean_component_id(component_id) # Find all "InParameter" nodes associated with the component @@ -196,9 +196,9 @@ class FlowCalculation: # Extract list of outer workflows (leftmost = outermost) outer_workflows = [workflow[0] for workflow in component_stack if get_is_workflow_class(workflow[1])] # Process sequential and direct flows - self.process_sequential_flows_to_component(component_id, depth, last_seen, outer_workflows, paths) + self.process_sequential_flows_to_component(component_id, depth, last_seen, outer_workflows, info_flows) self.process_direct_indirect_flow_of_node_id(node_id, component_id, outer_workflows, component_stack, - step_stack, bookkeeping, paths, direct=True) + 
step_stack, bookkeeping, info_flows, direct=True) # Increment depth as we move deeper into the traversal, unless we just entered a workflow if not get_is_workflow_class(component_type): @@ -213,7 +213,7 @@ class FlowCalculation: # Process indirect flows outer_workflows = [workflow[0] for workflow in component_stack if get_is_workflow_class(workflow[1])] self.process_direct_indirect_flow_of_node_id(node_id, component_id, outer_workflows, component_stack, step_stack, - bookkeeping, paths, direct=False) + bookkeeping, info_flows, direct=False) if get_is_workflow_class(component_type): # When we exit a workflow, the workflow needs to be at # the same depth as its last step diff --git a/graph_analysis/utils.py b/graph_analysis/utils.py index e8318352b782c65b46fdf4d21b1ed9066e057a93..648957c141b789dd19689c44b0c5e163a10a90bf 100644 --- a/graph_analysis/utils.py +++ b/graph_analysis/utils.py @@ -1,11 +1,8 @@ -from pathlib import Path from neo4j import Session -from neo4j_graph_queries.processing_queries import count_nodes_and_edges, get_all_workflow_ids, get_data_flow_relationships_for_sorting +from neo4j_graph_queries.processing_queries import get_all_workflow_ids, get_data_flow_relationships_for_sorting import networkx as nx -from neo4j_graph_queries.utils import clean_component_id - -def append_paths_entry(id1: str, id2: str, entry: tuple[str, int], paths: dict[str, dict[str, list]]) -> None: +def append_info_flow_entry(id1: str, id2: str, entry: tuple[str, int], info_flows: dict[str, dict[str, list]]) -> None: """ Adds an entry to the paths dictionary, ensuring necessary keys exist. @@ -15,11 +12,11 @@ def append_paths_entry(id1: str, id2: str, entry: tuple[str, int], paths: dict[s entry (tuple[str, int]): The entry to append, consisting of a string and an integer. paths (dict[str, dict[str, list]]): The dictionary storing path entries. 
""" - if id1 not in paths: - paths[id1] = dict() - if id2 not in paths[id1]: - paths[id1][id2] = list() - paths[id1][id2].append(entry) + if id1 not in info_flows: + info_flows[id1] = dict() + if id2 not in info_flows[id1]: + info_flows[id1][id2] = list() + info_flows[id1][id2].append(entry) def is_substack(inner_stack: list, outer_stack: list) -> bool: """