Select Git revision
cwl_processing.py
-
Chiara Liotta authoredChiara Liotta authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
cwl_processing.py 14.69 KiB
from pathlib import Path
import re
from neo4j import Driver
from graph_creation.utils import extract_js_expression_dependencies, get_input_source, process_control_dependencies, process_in_param, process_output, process_parameter_source, process_step_lookup
from neo4j_graph_queries.create_node_queries import ensure_git_node, ensure_in_parameter_node, ensure_out_parameter_node
from neo4j_graph_queries.create_edge_queries import create_out_param_relationship, create_references_relationship
from neo4j_graph_queries.utils import get_is_workflow
def process_cwl_entity(driver, entity):
is_workflow = get_is_workflow(entity)
steps = None
if is_workflow:
steps = process_step_lookup(entity)
process_cwl_inputs(driver, entity)
process_cwl_outputs(driver, entity, steps)
if steps:
process_cwl_steps(driver, entity, steps)
def process_cwl_inputs(driver: Driver, cwl_entity: dict) -> None:
"""
Processes the inputs of a CWL entity, either as a list or a dictionary of inputs,
and processes each input parameter by calling `process_in_param`.
Parameters:
driver (Driver): The Neo4j driver used to execute queries.
cwl_entity (dict): A dictionary representing a CWL entity, which includes an 'inputs' key containing
either a list or dictionary of input parameters.
Returns:
None
"""
component_id = cwl_entity['path']
# Process the inputs based on their type (list or dictionary)
if isinstance(cwl_entity['inputs'], list):
# If 'inputs' is a list, iterate over each input (which is expected to be a dictionary)
for input in cwl_entity['inputs']:
if isinstance(input, dict):
process_in_param(driver, input['id'], component_id, input['type'], cwl_entity['class'])
elif isinstance(cwl_entity['inputs'], dict):
# If 'inputs' is a dictionary, iterate over the keys (which are the input IDs)
input_dict = cwl_entity['inputs']
for key, value in input_dict.items():
if isinstance(value, dict):
process_in_param(driver, key, component_id, value['type'], cwl_entity['class'])
else:
process_in_param(driver, key, component_id, value, cwl_entity['class'])
def process_cwl_outputs(driver: Driver, cwl_entity: dict, step_lookup: dict) -> None:
"""
Processes the output parameters of a CWL entity by creating the necessary nodes
and relationships for each output parameter in a graph or database. The function handles both singular and
list-based output sources, ensuring that each output is linked to its corresponding source or sources.
For each output in the CWL entity:
- An out-parameter node is created for the output.
- If the output contains an 'outputSource', the function processes the relationship between the output
parameter and its source(s). The 'outputSource' can either be a single source ID or a list of source IDs.
Parameters:
driver (Driver): The Neo4j driver used to execute queries
cwl_entity (dict): A dictionary representing a CWL entity, which includes:
- 'path' (str): The path to the CWL file, used as the component ID.
- 'outputs' (list): A list of output parameters. Each output is a dictionary containing:
- 'id' (str): The unique identifier of the output parameter.
- 'outputSource' (str or list of str): The source(s) for the output parameter, which can be a single
source ID or a list of source IDs.
step_lookup (dict): A dictionary that maps step IDs to their corresponding resolved paths. This is used to
resolve the source ID(s) in the 'outputSource' field to their correct locations.
Returns:
None
"""
component_id = cwl_entity['path']
outputs = cwl_entity['outputs']
if isinstance(outputs, list):
for output in cwl_entity['outputs']:
output_id = output['id']
output_source = None
if 'outputSource' in output:
output_source = output['outputSource']
process_output(driver, output_id, output['type'], component_id, cwl_entity['class'], step_lookup, output_source)
else:
process_output(driver, output_id, output['type'], component_id, cwl_entity['class'], step_lookup, output_source)
elif isinstance(outputs, dict):
for output_id, details in outputs.items():
if isinstance(details, str):
process_output(driver, output_id, details, component_id, cwl_entity['class'], step_lookup)
else:
if 'outputSource' in details:
output_source = details['outputSource']
process_output(driver, output_id, output['type'], component_id, cwl_entity['class'], step_lookup, output_source)
else:
process_output(driver, output_id, details['type'], component_id, cwl_entity['class'], step_lookup)
def process_cwl_steps(driver: Driver, cwl_entity: dict, step_lookup) -> None:
"""
Processes the steps of a CWL entity, creating necessary nodes and relationships
for each step. The function handles the inputs, outputs, and control dependencies associated with each step
in the workflow
For each step in the CWL entity:
- The inputs are processed by creating in-parameter nodes and establishing relationships with the step
- The "when" field (control dependencies) is processed by extracting the dependent parameters or outputs
and creating control relationships
=
Parameters:
driver (Driver): The Neo4j driver used to execute queries
cwl_entity (dict): A dictionary representing a CWL entity, which includes:
- 'path' (str): The path to the CWL file, used as the component ID.
- 'steps' (list): A list of steps in the workflow, each step being a dictionary containing:
- 'id' (str): The unique identifier for the step.
- 'in' (list): A list of inputs for the step.
- 'when' (str, optional): A conditional expression controlling the execution of the step.
step_lookup (dict): A dictionary that maps step IDs to their resolved paths. This is used to resolve
the actual paths of steps when processing their inputs, outputs, and control dependencies.
Returns:
None
"""
workflow_id = cwl_entity['path']
for step in cwl_entity['steps']:
# Get the resolved path of the step from the step_lookup
step_path: str = step_lookup[step['id']]
if not isinstance(step['run'], str):
run_dict = step['run']
run_dict['path'] = step_path
print(f"processing {step_path}")
process_cwl_entity(driver, run_dict)
continue
# Process the list of inputs of the step
for input in step['in']:
# Create in-parameter node with ID as defined in the component and component ID equal to the path of the step
param_node = ensure_in_parameter_node(driver, input['id'], step_path)
param_node_internal_id = param_node[0]
# Inputs can have one or multiple data sources (data nodes)
if 'source' in input:
if isinstance(input['source'], str):
source_id = input['source']
process_parameter_source(driver, param_node_internal_id, source_id, workflow_id, step_lookup, step['id'])
elif isinstance(input['source'], list):
for source_id in input['source']:
process_parameter_source(driver, param_node_internal_id, source_id, workflow_id, step_lookup, step['id'])
# Process the "when" field, aka control dependencies
if 'when' in step:
when_expr = step['when']
# Exact parameter references within conditional
when_refs = extract_js_expression_dependencies(when_expr)
source = None
for ref in when_refs:
if ref[0] == "parameter":
# Retrieve the source of the referenced input parameter
source = get_input_source(step['in'], ref[1])
if not source:
# The reference already mentions the source (output of a step or workflow input)
source = ref[1]
if source:
# Create control dependencies from the in-parameters of the step to the source of the reference
if isinstance(source, list):
# If the source is a list, process each source ID individually
for source_id in source:
process_control_dependencies(driver, source_id, workflow_id, step_path, step_lookup, step['id'])
else:
# Process the single source dependency
process_control_dependencies(driver, source, workflow_id, step_path, step_lookup, step['id'])
def process_cwl_base_commands(driver: Driver, entity: dict, links: dict[str, dict]):
"""
Processes the 'baseCommand' field in a CWL entity, with the aim of creating relationships
to external GitLab in the graph.
Parameters:
driver: The Neo4j driver for executing queries.
entity (dict): The CWL entity containing the 'baseCommand' field.
links (dict): A dictionary containing:
- "commands": Mapping of command names (executables) to external Git repository they belong to.
- "paths": Mapping of file paths to external Git repository they originate from.
Returns:
list or None: A list of command strings if 'baseCommand' exists, otherwise None.
Process:
- Extracts the 'baseCommand' from the entity.
- If it's a list, retrieves the first command and joins all commands into a string.
- Checks if the first command has a file extension:
- If yes, searches for a matching executable in 'link_paths'.
- If no, checks if it's in 'link_commands'.
- If a match is found, ensures the command's Git node exists and
creates a reference relationship in the database.
"""
base_command_key = "baseCommand"
link_commands = links["commands"]
link_paths = links["paths"]
if base_command_key in entity:
commands = entity[base_command_key]
if isinstance(commands, list):
if commands:
first_command = commands[0]
all_commands = " ".join(commands)
extension = Path(first_command).suffix
if extension:
# If the first command has a file extension, look for an executable match
for key, value in link_paths.items():
if is_executable(key) and first_command in key:
git_internal_node_id = ensure_git_node(driver, value)[0]
create_references_relationship(driver, entity["path"], git_internal_node_id, all_commands)
break
else:
# If no extension, check if the first command exists in link_commands
if first_command in link_commands:
git_internal_node_id = ensure_git_node(driver, link_commands[first_command])[0]
create_references_relationship(driver, entity["path"], git_internal_node_id, all_commands)
return commands
return None
def is_executable(path):
prefix = Path("\\usr\\local\\bin")
return Path(path).parts[:len(prefix.parts)] == prefix.parts
def get_executable(path):
prefix = Path("\\usr\\local\\bin")
if Path(path).parts[:len(prefix.parts)] == prefix.parts:
return Path(*Path(path).parts[len(prefix.parts):])
def process_cwl_commandline(driver: Driver, entity: dict, links: dict[str, dict]) -> None:
"""
Processes the command-line tool CWL entity by resolving
dependencies and linking them to relevant Git repository nodes.
Parameters:
driver: The Noe4j driver for executing queries.
entity (dict): The CWL entity containing command-line tool definitions.
links (dict): A dictionary containing:
- "commands": Mapping of command names to file paths.
- "paths": Mapping of file paths to Git repository locations.
Process:
1. Calls `process_cwl_base_commands()` to extract the command list.
2. Extracts file listings from 'InitialWorkDirRequirement' in 'requirements'.
3. If both commands and listings exist:
- Maps entry names to their content.
- Iterates over commands, checking for references in the listing.
- If a match is found in either 'commands' or 'paths', creates a
reference relationship in the database.
- If the command is executable, checks for references using the
executable's path as well.
Returns:
None
"""
commands = process_cwl_base_commands(driver, entity, links)
listing = None
if "requirements" in entity:
if "InitialWorkDirRequirement" in entity["requirements"]:
listing = entity["requirements"]["InitialWorkDirRequirement"]["listing"]
else:
init_work_dir = next((item for item in entity["requirements"] if item.get('class') == 'InitialWorkDirRequirement'), None)
if init_work_dir:
listing = init_work_dir["listing"]
if commands and listing:
entry_map = {entry["entryname"]: entry["entry"] for entry in listing if "entryname" in entry}
for command in commands:
if command in entry_map:
all_links = links["commands"] | links["paths"]
print(entry_map[command])
for key, value in all_links.items():
path = str(Path(key).as_posix())
if bool(re.search(rf'\b{re.escape(path)}\b', entry_map[command])):
print(f"created: {value}")
git_internal_node_id = ensure_git_node(driver, value)[0]
create_references_relationship(driver, entity["path"], git_internal_node_id, entry_map[command])
if is_executable(key):
executable = str(get_executable(key))
if bool(re.search(rf'\b{re.escape(executable)}\b', entry_map[command])):
print(f"created: {value}")
git_internal_node_id = ensure_git_node(driver, value)[0]
create_references_relationship(driver, entity["path"], git_internal_node_id, entry_map[command])