Skip to content
Snippets Groups Projects
Commit fcb9826f authored by Chiara Liotta
Browse files

some fixes in graph creation and flow calculation

parent 77a16e2d
No related branches found
No related tags found
No related merge requests found
from pathlib import Path from pathlib import Path
import re import re
from neo4j import Driver from neo4j import Driver
from graph_creation.utils import extract_js_expression_dependencies, get_input_source, process_control_dependencies, process_in_param, process_parameter_source from graph_creation.utils import extract_js_expression_dependencies, get_input_source, process_control_dependencies, process_in_param, process_output, process_parameter_source, process_step_lookup
from neo4j_graph_queries.create_node_queries import ensure_git_node, ensure_in_parameter_node, ensure_out_parameter_node from neo4j_graph_queries.create_node_queries import ensure_git_node, ensure_in_parameter_node, ensure_out_parameter_node
from neo4j_graph_queries.create_edge_queries import create_out_param_relationship, create_references_relationship from neo4j_graph_queries.create_edge_queries import create_out_param_relationship, create_references_relationship
from neo4j_graph_queries.utils import get_is_workflow from neo4j_graph_queries.utils import get_is_workflow
def process_cwl_entity(driver, entity):
    """
    Processes a single CWL entity: its inputs, its outputs and, for workflows, its steps.

    The step lookup is only built when get_is_workflow reports that the entity is a workflow;
    for any other entity the outputs are processed with a step lookup of None and no steps are processed.
    process_cwl_steps calls this function again for steps whose 'run' is not a string.

    Parameters:
        driver: the Neo4j driver
        entity: the dictionary of the parsed CWL entity

    Returns:
        None
    """
    # Only workflows have steps that need to be resolved
    steps = process_step_lookup(entity) if get_is_workflow(entity) else None

    process_cwl_inputs(driver, entity)
    process_cwl_outputs(driver, entity, steps)

    # Nothing more to do without a (non-empty) step lookup
    if not steps:
        return
    process_cwl_steps(driver, entity, steps)
def process_cwl_inputs(driver: Driver, cwl_entity: dict) -> None: def process_cwl_inputs(driver: Driver, cwl_entity: dict) -> None:
""" """
Processes the inputs of a CWL entity, either as a list or a dictionary of inputs, Processes the inputs of a CWL entity, either as a list or a dictionary of inputs,
...@@ -29,8 +40,11 @@ def process_cwl_inputs(driver: Driver, cwl_entity: dict) -> None: ...@@ -29,8 +40,11 @@ def process_cwl_inputs(driver: Driver, cwl_entity: dict) -> None:
elif isinstance(cwl_entity['inputs'], dict): elif isinstance(cwl_entity['inputs'], dict):
# If 'inputs' is a dictionary, iterate over the keys (which are the input IDs) # If 'inputs' is a dictionary, iterate over the keys (which are the input IDs)
input_dict = cwl_entity['inputs'] input_dict = cwl_entity['inputs']
for key in input_dict.keys(): for key, value in input_dict.items():
process_in_param(driver, key, component_id, input_dict[key]['type'], cwl_entity['class']) if isinstance(value, dict):
process_in_param(driver, key, component_id, value['type'], cwl_entity['class'])
else:
process_in_param(driver, key, component_id, value, cwl_entity['class'])
def process_cwl_outputs(driver: Driver, cwl_entity: dict, step_lookup: dict) -> None: def process_cwl_outputs(driver: Driver, cwl_entity: dict, step_lookup: dict) -> None:
""" """
...@@ -58,27 +72,28 @@ def process_cwl_outputs(driver: Driver, cwl_entity: dict, step_lookup: dict) -> ...@@ -58,27 +72,28 @@ def process_cwl_outputs(driver: Driver, cwl_entity: dict, step_lookup: dict) ->
None None
""" """
component_id = cwl_entity['path'] component_id = cwl_entity['path']
outputs = cwl_entity['outputs']
if isinstance(outputs, list):
for output in cwl_entity['outputs']: for output in cwl_entity['outputs']:
if isinstance(output, dict): output_id = output['id']
# Create out-parameter node with the parameter ID as defined in the component output_source = None
# and component ID equal to the path of the componet
out_param_node = ensure_out_parameter_node(driver, output['id'], component_id, output["type"], cwl_entity['class'])
out_param_node_internal_id = out_param_node[0]
# If it's not a workflow, create a relationship between the component and the output parameter
is_worflow = get_is_workflow(cwl_entity)
if not is_worflow:
create_out_param_relationship(driver, component_id, out_param_node_internal_id, output['id'])
# If the output has an 'outputSource', process the relationship(s) to the source(s)
if 'outputSource' in output: if 'outputSource' in output:
# The output source can be a singular ID or a list of IDs output_source = output['outputSource']
if isinstance(output['outputSource'], str): process_output(driver, output_id, output['type'], component_id, cwl_entity['class'], step_lookup, output_source)
source_id = output['outputSource'] else:
process_parameter_source(driver, out_param_node_internal_id, source_id, component_id, step_lookup) process_output(driver, output_id, output['type'], component_id, cwl_entity['class'], step_lookup, output_source)
elif isinstance(output['outputSource'], list): elif isinstance(outputs, dict):
for source_id in output['outputSource']: for output_id, details in outputs.items():
process_parameter_source(driver, out_param_node_internal_id, source_id, component_id, step_lookup) if isinstance(details, str):
process_output(driver, output_id, details, component_id, cwl_entity['class'], step_lookup)
else:
if 'outputSource' in details:
    # Read the type from `details`: `output` is only bound by the list-form loop,
    # so referencing it here would raise UnboundLocalError for mapping-form outputs
    output_source = details['outputSource']
    process_output(driver, output_id, details['type'], component_id, cwl_entity['class'], step_lookup, output_source)
