Skip to content
Snippets Groups Projects
Commit f1adb477 authored by Chiara Liotta's avatar Chiara Liotta
Browse files

first phase algorithm dependenciesdata

parent fd795843
No related branches found
No related tags found
No related merge requests found
Showing
with 249 additions and 0 deletions
.venv/
repos/
Neo4j-25ebc0db-Created-2024-11-17.txt
\ No newline at end of file
File added
File added
File added
File added
File added
from pathlib import Path
from cwl_utils.parser import save
from cwl_utils.parser.cwl_v1_2_utils import load_inputfile
def get_cwl_from_repo(repo_path: str) -> list[dict]:
cwl_entities = []
pathlist = Path(repo_path).glob('**/*.cwl')
for path in pathlist:
path_in_str = str(path)
cwl_obj = load_inputfile(path_in_str)
saved_obj = save(cwl_obj, relative_uris=True)
saved_obj['path'] = path_in_str
cwl_entities.append(saved_obj)
return cwl_entities
\ No newline at end of file
from graph_creation.utils import create_input_nodes_and_relationships, process_source_relationship
from neo4j_queries.node_queries import ensure_component_node, ensure_data_node, ensure_parameter_node
from neo4j_queries.edge_queries import create_data_relationship, create_out_param_relationship
from pathlib import Path
def process_cwl_inputs(driver, cwl_entity: dict):
component_id = cwl_entity['path']
if type(cwl_entity['inputs']) == list:
for input in cwl_entity['inputs']:
if type(input) == dict:
create_input_nodes_and_relationships(driver, input['id'], component_id)
elif type(cwl_entity['inputs']) == dict:
for key in cwl_entity['inputs'].keys():
create_input_nodes_and_relationships(driver, key, component_id)
def process_cwl_outputs(driver, cwl_entity: dict):
component_id = cwl_entity['path']
for output in cwl_entity['outputs']:
if type(output) == dict:
# Create out-parameter node o_node with id = o.id and component_id = c_node.id
param_node = ensure_parameter_node(driver, output['id'], component_id, 'out')
# Create a directed data edge from o_node to c_node
param_node_internal_id = param_node[0]
create_out_param_relationship(driver, component_id, param_node_internal_id)
if 'outputSource' in output:
if type(output['outputSource']) == str:
process_source_relationship(driver, output['outputSource'], component_id, param_node_internal_id)
elif type(output['outputSource']) == list:
for o in output['outputSource']:
process_source_relationship(driver, o, component_id, param_node_internal_id)
def process_cwl_steps(driver, cwl_entity: dict, repo: str):
for step in cwl_entity['steps']:
combined_path = Path(repo) / step['run']
step_path = str(combined_path)
# if a component node with the same path (run) as s does not exist then
# Create component node s_node unique to s with id equal to run
s_node = ensure_component_node(driver, step_path)
s_node_internal_id = s_node[0]
for i in step['in']:
# Create in-parameter node i_node with id = i.id and component_id = s.run
param_node = ensure_parameter_node(driver, i['id'], step_path, 'in')
param_node_internal_id = param_node[0]
# Create a data edge from s_node to i_node
create_data_relationship(driver, s_node_internal_id, param_node_internal_id)
if 'source' in i:
if type(i['source']) == str:
source_id = i['source']
process_source_relationship(driver, source_id, cwl_entity['path'], param_node_internal_id)
elif type(i['source']) == list:
for source_id in i['source']:
process_source_relationship(driver, source_id, cwl_entity['path'], param_node_internal_id)
for o in step['out']:
if type(o) == dict:
o_id = o['id']
else:
o_id = o
# Create out-parameter node o_node with id = o.id and component_id = s.run
param_node = ensure_parameter_node(driver, o_id, step_path, 'out')
param_node_internal_id = param_node[0]
# Create a data edge from o_node to s_node
create_data_relationship(driver, param_node_internal_id, s_node_internal_id)
# Workflow-level outputs of a step have \texttt{id} corresponding to \texttt{[[step ID]/[output ID as defined in workflow]]}
# and a \texttt{component\_id} property equal to the ID of the workflow
# Create data node o_data_node with id = step_id/output_id and component_id = c_node.id
output_id = f"{step['id']}/{o_id}"
data_node = ensure_data_node(driver, output_id, cwl_entity['path'])
data_node_internal_id = data_node[0]
# Create a data edge from o_node to o_data_node
create_data_relationship(driver, param_node_internal_id, data_node_internal_id)
\ No newline at end of file
from graph_creation.cwl_parsing import get_cwl_from_repo
from graph_creation.cwl_processing import process_cwl_inputs, process_cwl_outputs, process_cwl_steps
from neo4j_queries.node_queries import ensure_component_node
def process_repos(repo_list: list, driver):
cwl_entities = {}
for repo in repo_list:
cwl_entities[repo]= get_cwl_from_repo(repo)
for entity in cwl_entities[repo]:
# if a component node with the same path as c does not exist then
# create component node c_node unique to c with id equal to path and alias equal to a empty dictionary
component_id = entity['path']
ensure_component_node(driver, component_id)
process_cwl_inputs(driver, entity)
process_cwl_outputs(driver, entity)
if entity['class'] == 'Workflow':
process_cwl_steps(driver, entity, repo)
from neo4j_queries.node_queries import ensure_data_node, ensure_parameter_node
from neo4j_queries.edge_queries import create_data_relationship, create_in_param_relationship
def create_input_nodes_and_relationships(driver, input_id, component_id):
# Create in-parameter node i_node with id = i.id and component_id = c_node.id
param_node = ensure_parameter_node(driver, input_id, component_id, 'in')
param_node_internal_id = param_node[0]
# Create a directed data edge from c_node to i_node
create_in_param_relationship(driver, component_id, param_node_internal_id)
# Create a data node i_data_node with id = i.id and component_id = c_node.id
data_node = ensure_data_node(driver, input_id, component_id)
data_node_internal_id = data_node[0]
# Create a data edge from i_data_node to i_node
create_data_relationship(driver, data_node_internal_id, param_node_internal_id)
def process_source_relationship(driver, source_id, component_id, param_node_internal_id):
data_node = ensure_data_node(driver, source_id, component_id)
data_node_internal_id = data_node[0]
create_data_relationship(driver, param_node_internal_id, data_node_internal_id)
\ No newline at end of file
main.py 0 → 100644
from graph_creation.repo_processing import process_repos
from neo4j import GraphDatabase
import dotenv
import os
import gitlab
import subprocess
def clone_repos(repo_list: list, folder_name: str):
gl = gitlab.Gitlab('https://git.astron.nl')
projects = gl.projects.list(iterator=True, get_all=True)
for project in projects:
repo_name = project.attributes['path_with_namespace']
if repo_name in repo_list:
git_url = project.ssh_url_to_repo
subprocess.call(['git', 'clone', git_url, f'./{folder_name}/{repo_name}'])
if __name__ == '__main__':
relevant_repos = ['ldv/imaging_compress_pipeline']
folder = 'repos'
clone_repos(relevant_repos)
load_status = dotenv.load_dotenv("Neo4j-25ebc0db-Created-2024-11-17.txt")
if load_status is False:
raise RuntimeError('Environment variables not loaded.')
URI = os.getenv("NEO4J_URI")
AUTH = (os.getenv("NEO4J_USERNAME"), os.getenv("NEO4J_PASSWORD"))
repo_paths = [f'{folder}/{path}' for path in relevant_repos]
print(repo_paths)
with GraphDatabase.driver(URI, auth=AUTH) as driver:
driver.verify_connectivity()
print("Connection established.")
driver = GraphDatabase.driver(URI, auth=AUTH)
process_repos(repo_paths, driver)
driver.close()
File added
File added
File added
File added
from neo4j_queries.utils import clean_component_id
def create_in_param_relationship(driver, prefixed_component_id, parameter_internal_id):
component_id = clean_component_id(prefixed_component_id)
query = """
MATCH (c:Component {component_id: $component_id}), (p)
WHERE id(p) = $parameter_internal_id
MERGE (c)-[:DATA]->(p)
RETURN c.id AS component_id, p.parameter_id AS parameter_id
"""
with driver.session() as session:
result = session.run(query, component_id=component_id,
parameter_internal_id=parameter_internal_id)
record = result.single()
return record["component_id"], record["parameter_id"]
def create_out_param_relationship(driver, prefixed_component_id, parameter_internal_id):
component_id = clean_component_id(prefixed_component_id)
query = """
MATCH (c:Component {component_id: $component_id}), (p)
WHERE id(p) = $parameter_internal_id
MERGE (c)<-[:DATA]-(p)
RETURN c.component_id AS component_id, p.parameter_id AS parameter_id
"""
with driver.session() as session:
result = session.run(query, component_id=component_id,
parameter_internal_id=parameter_internal_id)
record = result.single()
return record["component_id"], record["parameter_id"]
def create_data_relationship(driver, from_internal_node_id, to_internal_node_id):
query = """
MATCH (a), (b)
WHERE id(a) = $from_internal_node_id AND id(b) = $to_internal_node_id
MERGE (a)-[:DATA]->(b)
RETURN a.id AS id_1, b.id AS id_2
"""
with driver.session() as session:
result = session.run(query, from_internal_node_id=from_internal_node_id,
to_internal_node_id=to_internal_node_id)
record = result.single()
return record["id_1"], record["id_2"]
\ No newline at end of file
from neo4j_queries.utils import clean_component_id
def ensure_component_node(driver, prefixed_component_id):
component_id = clean_component_id(prefixed_component_id)
query = """
MERGE (c:Component {component_id: $component_id})
RETURN id(c) AS node_internal_id, c.id AS id_property
"""
with driver.session() as session:
result = session.run(query, component_id=component_id)
record = result.single()
return record["node_internal_id"], record["id_property"]
def ensure_parameter_node(driver, node_id, prefixed_component_id, param_type):
component_id = clean_component_id(prefixed_component_id)
query = """
MERGE (n:Parameter {parameter_id: $node_id, component_id: $component_id})
ON CREATE SET
n.component_id = $component_id,
n.parameter_type = $param_type
RETURN id(n) AS node_internal_id, n.parameter_id AS id_property, n.component_id AS component_id_property,
n.parameter_type AS parameter_type_property
"""
with driver.session() as session:
result = session.run(query, node_id=node_id, component_id=component_id, param_type=param_type)
record = result.single()
return record["node_internal_id"], record["id_property"], record["component_id_property"], record['parameter_type_property']
def ensure_data_node(driver, node_id, prefixed_component_id):
component_id = clean_component_id(prefixed_component_id)
query = """
MERGE (n:Data {data_id: $node_id, component_id: $component_id})
ON CREATE SET
n.component_id = $component_id
RETURN id(n) AS node_internal_id, n.data_id AS id_property, n.component_id AS component_id_property
"""
with driver.session() as session:
result = session.run(query, node_id=node_id, component_id=component_id)
record = result.single()
return record["node_internal_id"], record["id_property"], record["component_id_property"]
\ No newline at end of file
def clean_component_id(prefixed_component_id: str) -> str:
component_id = prefixed_component_id.removeprefix("repos\\")
return component_id
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment