Skip to content
Snippets Groups Projects
Select Git revision
  • 7b5fe4058ccfd578ea83c8a40edeadd5b8d293b4
  • master default protected
  • L2SS-1914-fix_job_dispatch
  • TMSS-3170
  • TMSS-3167
  • TMSS-3161
  • TMSS-3158-Front-End-Only-Allow-Changing-Again
  • TMSS-3133
  • TMSS-3319-Fix-Templates
  • test-fix-deploy
  • TMSS-3134
  • TMSS-2872
  • defer-state
  • add-custom-monitoring-points
  • TMSS-3101-Front-End-Only
  • TMSS-984-choices
  • SDC-1400-Front-End-Only
  • TMSS-3079-PII
  • TMSS-2936
  • check-for-max-244-subbands
  • TMSS-2927---Front-End-Only-PXII
  • Before-Remove-TMSS
  • LOFAR-Release-4_4_318 protected
  • LOFAR-Release-4_4_317 protected
  • LOFAR-Release-4_4_316 protected
  • LOFAR-Release-4_4_315 protected
  • LOFAR-Release-4_4_314 protected
  • LOFAR-Release-4_4_313 protected
  • LOFAR-Release-4_4_312 protected
  • LOFAR-Release-4_4_311 protected
  • LOFAR-Release-4_4_310 protected
  • LOFAR-Release-4_4_309 protected
  • LOFAR-Release-4_4_308 protected
  • LOFAR-Release-4_4_307 protected
  • LOFAR-Release-4_4_306 protected
  • LOFAR-Release-4_4_304 protected
  • LOFAR-Release-4_4_303 protected
  • LOFAR-Release-4_4_302 protected
  • LOFAR-Release-4_4_301 protected
  • LOFAR-Release-4_4_300 protected
  • LOFAR-Release-4_4_299 protected
41 results

storage.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    cwl_processing.py 14.69 KiB
    from pathlib import Path
    import re
    from neo4j import Driver
    from graph_creation.utils import extract_js_expression_dependencies, get_input_source, process_control_dependencies, process_in_param, process_output, process_parameter_source, process_step_lookup
    from neo4j_graph_queries.create_node_queries import ensure_git_node, ensure_in_parameter_node, ensure_out_parameter_node
    from neo4j_graph_queries.create_edge_queries import  create_out_param_relationship, create_references_relationship
    from neo4j_graph_queries.utils import get_is_workflow
    
    
    def process_cwl_entity(driver, entity):
        is_workflow = get_is_workflow(entity)
        steps = None
        if is_workflow:
            steps = process_step_lookup(entity)
        process_cwl_inputs(driver, entity)
        process_cwl_outputs(driver, entity, steps)
        if steps:
            process_cwl_steps(driver, entity, steps)
    
    def process_cwl_inputs(driver: Driver, cwl_entity: dict) -> None:
        """
        Processes the inputs of a CWL  entity, either as a list or a dictionary of inputs,
        and processes each input parameter by calling `process_in_param`.
    
        Parameters:
            driver (Driver): The Neo4j driver used to execute queries.
            cwl_entity (dict): A dictionary representing a CWL entity, which includes an 'inputs' key containing
                               either a list or dictionary of input parameters.
    
        Returns:
            None
        """
        component_id = cwl_entity['path']
        # Process the inputs based on their type (list or dictionary)
        if isinstance(cwl_entity['inputs'], list):
            # If 'inputs' is a list, iterate over each input (which is expected to be a dictionary)
            for input in cwl_entity['inputs']:
                if isinstance(input, dict):
                    process_in_param(driver, input['id'], component_id, input['type'], cwl_entity['class'])
        elif isinstance(cwl_entity['inputs'], dict):
            # If 'inputs' is a dictionary, iterate over the keys (which are the input IDs)
            input_dict = cwl_entity['inputs']
            for key, value in input_dict.items():
                if isinstance(value, dict):
                    process_in_param(driver, key, component_id, value['type'], cwl_entity['class'])
                else:
                    process_in_param(driver, key, component_id, value, cwl_entity['class'])
    
    def process_cwl_outputs(driver: Driver, cwl_entity: dict, step_lookup: dict) -> None:
        """
        Processes the output parameters of a CWL entity by creating the necessary nodes 
        and relationships for each output parameter in a graph or database. The function handles both singular and 
        list-based output sources, ensuring that each output is linked to its corresponding source or sources.
    
        For each output in the CWL entity:
            - An out-parameter node is created for the output.
            - If the output contains an 'outputSource', the function processes the relationship between the output 
              parameter and its source(s). The 'outputSource' can either be a single source ID or a list of source IDs.
    
        Parameters:
            driver (Driver): The Neo4j driver used to execute queries
            cwl_entity (dict): A dictionary representing a CWL entity, which includes:
                - 'path' (str): The path to the CWL file, used as the component ID.
                - 'outputs' (list): A list of output parameters. Each output is a dictionary containing:
                    - 'id' (str): The unique identifier of the output parameter.
                    - 'outputSource' (str or list of str): The source(s) for the output parameter, which can be a single
                      source ID or a list of source IDs.
            step_lookup (dict): A dictionary that maps step IDs to their corresponding resolved paths. This is used to 
                                 resolve the source ID(s) in the 'outputSource' field to their correct locations.
    
        Returns:
            None
        """
        component_id = cwl_entity['path']
        outputs = cwl_entity['outputs']
    
        if isinstance(outputs, list):
            for output in cwl_entity['outputs']:
                output_id = output['id']
                output_source = None
                if 'outputSource' in output:
                    output_source = output['outputSource']
                    process_output(driver, output_id, output['type'], component_id, cwl_entity['class'], step_lookup, output_source)
                else:
                    process_output(driver, output_id, output['type'], component_id, cwl_entity['class'], step_lookup, output_source)
        elif isinstance(outputs, dict):
            for output_id, details in outputs.items():
                if isinstance(details, str):
                    process_output(driver, output_id, details, component_id, cwl_entity['class'], step_lookup)
                else:
                    if 'outputSource' in details:
                        output_source = details['outputSource']
                        process_output(driver, output_id, output['type'], component_id, cwl_entity['class'], step_lookup, output_source)
                    else:
                        process_output(driver, output_id, details['type'], component_id, cwl_entity['class'], step_lookup)
    
    
    def process_cwl_steps(driver: Driver, cwl_entity: dict, step_lookup) -> None:   
        """
        Processes the steps of a CWL entity, creating necessary nodes and relationships 
        for each step. The function handles the inputs, outputs, and control dependencies associated with each step 
        in the workflow
    
        For each step in the CWL entity:
            - The inputs are processed by creating in-parameter nodes and establishing relationships with the step
            - The "when" field (control dependencies) is processed by extracting the dependent parameters or outputs 
              and creating control relationships
    =
        Parameters:
            driver (Driver): The Neo4j driver used to execute queries
            cwl_entity (dict): A dictionary representing a CWL entity, which includes:
                - 'path' (str): The path to the CWL file, used as the component ID.
                - 'steps' (list): A list of steps in the workflow, each step being a dictionary containing:
                    - 'id' (str): The unique identifier for the step.
                    - 'in' (list): A list of inputs for the step.
                    - 'when' (str, optional): A conditional expression controlling the execution of the step.
            step_lookup (dict): A dictionary that maps step IDs to their resolved paths. This is used to resolve 
                                 the actual paths of steps when processing their inputs, outputs, and control dependencies.
    
        Returns:
            None
        """ 
        workflow_id = cwl_entity['path']
    
        for step in cwl_entity['steps']:
    
            # Get the resolved path of the step from the step_lookup
            step_path: str = step_lookup[step['id']]
    
            if not isinstance(step['run'], str):
                run_dict = step['run']
                run_dict['path'] = step_path
                print(f"processing {step_path}")
                process_cwl_entity(driver, run_dict)
                continue
    
            # Process the list of inputs of the step 
            for input in step['in']:
                # Create in-parameter node with ID as defined in the component and component ID equal to the path of the step
                param_node = ensure_in_parameter_node(driver, input['id'], step_path)
                param_node_internal_id = param_node[0]
    
                # Inputs can have one or multiple data sources (data nodes)
                if 'source' in input:
                    if isinstance(input['source'], str):
                        source_id = input['source']
                        process_parameter_source(driver, param_node_internal_id, source_id, workflow_id, step_lookup, step['id'])
                    elif isinstance(input['source'], list):
                        for source_id in input['source']:
                            process_parameter_source(driver, param_node_internal_id, source_id, workflow_id, step_lookup, step['id'])
    
            # Process the "when" field, aka control dependencies
            if 'when' in step:
                when_expr = step['when']
                # Exact parameter references within conditional
                when_refs = extract_js_expression_dependencies(when_expr)
                source = None
                for ref in when_refs:
                    if ref[0] == "parameter":
                        # Retrieve the source of the referenced input parameter
                        source = get_input_source(step['in'], ref[1])
                            
                    if not source: 
                        # The reference already mentions the source (output of a step or workflow input)
                        source = ref[1]
                    if source:
                        # Create control dependencies from the in-parameters of the step to the source of the reference
                        if isinstance(source, list):
                            # If the source is a list, process each source ID individually
                            for source_id in source:
                                process_control_dependencies(driver, source_id, workflow_id, step_path, step_lookup, step['id'])
                        else:
                            # Process the single source dependency
                            process_control_dependencies(driver, source, workflow_id, step_path, step_lookup, step['id'])
    
    
    def process_cwl_base_commands(driver: Driver, entity: dict, links: dict[str, dict]):
        """
        Processes the 'baseCommand' field in a CWL entity, with the aim of creating relationships 
        to external GitLab in the graph.
    
        Parameters:
            driver: The Neo4j driver for executing queries.
            entity (dict): The CWL entity containing the 'baseCommand' field.
            links (dict): A dictionary containing:
                - "commands": Mapping of command names (executables) to external Git repository they belong to.
                - "paths": Mapping of file paths to external Git repository they originate from.
    
        Returns:
            list or None: A list of command strings if 'baseCommand' exists, otherwise None.
    
        Process:
            - Extracts the 'baseCommand' from the entity.
            - If it's a list, retrieves the first command and joins all commands into a string.
            - Checks if the first command has a file extension:
                - If yes, searches for a matching executable in 'link_paths'.
                - If no, checks if it's in 'link_commands'.
            - If a match is found, ensures the command's Git node exists and 
              creates a reference relationship in the database.
        """
        base_command_key = "baseCommand"
        link_commands = links["commands"]
        link_paths = links["paths"]
        if base_command_key in entity:
            commands = entity[base_command_key]
            if isinstance(commands, list):
                if commands:
                    first_command = commands[0]
                    all_commands = " ".join(commands)
                    extension = Path(first_command).suffix
                    if extension:
                        # If the first command has a file extension, look for an executable match
                        for key, value in link_paths.items():
                            if is_executable(key) and first_command in key:
                                git_internal_node_id = ensure_git_node(driver, value)[0]
                                create_references_relationship(driver, entity["path"], git_internal_node_id, all_commands)
                                break
                    else:
                        # If no extension, check if the first command exists in link_commands
                        if first_command in link_commands:
                            git_internal_node_id = ensure_git_node(driver, link_commands[first_command])[0]
                            create_references_relationship(driver, entity["path"], git_internal_node_id, all_commands)
            return commands
        return None
    
    def is_executable(path):
        prefix = Path("\\usr\\local\\bin")
        return Path(path).parts[:len(prefix.parts)] == prefix.parts
    
    def get_executable(path):
        prefix = Path("\\usr\\local\\bin")
        if Path(path).parts[:len(prefix.parts)] == prefix.parts:
            return Path(*Path(path).parts[len(prefix.parts):])
    
    def process_cwl_commandline(driver: Driver, entity: dict, links: dict[str, dict]) -> None:
        """
        Processes the command-line tool CWL entity by resolving 
        dependencies and linking them to relevant Git repository nodes.
    
        Parameters:
            driver: The Noe4j driver for executing queries.
            entity (dict): The CWL entity containing command-line tool definitions.
            links (dict): A dictionary containing:
                - "commands": Mapping of command names to file paths.
                - "paths": Mapping of file paths to Git repository locations.
    
        Process:
            1. Calls `process_cwl_base_commands()` to extract the command list.
            2. Extracts file listings from 'InitialWorkDirRequirement' in 'requirements'.
            3. If both commands and listings exist:
                - Maps entry names to their content.
                - Iterates over commands, checking for references in the listing.
                - If a match is found in either 'commands' or 'paths', creates a 
                  reference relationship in the database.
                - If the command is executable, checks for references using the 
                  executable's path as well.
    
        Returns:
            None
        """
        commands = process_cwl_base_commands(driver, entity, links)
        listing = None
        if "requirements" in entity:
            if "InitialWorkDirRequirement" in entity["requirements"]:
                listing = entity["requirements"]["InitialWorkDirRequirement"]["listing"]
            else:
                init_work_dir = next((item for item in entity["requirements"] if item.get('class') == 'InitialWorkDirRequirement'), None)
                if init_work_dir:
                    listing = init_work_dir["listing"]
        if commands and listing:
            entry_map = {entry["entryname"]: entry["entry"] for entry in listing if "entryname" in entry}
            for command in commands:
                if command in entry_map:
                    all_links = links["commands"] | links["paths"]
                    print(entry_map[command])
    
                    for key, value in all_links.items():
                        path = str(Path(key).as_posix())
                        if bool(re.search(rf'\b{re.escape(path)}\b', entry_map[command])):
                            print(f"created: {value}")
                            git_internal_node_id = ensure_git_node(driver, value)[0]
                            create_references_relationship(driver, entity["path"], git_internal_node_id, entry_map[command])
                        if is_executable(key):
                            executable = str(get_executable(key))
                            if bool(re.search(rf'\b{re.escape(executable)}\b', entry_map[command])):
                                print(f"created: {value}")
                                git_internal_node_id = ensure_git_node(driver, value)[0]
                                create_references_relationship(driver, entity["path"], git_internal_node_id, entry_map[command])