else:
    # No source declared: only the out-parameter node (and component edge) is created
    process_output(driver, output_id, details['type'], component_id, cwl_entity['class'], step_lookup)
def process_cwl_steps(driver: Driver, cwl_entity: dict, step_lookup) -> None: def process_cwl_steps(driver: Driver, cwl_entity: dict, step_lookup) -> None:
""" """
...@@ -110,7 +125,14 @@ def process_cwl_steps(driver: Driver, cwl_entity: dict, step_lookup) -> None: ...@@ -110,7 +125,14 @@ def process_cwl_steps(driver: Driver, cwl_entity: dict, step_lookup) -> None:
for step in cwl_entity['steps']: for step in cwl_entity['steps']:
# Get the resolved path of the step from the step_lookup # Get the resolved path of the step from the step_lookup
step_path = step_lookup[step['id']] step_path: str = step_lookup[step['id']]
if not isinstance(step['run'], str):
run_dict = step['run']
run_dict['path'] = step_path
print(f"processing {step_path}")
process_cwl_entity(driver, run_dict)
continue
# Process the list of inputs of the step # Process the list of inputs of the step
for input in step['in']: for input in step['in']:
...@@ -137,8 +159,9 @@ def process_cwl_steps(driver: Driver, cwl_entity: dict, step_lookup) -> None: ...@@ -137,8 +159,9 @@ def process_cwl_steps(driver: Driver, cwl_entity: dict, step_lookup) -> None:
if ref[0] == "parameter": if ref[0] == "parameter":
# Retrieve the source of the referenced input parameter # Retrieve the source of the referenced input parameter
source = get_input_source(step['in'], ref[1]) source = get_input_source(step['in'], ref[1])
else:
# The reference already mentions the source (output of a step) if not source:
# The reference already mentions the source (output of a step or workflow input)
source = ref[1] source = ref[1]
if source: if source:
# Create control dependencies from the in-parameters of the step to the source of the reference # Create control dependencies from the in-parameters of the step to the source of the reference
......
...@@ -4,6 +4,7 @@ from graph_creation.docker_parsing import parse_all_dockerfiles ...@@ -4,6 +4,7 @@ from graph_creation.docker_parsing import parse_all_dockerfiles
from graph_creation.utils import process_step_lookup from graph_creation.utils import process_step_lookup
from graph_creation.cwl_processing import process_cwl_commandline, process_cwl_inputs, process_cwl_outputs, process_cwl_steps from graph_creation.cwl_processing import process_cwl_commandline, process_cwl_inputs, process_cwl_outputs, process_cwl_steps
from neo4j_graph_queries.utils import get_is_workflow from neo4j_graph_queries.utils import get_is_workflow
import pprint
def process_repos(repo_list: list[str], driver: Driver) -> None: def process_repos(repo_list: list[str], driver: Driver) -> None:
""" """
......
...@@ -2,7 +2,7 @@ from pathlib import Path ...@@ -2,7 +2,7 @@ from pathlib import Path
from neo4j import Driver from neo4j import Driver
import re import re
from neo4j_graph_queries.create_node_queries import ensure_component_node, ensure_in_parameter_node, ensure_out_parameter_node from neo4j_graph_queries.create_node_queries import ensure_component_node, ensure_in_parameter_node, ensure_out_parameter_node
from neo4j_graph_queries.create_edge_queries import create_control_relationship, create_data_relationship, create_in_param_relationship from neo4j_graph_queries.create_edge_queries import create_control_relationship, create_data_relationship, create_in_param_relationship, create_out_param_relationship
from neo4j_graph_queries.processing_queries import get_all_in_parameter_nodes_of_entity from neo4j_graph_queries.processing_queries import get_all_in_parameter_nodes_of_entity
GITLAB_ASTRON ='https://git.astron.nl' GITLAB_ASTRON ='https://git.astron.nl'
...@@ -19,14 +19,20 @@ def process_step_lookup(cwl_entity: dict) -> dict[str, str]: ...@@ -19,14 +19,20 @@ def process_step_lookup(cwl_entity: dict) -> dict[str, str]:
dict: A dictionary where each key is the ID of the step in the context of the workflow, and the value is the resolved file path of the step dict: A dictionary where each key is the ID of the step in the context of the workflow, and the value is the resolved file path of the step
""" """
step_lookup = {} step_lookup = {}
for step in cwl_entity['steps']:
# Retrieve the directory containing the workflow file # Retrieve the directory containing the workflow file
workflow_folder = Path(cwl_entity['path']).parent workflow_folder = Path(cwl_entity['path']).parent
while ".cwl" in str(workflow_folder):
workflow_folder = workflow_folder.parent
for step in cwl_entity['steps']:
# Resolve the full path of the step file by combining the workflow folder and the step's 'run' path # Resolve the full path of the step file by combining the workflow folder and the step's 'run' path
if isinstance(step['run'], str):
full_step_path = workflow_folder / Path(step['run']) full_step_path = workflow_folder / Path(step['run'])
step_path = resolve_relative_path(full_step_path)
else:
step_path = Path(cwl_entity['path']) / step['id']
# Resolve the path (deal with "./" and "../") # Resolve the path (deal with "./" and "../")
step_path = str(resolve_relative_path(full_step_path)) step_lookup[step['id']] = str(step_path)
step_lookup[step['id']] = step_path
return step_lookup return step_lookup
def process_in_param(driver: Driver, param_id: str, component_id: str, param_type: str, component_type: str) -> None: def process_in_param(driver: Driver, param_id: str, component_id: str, param_type: str, component_type: str) -> None:
...@@ -43,10 +49,8 @@ def process_in_param(driver: Driver, param_id: str, component_id: str, param_typ ...@@ -43,10 +49,8 @@ def process_in_param(driver: Driver, param_id: str, component_id: str, param_typ
Returns: Returns:
None None
""" """
param_node = ensure_in_parameter_node(driver, param_id, component_id, param_type, component_type) param_node = ensure_in_parameter_node(driver, param_id, component_id, param_type, component_type)
if component_type != "Workflow": if component_type != "Workflow":
ensure_component_node(driver, component_id)
create_in_param_relationship(driver, component_id, param_node[0], param_node[1]) create_in_param_relationship(driver, component_id, param_node[0], param_node[1])
def process_parameter_source(driver: Driver, param_node_internal_id: int, source_id: str, workflow_id: str, step_lookup: dict, step_id: str = "") -> None: def process_parameter_source(driver: Driver, param_node_internal_id: int, source_id: str, workflow_id: str, step_lookup: dict, step_id: str = "") -> None:
...@@ -189,3 +193,26 @@ def get_input_source(inputs: list[dict], input_id: str) -> str | None: ...@@ -189,3 +193,26 @@ def get_input_source(inputs: list[dict], input_id: str) -> str | None:
if inp["id"] == input_id: if inp["id"] == input_id:
return inp.get("source") # returns None if 'source' doesn't exist return inp.get("source") # returns None if 'source' doesn't exist
return None # returns None if input_id is not found return None # returns None if input_id is not found
def process_output(driver, output_id, output_type, component_id, component_type, step_lookup, output_source = None):
    """
    Ensures that the out-parameter node of a component output exists and connects it to its source(s).

    The out-parameter node is created with the parameter ID as defined in the component
    and a component ID equal to the path of the component. If the component is not a workflow,
    a relationship between the component and the out-parameter is created as well.
    If an output source is given, each source is handed to process_parameter_source.

    Parameters:
        driver: the Neo4j driver
        output_id: the ID of the output parameter as defined in the component
        output_type: the type of the output parameter
        component_id: the path of the component
        component_type: the class of the component; for "Workflow" no component relationship is created
        step_lookup: the step lookup that is passed on to process_parameter_source
        output_source: a single source ID (str), a list of source IDs, or None if the output has no source

    Returns:
        None
    """
    out_param_node = ensure_out_parameter_node(driver, output_id, component_id, output_type, component_type)
    out_param_node_internal_id = out_param_node[0]

    # If it's not a workflow, create a relationship between the component and the output parameter
    if component_type != "Workflow":
        create_out_param_relationship(driver, component_id, out_param_node_internal_id, output_id)

    # Without a (non-empty) output source there is nothing left to connect
    if not output_source:
        return

    # The output source can be a singular ID or a list of IDs; any other shape is ignored
    if isinstance(output_source, str):
        source_ids = [output_source]
    elif isinstance(output_source, list):
        source_ids = output_source
    else:
        source_ids = []

    for source_id in source_ids:
        process_parameter_source(driver, out_param_node_internal_id, source_id, component_id, step_lookup)
...@@ -62,7 +62,6 @@ class FlowCalculation: ...@@ -62,7 +62,6 @@ class FlowCalculation:
paths: dict[str, dict[str, list]] = {} paths: dict[str, dict[str, list]] = {}
workflow_ids = sorted_components workflow_ids = sorted_components
for workflow in workflow_ids: for workflow in workflow_ids:
if "example" not in workflow:
print(f"Preprocessing: {workflow}") print(f"Preprocessing: {workflow}")
self.bsf_traverse_paths_change_impact(session, workflow, bookkeeping, paths) self.bsf_traverse_paths_change_impact(session, workflow, bookkeeping, paths)
with open("flow_paths.json", "w") as json_file: with open("flow_paths.json", "w") as json_file:
...@@ -201,7 +200,8 @@ class FlowCalculation: ...@@ -201,7 +200,8 @@ class FlowCalculation:
self.process_direct_indirect_flow_of_node_id(node_id, component_id, outer_workflows, component_stack, self.process_direct_indirect_flow_of_node_id(node_id, component_id, outer_workflows, component_stack,
step_stack, bookkeeping, paths, direct=True) step_stack, bookkeeping, paths, direct=True)
# Increment depth as we move deeper into the traversal # Increment depth as we move deeper into the traversal, unless we just entered a workflow
if component_type != "Workflow":
depth = depth + 1 depth = depth + 1
# If the stack is empty, return early # If the stack is empty, return early
......
...@@ -29,13 +29,14 @@ class SubgraphPreprocessing: ...@@ -29,13 +29,14 @@ class SubgraphPreprocessing:
for workflow in outer_workflow_ids: for workflow in outer_workflow_ids:
print(f"Preprocessing: {workflow}") print(f"Preprocessing: {workflow}")
self.traverse_graph_process_paths(session, workflow, bookkeeping) self.traverse_graph_process_paths(session, workflow, bookkeeping)
control_pairs = get_nodes_with_control_edges(session) control_tuples = get_nodes_with_control_edges(session)
for pair in control_pairs: for tuple in control_tuples:
target_id = pair["targetId"] target_id = tuple["targetId"]
edge_component_id = pair["componentId"] edge_component_id = tuple["componentId"]
edge_id = pair["edgeId"] edge_id = tuple["edgeId"]
result = get_workflow_list_of_data_edges_from_node(session, target_id, edge_component_id) result = get_workflow_list_of_data_edges_from_node(session, target_id, edge_component_id)
workflow_lists = [record["workflow_list"] for record in result] workflow_lists = [record["workflow_list"] for record in result]
if len(workflow_lists) > 0:
update_workflow_list_of_edge(session, edge_id, workflow_lists[0]) update_workflow_list_of_edge(session, edge_id, workflow_lists[0])
......
...@@ -41,8 +41,6 @@ def create_out_param_relationship(driver: Driver, prefixed_component_id: str, pa ...@@ -41,8 +41,6 @@ def create_out_param_relationship(driver: Driver, prefixed_component_id: str, pa
""" """
component_id = clean_component_id(prefixed_component_id) component_id = clean_component_id(prefixed_component_id)
component_node_id = ensure_component_node(driver, component_id)[0] component_node_id = ensure_component_node(driver, component_id)[0]
print(parameter_internal_id)
print(component_node_id)
return create_data_relationship(driver, component_node_id, parameter_internal_id, component_id, parameter_id) return create_data_relationship(driver, component_node_id, parameter_internal_id, component_id, parameter_id)
......
import json import json
from pathlib import Path
from neo4j import Driver from neo4j import Driver
from neo4j_graph_queries.utils import clean_component_id from neo4j_graph_queries.utils import clean_component_id
...@@ -16,12 +17,14 @@ def ensure_component_node(driver: Driver, prefixed_component_id: str) -> tuple[i ...@@ -16,12 +17,14 @@ def ensure_component_node(driver: Driver, prefixed_component_id: str) -> tuple[i
tuple[int,str]: the Neoj4 internal ID of the component node, the component ID of the component tuple[int,str]: the Neoj4 internal ID of the component node, the component ID of the component
""" """
component_id = clean_component_id(prefixed_component_id) component_id = clean_component_id(prefixed_component_id)
nice_id = Path(component_id).stem
query = """ query = """
MERGE (c:Component {component_id: $component_id}) MERGE (c:Component {component_id: $component_id})
SET c.nice_id = $nice_id
RETURN elementId(c) AS node_internal_id, c.component_id AS component_id RETURN elementId(c) AS node_internal_id, c.component_id AS component_id
""" """
with driver.session() as session: with driver.session() as session:
result = session.run(query, component_id=component_id) result = session.run(query, component_id=component_id, nice_id=nice_id)
record = result.single() record = result.single()
return record["node_internal_id"], record["component_id"] return record["node_internal_id"], record["component_id"]
...@@ -126,30 +129,3 @@ def ensure_out_parameter_node(driver: Driver, parameter_id: str, prefixed_compon ...@@ -126,30 +129,3 @@ def ensure_out_parameter_node(driver: Driver, parameter_id: str, prefixed_compon
result = session.run(query, parameter_id=parameter_id, component_id=component_id, param_type=new_param_type, component_type=component_type) result = session.run(query, parameter_id=parameter_id, component_id=component_id, param_type=new_param_type, component_type=component_type)
record = result.single() record = result.single()
return record["node_id"], record["parameter_id"], record["component_id"] return record["node_id"], record["parameter_id"], record["component_id"]
\ No newline at end of file
def ensure_data_node(driver: Driver, node_id: str, prefixed_component_id: str) -> tuple[int,str,str]:
    """
    Gets or creates (via MERGE) the data node with ID node_id that belongs to the component
    in the file with local path prefixed_component_id.
    The component ID is passed through clean_component_id before it is used in the query.

    Parameters:
        driver (Driver): the Neo4j driver
        node_id (str): the ID of the data
        prefixed_component_id (str): the local relative path of the component

    Returns:
        tuple[int,str,str]: the value of elementId() for the data node, the data ID, the component ID
    """
    cleaned_component_id = clean_component_id(prefixed_component_id)
    merge_query = """
    MERGE (n:Data {data_id: $node_id, component_id: $component_id})
    RETURN elementId(n) AS node_internal_id, n.data_id AS id_property, n.component_id AS component_id_property
    """
    with driver.session() as session:
        # The query returns exactly one row for the merged node
        record = session.run(merge_query, node_id=node_id, component_id=cleaned_component_id).single()
        internal_id = record["node_internal_id"]
        data_id = record["id_property"]
        component_id_property = record["component_id_property"]
        return internal_id, data_id, component_id_property
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment