Skip to content
Snippets Groups Projects
Commit c81bc284 authored by Chiara Liotta's avatar Chiara Liotta
Browse files

remove component nodes for wfs and intermediate data nodes

parent f9c046d3
Branches
No related tags found
No related merge requests found
from pathlib import Path from pathlib import Path
from cwl_utils.parser import save import ruamel.yaml
from cwl_utils.parser.cwl_v1_2_utils import load_inputfile import chardet
def get_cwl_from_repo(repo_path: str) -> list[dict]: from neo4j_queries.utils import get_is_workflow
def get_cwl_from_repo(repo_path: str) -> tuple[list[dict],list[dict]]:
""" """
Given the path of a local repository, it processes all the CWL files in the repository. Processes all CWL (Common Workflow Language) files in a given repository, categorizing them into workflows and tools.
Each CWL file is parsed into a dictionary using the cwl_utils library.
The path is saved using the key 'path' with value equal to the relative path of the CWL file.
Parameters: Parameters:
repo_path (str): the path of the local repository repo_path (str): The path to the local repository containing CWL files.
Returns: Returns:
list[dict]: a list of dictonaries, each dictionary is a parsed CWL file tuple[list[dict], list[dict]]:
- The first list contains dictionaries representing parsed CWL workflow files.
- The second list contains dictionaries representing parsed CWL tool files.
""" """
cwl_entities = [] cwl_workflow_entities = []
cwl_tool_entities = []
# Recursively find all CWL files in the repository
pathlist = Path(repo_path).glob('**/*.cwl') pathlist = Path(repo_path).glob('**/*.cwl')
for path in pathlist: for path in pathlist:
path_in_str = str(path) path_in_str = str(path)
# Parse CWL file
cwl_obj = load_inputfile(path_in_str) # Detect file encoding to handle non-UTF-8 encoded files
# Save parsed file into a dictionary with open(path, 'rb') as file:
saved_obj = save(cwl_obj, relative_uris=True) raw_data = file.read()
# Save the path of the CWL file result = chardet.detect(raw_data)
saved_obj['path'] = path_in_str encoding = result['encoding']
cwl_entities.append(saved_obj)
# Open the file using the detected encoding and parse it as YAML
return cwl_entities with open(path, "r", encoding=encoding) as file:
\ No newline at end of file yaml = ruamel.yaml.YAML()
yaml_dict = yaml.load(file)
# Add the file path to the dictionary for reference
yaml_dict['path'] = path_in_str
# Categorize the file based on its 'class' field
if get_is_workflow(yaml_dict['class']):
cwl_workflow_entities.append(yaml_dict)
else:
cwl_tool_entities.append(yaml_dict)
return cwl_workflow_entities, cwl_tool_entities
\ No newline at end of file
from neo4j import Driver from neo4j import Driver
from graph_creation.cst_processing import traverse_and_create, traverse_when_statement_extract_dependencies from graph_creation.cst_processing import traverse_when_statement_extract_dependencies
from graph_creation.utils import create_input_nodes_and_relationships, process_source_relationship, resolve_relative_path from graph_creation.utils import process_in_param, process_parameter_source
from neo4j_queries.node_queries import ensure_component_node, ensure_data_node, ensure_in_parameter_node, ensure_out_parameter_node, get_wf_data_nodes_from_step_in_param from neo4j_queries.node_queries import ensure_component_node, ensure_in_parameter_node, ensure_out_parameter_node
from neo4j_queries.edge_queries import create_control_relationship, create_data_relationship, create_out_param_relationship from neo4j_queries.edge_queries import create_control_relationship, create_data_relationship, create_out_param_relationship
from pathlib import Path
from neo4j_queries.utils import get_is_workflow
from parsers.javascript_parsing import parse_javascript_expression_string, parse_javascript_string from parsers.javascript_parsing import parse_javascript_expression_string, parse_javascript_string
# TODO: deal with inputBindings # TODO: deal with inputBindings
def process_cwl_inputs(driver: Driver, cwl_entity: dict) -> None: def process_cwl_inputs(driver: Driver, cwl_entity: dict) -> None:
""" """
Processes the inputs of a CWL component (Workflow, CommandLineTool, or ExpressionTool) Processes the inputs of a CWL entity, either as a list or a dictionary of inputs,
For each input the following nodes and edges are created: and processes each input parameter by calling `process_in_param`.
- an in-parameter node with the parameter ID as defined in the component and component ID equal to the path of the componet
- a data node with component ID of the component and data ID equal to the parameter ID
- a data edge from the component node to the in-parameter node
- a data edge from the data node to the the in-parameter node
Parameters: Parameters:
driver (Driver): the driver used to connect to Neo4j driver (Driver): The Neo4j driver used to execute queries.
cwl_entity (dict): the dictionary containing the parsed contents of the CWL component cwl_entity (dict): A dictionary representing a CWL entity, which includes an 'inputs' key containing
either a list or dictionary of input parameters.
Returns:
None
""" """
component_id = cwl_entity['path'] component_id = cwl_entity['path']
# Inputs can be defined a list or a dictionary is_workflow = get_is_workflow(cwl_entity)
if type(cwl_entity['inputs']) == list: # Process the inputs based on their type (list or dictionary)
# List of dictionaries if isinstance(cwl_entity['inputs'], list):
# each element is identifiable via the key 'id' # If 'inputs' is a list, iterate over each input (which is expected to be a dictionary)
for input in cwl_entity['inputs']: for input in cwl_entity['inputs']:
if type(input) == dict: if isinstance(input, dict):
create_input_nodes_and_relationships(driver, input['id'], component_id) process_in_param(driver, input['id'], component_id, is_workflow)
elif type(cwl_entity['inputs']) == dict: elif isinstance(cwl_entity['inputs'], dict):
# Dictionary where each key is the ID of the input # If 'inputs' is a dictionary, iterate over the keys (which are the input IDs)
# the value is a dictionary containing other properties
for key in cwl_entity['inputs'].keys(): for key in cwl_entity['inputs'].keys():
create_input_nodes_and_relationships(driver, key, component_id) process_in_param(driver, key, component_id, is_workflow)
# TODO: deal with outputBindings # TODO: deal with outputBindings
def process_cwl_outputs(driver: Driver, cwl_entity: dict) -> None: def process_cwl_outputs(driver: Driver, cwl_entity: dict, step_lookup) -> None:
""" """
Processes the outputs of a CWL component (Workflow, CommandLineTool, or ExpressionTool) Processes the output parameters of a CWL entity by creating the necessary nodes
For each output the following nodes and edges are created: and relationships for each output parameter in a graph or database. The function handles both singular and
- an out-parameter node with the parameter ID as defined in the component and component ID equal to the path of the componet list-based output sources, ensuring that each output is linked to its corresponding source or sources.
- a data node with component ID of the component and data ID equal to output source defined in the component
- a data edge from the out-parameter node to the component node For each output in the CWL entity:
- a data edge from the out-parameter node to the data node - An out-parameter node is created for the output.
- If the CWL entity is not a workflow, a relationship is created between the component node and the output parameter.
- If the output contains an 'outputSource', the function processes the relationship between the output
parameter and its source(s). The 'outputSource' can either be a single source ID or a list of source IDs.
Parameters: Parameters:
driver (Driver): the driver used to connect to Neo4j driver (Driver): The Neo4j driver used to execute queries
cwl_entity (dict): the dictionary containing the parsed contents of the CWL component cwl_entity (dict): A dictionary representing a CWL entity, which includes:
- 'path' (str): The path to the CWL file, used as the component ID.
- 'outputs' (list): A list of output parameters. Each output is a dictionary containing:
- 'id' (str): The unique identifier of the output parameter.
- 'outputSource' (str or list of str): The source(s) for the output parameter, which can be a single
source ID or a list of source IDs.
step_lookup (dict): A dictionary that maps step IDs to their corresponding resolved paths. This is used to
resolve the source ID(s) in the 'outputSource' field to their correct locations.
Returns:
None
""" """
component_id = cwl_entity['path'] component_id = cwl_entity['path']
for output in cwl_entity['outputs']: for output in cwl_entity['outputs']:
if type(output) == dict: if isinstance(output, dict):
# Create out-parameter node with the parameter ID as defined in the component # Create out-parameter node with the parameter ID as defined in the component
# and component ID equal to the path of the componet # and component ID equal to the path of the componet
param_node = ensure_out_parameter_node(driver, output['id'], component_id) out_param_node = ensure_out_parameter_node(driver, output['id'], component_id)
param_node_internal_id = param_node[0] out_param_node_internal_id = out_param_node[0]
# Create out-parameter node with the parameter ID as defined in the component
# and component ID equal to the path of the componet # If it's not a workflow, create a relationship between the component and the output parameter
create_out_param_relationship(driver, component_id, param_node_internal_id) is_worflow = get_is_workflow(cwl_entity)
# Create a data node with component ID of the component and data ID equal to output source defined in the component if not is_worflow:
# and a data edge from the out-parameter node to the data node create_out_param_relationship(driver, component_id, out_param_node_internal_id)
# If the output has an 'outputSource', process the relationship(s) to the source(s)
if 'outputSource' in output: if 'outputSource' in output:
# the output source can be a singular ID or a list of IDs # The output source can be a singular ID or a list of IDs
if type(output['outputSource']) == str: if isinstance(output['outputSource'], str):
process_source_relationship(driver, output['outputSource'], component_id, param_node_internal_id) source_id = output['outputSource']
elif type(output['outputSource']) == list: process_parameter_source(driver, out_param_node_internal_id, source_id, component_id, step_lookup)
elif isinstance(output['outputSource'], list):
for source_id in output['outputSource']: for source_id in output['outputSource']:
process_source_relationship(driver, source_id, component_id, param_node_internal_id) process_parameter_source(driver, out_param_node_internal_id, source_id, component_id, step_lookup)
def process_cwl_steps(driver: Driver, cwl_entity: dict) -> None: def process_cwl_steps(driver: Driver, cwl_entity: dict, tool_paths: list[str], step_lookup) -> None:
""" """
Processes the steps of a CWL Workflow component (which we will refer to as outer workflow component). Processes the steps of a CWL entity, creating necessary nodes and relationships
A step can be a Workflow, CommandLineTool or ExpressionTool. for each step. The function handles the inputs, outputs, and control dependencies associated with each step
For each step, a component node is created with component ID equal to the path of the step. in the workflow
Then, the lists of inputs and outputs are processed.
For each step in the CWL entity:
- For each input, the following nodes and edges are created: - A component node is created for the step if it corresponds to a tool (identified via tool_paths)
- in-parameter node with ID as defined in the component and component ID equal to the path of the step - The inputs are processed by creating in-parameter nodes and establishing relationships with the step
- a data edge from the step component node to the in-parameter node - The "when" field (control dependencies) is processed by extracting the dependent parameters or outputs
- potentially a data node corresponding to the source of the input, with ID equal to the source ID defined in the outer workflow and creating control relationships
and component ID equal to the path of the outer workflow - The outputs are processed by creating out-parameter nodes and establishing relationships with the step
- potentially a data edge from the in-parameter node to the data node of the source
- If the step has a "when" field, then the JS expression is parsed and its dependencies are extracted.
- The step is control dependent on data node x with component_id equal to the outer workflow id if:
- the when expression mentions a step parameter which is data dependent on x
- the when expression mentions the data_id of x
- A control edge is created from the step component node to the data node x.
- For each output, the following nodes and edges are created:
- out-parameter node with ID as defined in the component and component ID equal to the path of the step
- a data edge from the out-parameter node to the step component node
- a data node representing the outer-workflow-level output, with ID equal to [step id]/[output id as defined in workflow]
and component ID equal to the path of the outer workflow
- a data edge from the out-parameter node to the data node
Parameters: Parameters:
driver (Driver): the driver used to connect to Neo4j driver (Driver): The Neo4j driver used to execute queries
cwl_entity (dict): the dictionary containing the parsed contents of the CWL component cwl_entity (dict): A dictionary representing a CWL entity, which includes:
- 'path' (str): The path to the CWL file, used as the component ID.
- 'steps' (list): A list of steps in the workflow, each step being a dictionary containing:
- 'id' (str): The unique identifier for the step.
- 'in' (list): A list of inputs for the step.
- 'out' (list): A list of outputs for the step.
- 'when' (str or dict): A conditional expression controlling the execution of the step.
tool_paths (list[str]): A list of paths that correspond to tool steps. These paths are used to determine
whether a step corresponds to a tool or not.
step_lookup (dict): A dictionary that maps step IDs to their resolved paths. This is used to resolve
the actual paths of steps when processing their inputs, outputs, and control dependencies.
Returns:
None
""" """
component_id = cwl_entity['path']
for step in cwl_entity['steps']: for step in cwl_entity['steps']:
# Retrieve path of the step # Get the resolved path of the step from the step_lookup
workflow_folder = Path(cwl_entity['path']).parent step_path = step_lookup[step['id']]
full_step_path = workflow_folder / Path(step['run'])
step_path = str(resolve_relative_path(full_step_path)) is_tool = step_path in tool_paths
# Create the step component node with ID equal to the step # Create the step component node if it's a tool
if step_path in tool_paths:
is_tool = True
s_node = ensure_component_node(driver, step_path) s_node = ensure_component_node(driver, step_path)
s_node_internal_id = s_node[0] s_node_internal_id = s_node[0]
# Process the list of inputs of the step # Process the list of inputs of the step
for input in step['in']: for input in step['in']:
process_in_param(driver, input['id'], step_path, not is_tool)
# Create in-parameter node with ID as defined in the component and component ID equal to the path of the step # Create in-parameter node with ID as defined in the component and component ID equal to the path of the step
param_node = ensure_in_parameter_node(driver, input['id'], step_path) param_node = ensure_in_parameter_node(driver, input['id'], step_path)
param_node_internal_id = param_node[0] param_node_internal_id = param_node[0]
if is_tool:
# Create a data edge from the step component node to the in-parameter node # Create a data edge from the step component node to the in-parameter node
create_data_relationship(driver, s_node_internal_id, param_node_internal_id) create_data_relationship(driver, s_node_internal_id, param_node_internal_id)
# Inputs can have one or multiple data sources (data nodes) # Inputs can have one or multiple data sources (data nodes)
# A data edge is drawn from the in-parameter node to the data node of the source
if 'source' in input: if 'source' in input:
if type(input['source']) == str: if isinstance(input['source'], str):
source_id = input['source'] source_id = input['source']
process_source_relationship(driver, source_id, cwl_entity['path'], param_node_internal_id) process_parameter_source(driver, param_node_internal_id, source_id, component_id, step_lookup)
elif type(input['source']) == list: elif isinstance(input['source'], list):
for source_id in input['source']: for source_id in input['source']:
process_source_relationship(driver, source_id, cwl_entity['path'], param_node_internal_id) process_parameter_source(driver, param_node_internal_id, source_id, component_id, step_lookup)
# Process the "when" field, aka control dependencies # Process the "when" field, aka control dependencies
if 'when' in step: if 'when' in step:
...@@ -141,9 +161,9 @@ def process_cwl_steps(driver: Driver, cwl_entity: dict) -> None: ...@@ -141,9 +161,9 @@ def process_cwl_steps(driver: Driver, cwl_entity: dict) -> None:
if ref[0] == "parameter": if ref[0] == "parameter":
input_data = ensure_in_parameter_node(driver, ref_id, step_path)[0] input_data = ensure_in_parameter_node(driver, ref_id, step_path)[0]
nodes.append(input_data) nodes.append(input_data)
elif ref[0] == "step_output": # elif ref[0] == "step_output":
step_output = ensure_data_node(driver, ref_id, cwl_entity['path'])[0] # step_output = ensure_out_parameter_node(driver, ref_id, cwl_entity['path'])[0]
nodes.append(step_output) # nodes.append(step_output)
for node in nodes: for node in nodes:
create_control_relationship(driver, s_node_internal_id, node, cwl_entity['path']) create_control_relationship(driver, s_node_internal_id, node, cwl_entity['path'])
...@@ -151,21 +171,17 @@ def process_cwl_steps(driver: Driver, cwl_entity: dict) -> None: ...@@ -151,21 +171,17 @@ def process_cwl_steps(driver: Driver, cwl_entity: dict) -> None:
# Process the list of outputs of the step # Process the list of outputs of the step
for output in step['out']: for output in step['out']:
# An output can be defined as a dictionary or simply as a string (ID only) # An output can be defined as a dictionary or simply as a string (ID only)
if type(output) == dict: if isinstance(output, dict):
output_id = output['id'] output_id = output['id']
else: else:
output_id = output output_id = output
# Create out-parameter node with ID as defined in the component and component ID equal to the path of the step # Create out-parameter node with ID as defined in the component and component ID equal to the path of the step
param_node = ensure_out_parameter_node(driver, output_id, step_path) param_node = ensure_out_parameter_node(driver, output_id, step_path)
param_node_internal_id = param_node[0] param_node_internal_id = param_node[0]
if is_tool:
# Create a data edge from out-parameter node to the step component node # Create a data edge from out-parameter node to the step component node
create_data_relationship(driver, param_node_internal_id, s_node_internal_id) create_data_relationship(driver, param_node_internal_id, s_node_internal_id)
# Create data node with id equal to step_id/output_id and component ID equal to the path of the outer workflow
outer_output_id = f"{step['id']}/{output_id}"
data_node = ensure_data_node(driver, outer_output_id, cwl_entity['path'])
data_node_internal_id = data_node[0]
# Create a data edge from the data node to the out-parameter node
create_data_relationship(driver, data_node_internal_id, param_node_internal_id)
def process_cwl_expression(driver: Driver, entity: dict) -> None: def process_cwl_expression(driver: Driver, entity: dict) -> None:
expression = entity['expression'] expression = entity['expression']
......
from neo4j import Driver from neo4j import Driver
from graph_creation.cwl_parsing import get_cwl_from_repo from graph_creation.cwl_parsing import get_cwl_from_repo
from graph_creation.cwl_processing import process_cwl_expression, process_cwl_inputs, process_cwl_outputs, process_cwl_steps from graph_creation.utils import process_step_lookup
from neo4j_queries.edge_queries import simplify_data_and_control_edges from graph_creation.cwl_processing import process_cwl_inputs, process_cwl_outputs, process_cwl_steps
from neo4j_queries.node_queries import ensure_component_node from neo4j_queries.node_queries import ensure_component_node
from neo4j_queries.utils import get_is_workflow
def process_repos(repo_list: list[str], driver: Driver) -> None: def process_repos(repo_list: list[str], driver: Driver) -> None:
""" """
Given a list of paths to local repositories and a Neo4j driver, Processes a list of local repository paths containing CWL (Common Workflow Language) files,
the function parses the CWL files and turns them into a Neo4j dependency graph. parsing each CWL file and creating the corresponding nodes and relationships in a Neo4j graph.
The function extracts workflows and tools from each repository, processes the inputs, outputs, and
steps for each entity, and links them into a dependency graph. The Neo4j driver is used to interact
with the database, creating nodes and relationships based on the parsed CWL data.
Parameters: Parameters:
repo_list (list[str]): a list of paths to local repositories repo_list (list[str]): A list of paths to local repositories. Each repository contains CWL files
driver (Driver): a Neo4j driver that define workflows and tools
driver (Driver): A Neo4j driver used to interact with the database
Returns:
None
""" """
cwl_entities = {}
for repo in repo_list: for repo in repo_list:
# Parse CWL files # Parse CWL files of current repo
cwl_entities[repo]= get_cwl_from_repo(repo) workflows, tools = get_cwl_from_repo(repo)
for entity in cwl_entities[repo]:
component_id = entity['path'] # Extract tool paths for step processing later
ensure_component_node(driver, component_id) tool_paths = [item["path"] for item in tools]
# Combine workflows and tools into one list of entities to process
all_entities = workflows + tools
for entity in all_entities:
print(f'Processing: {entity["path"]}')
is_workflow = get_is_workflow(entity)
steps = None
if not is_workflow:
ensure_component_node(driver, entity['path'])
else:
steps = process_step_lookup(entity)
process_cwl_inputs(driver, entity) process_cwl_inputs(driver, entity)
process_cwl_outputs(driver, entity) process_cwl_outputs(driver, entity, steps)
if entity['class'] == 'Workflow': if steps:
process_cwl_steps(driver, entity) process_cwl_steps(driver, entity, tool_paths, steps)
# elif entity['class'] == 'ExpressionTool': # elif entity['class'] == 'ExpressionTool':
# process_cwl_expression(driver, entity) # process_cwl_expression(driver, entity)
\ No newline at end of file
simplify_data_and_control_edges(driver)
from pathlib import Path from pathlib import Path
from neo4j import Driver from neo4j import Driver
from neo4j_queries.node_queries import ensure_data_node, ensure_in_parameter_node from neo4j_queries.node_queries import ensure_in_parameter_node, ensure_out_parameter_node
from neo4j_queries.edge_queries import create_data_relationship, create_in_param_relationship from neo4j_queries.edge_queries import create_data_relationship_with_id, create_in_param_relationship
def create_input_nodes_and_relationships(driver: Driver, input_id: str, component_id: str) -> None: def process_step_lookup(cwl_entity: dict) -> dict:
""" """
Processes a single input tied to a specific CWL component. Processes the steps in a CWL entity to create a lookup dictionary mapping step IDs to their resolved file paths.
The following nodes and edges are created:
- an in-parameter node with the parameter ID as defined in the component and component ID equal to the path of the componet
- a data node with component ID of the component and data ID equal to the parameter ID
- a data edge from the component node to the in-parameter node
- a data edge from the data node to the the in-parameter node
Parameters: Parameters:
driver (Driver): the driver used to connect to Neo4j cwl_entity (dict): A dictionary representing a CWL entity, which includes a 'steps' key containing
input_id (str): the ID of the input as defined in the CWL component the steps of the workflow and a 'path' key with the path to the workflow file
component_id (str): the unique ID of the CWL component (its path)
Returns:
dict: A dictionary where each key is the ID of the step in the context of the workflow, and the value is the resolved file path of the step
""" """
# Create in-parameter with the parameter ID as defined in the component and component ID equal to the path of the componet step_lookup = {}
param_node = ensure_in_parameter_node(driver, input_id, component_id) for step in cwl_entity['steps']:
param_node_internal_id = param_node[0] # Retrieve the directory containing the workflow file
# Create a data edge from the component node to the in-parameter node workflow_folder = Path(cwl_entity['path']).parent
create_in_param_relationship(driver, component_id, param_node_internal_id) # Resolve the full path of the step file by combining the workflow folder and the step's 'run' path
# Create a data node with component ID of the component and data ID equal to the parameter ID full_step_path = workflow_folder / Path(step['run'])
data_node = ensure_data_node(driver, input_id, component_id) # Resolve the path (deal with "./" and "../")
data_node_internal_id = data_node[0] step_path = str(resolve_relative_path(full_step_path))
# Create a data edge from the data node to the the in-parameter node step_lookup[step['id']] = step_path
create_data_relationship(driver, data_node_internal_id, param_node_internal_id) return step_lookup
def process_source_relationship(driver: Driver, source_id: str, component_id: str, param_node_internal_id: int) -> None: def process_in_param(driver: Driver, param_id: str, component_id: str, is_workflow: bool):
""" """
Processes a source relationship between a data node and a parameter node. Processes an input parameter by ensuring its node exists and optionally creating a relationship
The data node does not need to exist already, while the parameter node must have already been created. between the component and the parameter node.
The following nodes and edges are created:
- a data node with ID equal to source_id and component ID equal to the path of the component it belongs to
- a data edge from the parameter node to the data node
Parameters: Parameters:
driver (Driver): the driver used to connect to Neo4j driver: The database or graph driver used to execute queries
source_id (str): the ID of the data that functions as a source for the parameter param_id (str): The unique identifier of the input parameter
component_id (str): the unique ID of the CWL component (its path) component_id (str): The ID of the component to which the parameter belongs
param_node_internal_id (int): the unique ID of the parameter node as defined internally by Neo4j is_workflow (bool): Indicates if the component is a workflow. If True, no relationship is created
Returns:
None
"""
param_node = ensure_in_parameter_node(driver, param_id, component_id)
if not is_workflow:
create_in_param_relationship(driver, component_id, param_node[0])
def process_parameter_source(driver: Driver, param_node_internal_id: int, source_id: str, component_id: str, step_lookup: dict) -> None:
"""
Processes a parameter source by creating a data relationship between a parameter node and its source.
Parameters:
driver (Driver): The Neo4j driver used to execute queries
param_node_internal_id (int): The internal ID of the parameter node to which the relationship is being created
source_id (str): The source identifier, which can be a single identifier (in case the source is an in-param of the workflow)
or include a subcomponent (e.g., "source" or "sub_component/source")
component_id (str): The ID of the component owning the parameter node
step_lookup (dict): A mapping of subcomponent identifiers to their respective IDs within the workflow that calls them
Returns:
None
""" """
data_node = ensure_data_node(driver, source_id, component_id) # Parse the source_id to identify whether it refers to a workflow parameter or an output of a subcomponent (subcomponent/id)
data_node_internal_id = data_node[0] source_parsed = source_id.split("/")
create_data_relationship(driver, param_node_internal_id, data_node_internal_id) if len(source_parsed) == 1:
# Ensure the source exists in the parameter node and retrieve it
source_param_node = ensure_in_parameter_node(driver, source_parsed[0], component_id)[0]
else:
# If source_id refers to an output of a subcomponent ("subcomponent/source"):
# Retrieve the subcomponent ID from the step_lookup dictionary
sub_component_id = step_lookup[source_parsed[0]]
# Ensure the source exists in the output parameter node for the subcomponent
source_param_node = ensure_out_parameter_node(driver, source_parsed[1], sub_component_id)[0]
# Create a relationship between the parameter node and its source
create_data_relationship_with_id(driver, param_node_internal_id, source_param_node, component_id)
def resolve_relative_path(path: Path)-> Path: def resolve_relative_path(path: Path)-> Path:
""" """
......
...@@ -27,8 +27,6 @@ def create_in_param_relationship(driver: Driver, prefixed_component_id: str, par ...@@ -27,8 +27,6 @@ def create_in_param_relationship(driver: Driver, prefixed_component_id: str, par
with driver.session() as session: with driver.session() as session:
result = session.run(query, component_id=component_id, result = session.run(query, component_id=component_id,
parameter_internal_id=parameter_internal_id) parameter_internal_id=parameter_internal_id)
record = result.single()
return record["component_id"], record["parameter_id"]
def create_out_param_relationship(driver: Driver, prefixed_component_id: str, parameter_internal_id: int) -> tuple[str,str]: def create_out_param_relationship(driver: Driver, prefixed_component_id: str, parameter_internal_id: int) -> tuple[str,str]:
""" """
...@@ -56,8 +54,6 @@ def create_out_param_relationship(driver: Driver, prefixed_component_id: str, pa ...@@ -56,8 +54,6 @@ def create_out_param_relationship(driver: Driver, prefixed_component_id: str, pa
with driver.session() as session: with driver.session() as session:
result = session.run(query, component_id=component_id, result = session.run(query, component_id=component_id,
parameter_internal_id=parameter_internal_id) parameter_internal_id=parameter_internal_id)
record = result.single()
return record["component_id"], record["parameter_id"]
def create_data_relationship(driver: Driver, from_internal_node_id: int, to_internal_node_id: int) -> tuple[int,int]: def create_data_relationship(driver: Driver, from_internal_node_id: int, to_internal_node_id: int) -> tuple[int,int]:
""" """
...@@ -85,6 +81,32 @@ def create_data_relationship(driver: Driver, from_internal_node_id: int, to_inte ...@@ -85,6 +81,32 @@ def create_data_relationship(driver: Driver, from_internal_node_id: int, to_inte
record = result.single() record = result.single()
return record["id_1"], record["id_2"] return record["id_1"], record["id_2"]
def create_data_relationship_with_id(driver: Driver, from_internal_node_id: int, to_internal_node_id: int, id: str) -> tuple[int,int]:
    """
    Merges a DATA edge, tagged with a component ID, between two existing Neo4j nodes.
    The edge is outgoing from the node whose element ID equals from_internal_node_id
    and incoming to the node whose element ID equals to_internal_node_id. The value of
    `id` is stored on the edge as its component_id property, and the edge is created
    with MERGE.

    Parameters:
    driver (Driver): the Neo4j driver
    from_internal_node_id (int): the internal Neo4j ID of the node the edge starts from
    to_internal_node_id (int): the internal Neo4j ID of the node the edge points to
    id (str): the value written to the component_id property of the DATA edge

    Returns:
    tuple[int,int]: the element IDs of the start node and the end node, as returned by the query
    """
    # The query text is kept verbatim; only the surrounding Python is restructured.
    cypher = """
MATCH (a), (b)
WHERE elementId(a) = $from_internal_node_id AND elementId(b) = $to_internal_node_id
MERGE (a)-[:DATA {component_id: $component_id}]->(b)
RETURN elementId(a) AS id_1, elementId(b) AS id_2
"""
    query_params = {
        "from_internal_node_id": from_internal_node_id,
        "to_internal_node_id": to_internal_node_id,
        "component_id": id,
    }
    with driver.session() as session:
        # Exactly one row is expected back: the matched pair of nodes.
        row = session.run(cypher, **query_params).single()
        return row["id_1"], row["id_2"]
def create_control_relationship(driver: Driver, from_internal_node_id: int, to_internal_node_id: int, component_id: str) -> tuple[int,int]: def create_control_relationship(driver: Driver, from_internal_node_id: int, to_internal_node_id: int, component_id: str) -> tuple[int,int]:
""" """
......
...@@ -10,3 +10,15 @@ def clean_component_id(prefixed_component_id: str) -> str: ...@@ -10,3 +10,15 @@ def clean_component_id(prefixed_component_id: str) -> str:
""" """
component_id = prefixed_component_id.removeprefix("repos\\") component_id = prefixed_component_id.removeprefix("repos\\")
return component_id return component_id
def get_is_workflow(cwl_entity: "dict | str") -> bool:
    """
    Determines if a given CWL entity represents a workflow.

    Parameters:
    cwl_entity (dict | str): Either a dictionary representing a CWL entity, which includes a 'class' key,
        or the value of that 'class' field itself (e.g. 'Workflow').

    Returns:
    bool: True if the CWL entity is a workflow, False otherwise.
    """
    # get_cwl_from_repo calls this with the bare 'class' string rather than the
    # whole entity; indexing a str with 'class' raises TypeError, so both forms
    # are accepted here.
    cwl_class = cwl_entity if isinstance(cwl_entity, str) else cwl_entity['class']
    return cwl_class == 'Workflow'
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment