Commit 187b786a authored by Nico Vermaas

Merge branch 'add_script_csv_gen' into 'main'

Add script csv gen

See merge request !7
parents 427e49fe 884195fc
2 merge requests: !8 "update dev-nico to make small changes", !7 "Add script csv gen"
Pipeline #34448 passed
Showing changes with 353 additions and 24 deletions
*_fixture.json filter=lfs diff=lfs merge=lfs -text
@@ -9,10 +9,11 @@ workflow:
- if: $CI_COMMIT_REF_NAME == $CI_DEFAULT_BRANCH
variables:
DOCKER_BUILD_IMAGE_TAG: ":stable"
DOCKER_IMAGE_TAG: ":latest"
- if: $CI_COMMIT_REF_NAME != $CI_DEFAULT_BRANCH
variables:
DOCKER_BUILD_IMAGE_TAG: ":latest"
DOCKER_IMAGE_TAG: ":$CI_COMMIT_REF_NAME"
test-code:
image: python:3.10
@@ -34,11 +35,14 @@ docker-test-build:
if: main
# Official docker image.
image: docker$DOCKER_BUILD_IMAGE_TAG
before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
stage: build
services:
- docker:dind
script:
- docker build --pull -t "$CI_REGISTRY_IMAGE" ldvspec
- docker build --pull -t "$CI_REGISTRY_IMAGE$DOCKER_IMAGE_TAG" ldvspec
- docker push $CI_REGISTRY_IMAGE$DOCKER_IMAGE_TAG
docker-build-master:
......
@@ -25,12 +25,33 @@ services:
- ldv-spec-db:/var/lib/postgresql/data
restart: always
rabbitmq:
image: rabbitmq:3-management
networks:
- ldv_network
container_name: ldv-spec-rabbit
ldv-specification-background:
container_name: ldv-specification-background
image: git.astron.nl:5000/astron-sdc/ldv-specification:${LDVSPEC_VERSION:-latest}
networks:
- ldv_network
depends_on:
- ldv-spec-db
environment:
CELERY_BROKER_URL: amqp://guest@rabbitmq:5672
DJANGO_SETTINGS_MODULE: 'ldvspec.settings.docker_sdc'
env_file:
- $HOME/shared/ldvspec.env
command: celery -A ldvspec worker -l INFO
restart: always
ldv-specification:
container_name: ldv-specification
image: git.astron.nl:5000/astron-sdc/ldv-specification:latest
image: git.astron.nl:5000/astron-sdc/ldv-specification:${LDVSPEC_VERSION:-latest}
expose:
- "8000"
ports:
- "8000:8000"
networks:
- traefik_proxy
- ldv_network
@@ -43,11 +64,10 @@ services:
depends_on:
- ldv-spec-db
environment:
CELERY_BROKER_URL: amqp://guest@rabbitmq:5672
env_file:
- $HOME/shared/ldvspec.env
# command: >
# sh -c "python manage.py collectstatic --settings=ldvspec.settings.docker_sdc --noinput &&
# python manage.py migrate --settings=ldvspec.settings.docker_sdc"
restart: always
volumes:
......
Source diff could not be displayed: it is stored in LFS.
import os
from celery import Celery
# Set the default Django settings module for the 'celery' program.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "ldvspec.settings.dev")
app = Celery("lofardata")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
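
For context, app.autodiscover_tasks() scans every app in INSTALLED_APPS for a tasks module and registers its tasks with this Celery instance. A minimal sketch of a task this wiring would pick up (the ping task is hypothetical; the real tasks live in lofardata/tasks.py further down this diff):

from ldvspec.celery import app

@app.task
def ping():
    # Hypothetical task, shown only to illustrate registration via autodiscover_tasks().
    return "pong"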
@@ -37,6 +37,7 @@ INSTALLED_APPS = [
'corsheaders',
'django_filters',
'django_extensions',
'uws'
]
MIDDLEWARE = [
@@ -131,4 +132,7 @@ REST_FRAMEWORK = {
'PAGE_SIZE': 100
}
ATDB_HOST = os.environ.get('ATDB_HOST', 'http://localhost:8000/atdb/')
# Recommended to use an environment variable to set the broker URL.
CELERY_BROKER_URL = os.getenv("CELERY_BROKER_URL", "amqp://guest@localhost:5672")
UWS_WORKERS = ['lofardata.workers.query.Echo']
\ No newline at end of file
@@ -22,3 +22,4 @@ DATABASES = {
# https://docs.djangoproject.com/en/1.10/ref/settings/#auth-password-validators
AUTH_PASSWORD_VALIDATORS = []
from django.contrib import admin
# Register your models here.
from .models import DataProduct, DataProductFilter
from .models import DataProduct, DataProductFilter, ATDBProcessingSite
admin.site.register(DataProduct)
admin.site.register(DataProductFilter)
admin.site.register(ATDBProcessingSite)
# Generated by Django 4.1 on 2022-08-10 13:23
from django.conf import settings
import django.contrib.postgres.fields
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
('lofardata', '0004_auto_20220728_1007'),
]
operations = [
migrations.AlterField(
model_name='dataproduct',
name='id',
field=models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'),
),
migrations.AlterField(
model_name='dataproductfilter',
name='id',
field=models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'),
),
migrations.CreateModel(
name='WorkSpecification',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('created_on', models.DateTimeField(auto_now_add=True)),
('filters', models.JSONField(null=True)),
('inputs', models.JSONField(null=True)),
('related_tasks', django.contrib.postgres.fields.ArrayField(base_field=models.IntegerField(), null=True, size=None)),
('is_ready', models.BooleanField(default=False)),
('is_defined', models.BooleanField(default=False)),
('async_task_result', models.CharField(max_length=100, null=True)),
('created_by', models.ForeignKey(null=True, on_delete=django.db.models.deletion.DO_NOTHING, to=settings.AUTH_USER_MODEL)),
],
),
]
# Generated by Django 4.1 on 2022-08-10 15:56
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
('lofardata', '0005_include_work_specification'),
]
operations = [
migrations.CreateModel(
name='ATDBProcessingSite',
fields=[
('name', models.CharField(max_length=100, primary_key=True, serialize=False)),
('url', models.URLField()),
],
),
migrations.AddField(
model_name='workspecification',
name='processing_site',
field=models.ForeignKey(null=True, on_delete=django.db.models.deletion.DO_NOTHING, to='lofardata.atdbprocessingsite'),
),
]
# Generated by Django 4.1 on 2022-08-11 09:24
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('lofardata', '0006_atdbprocessingsite_workspecification_processing_site'),
]
operations = [
migrations.AddField(
model_name='workspecification',
name='selected_workflow',
field=models.CharField(max_length=500, null=True),
),
]
# Generated by Django 4.1 on 2022-08-11 11:56
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('lofardata', '0007_workspecification_selected_workflow'),
]
operations = [
migrations.AddField(
model_name='atdbprocessingsite',
name='access_token',
field=models.CharField(max_length=1000, null=True),
),
]
from django.db import models
from django_filters import rest_framework as filters
from urllib.parse import urlsplit
from django.contrib.auth.models import User
from django.contrib.postgres.fields import ArrayField
from celery.result import AsyncResult
class DataLocation(models.Model):
@@ -74,3 +76,32 @@ class DataProductFilter(models.Model):
field = models.CharField(max_length=100)
name = models.CharField(max_length=20)
lookup_type = models.CharField(max_length=100)
class ATDBProcessingSite(models.Model):
name = models.CharField(primary_key=True, max_length=100)
url = models.URLField()
access_token = models.CharField(max_length=1000, null=True)
class WorkSpecification(models.Model):
created_on = models.DateTimeField(auto_now_add=True)
created_by = models.ForeignKey(User, on_delete=models.DO_NOTHING, null=True)
filters = models.JSONField(null=True)
inputs = models.JSONField(null=True)
selected_workflow = models.CharField(max_length=500, null=True)
related_tasks = ArrayField(models.IntegerField(), null=True)
is_ready = models.BooleanField(default=False)
is_defined = models.BooleanField(default=False)
async_task_result = models.CharField(max_length=100, null=True)
processing_site = models.ForeignKey(ATDBProcessingSite, null=True, on_delete=models.DO_NOTHING)
    def save(self, force_insert=False, force_update=False, using=None,
             update_fields=None):
        super().save(force_insert=force_insert, force_update=force_update,
                     using=using, update_fields=update_fields)
        if self.async_task_result is None:
            # Imported here to avoid a circular import between models and tasks.
            from lofardata.tasks import define_work_specification
            res: AsyncResult = define_work_specification.delay(self.pk)
            self.async_task_result = res.id
            # The async_task_result guard above keeps this second save
            # from queueing the task again.
            super().save(update_fields=['async_task_result'])
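
A minimal usage sketch of the save() pattern above, assuming a reachable broker and a running worker (the filter value is illustrative):

from lofardata.models import WorkSpecification

# The first save() queues define_work_specification via Celery; the guard on
# async_task_result keeps the follow-up save() from queueing it twice.
spec = WorkSpecification(filters={'obs_id': 12345})
spec.save()
print(spec.async_task_result)  # Celery task id, e.g. a UUID string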
from abc import ABC
from rest_framework import serializers
from .models import DataProduct, DataLocation
from .models import DataProduct, DataLocation, WorkSpecification
class DataLocationSerializer(serializers.ModelSerializer):
@@ -23,3 +23,14 @@ class DataProductFlatSerializer(serializers.ModelSerializer):
def create(self, validated_data):
return DataProduct.insert_dataproduct(**validated_data)
class WorkSpecificationSerializer(serializers.ModelSerializer):
class Meta:
model = WorkSpecification
fields = '__all__'
def create(self, validated_data):
validated_data['created_by'] = self.context['request'].user
instance = super().create(validated_data)
return instance
from ldvspec.celery import app
from lofardata.models import WorkSpecification, DataProduct
@app.task
def define_work_specification(workspecification_id):
specification = WorkSpecification.objects.get(pk=workspecification_id)
filters = specification.filters
dataproducts = DataProduct.objects.filter(**filters)
inputs = {'surls': [dataproduct.surl for dataproduct in dataproducts]}
specification.inputs = inputs
specification.is_ready = True
specification.save()
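
The task can be sent through the broker or called directly, as the tests below do. A short sketch, where spec_pk is assumed to be the primary key of an existing WorkSpecification:

from lofardata.tasks import define_work_specification

define_work_specification.delay(spec_pk)  # asynchronously, via the broker (what save() does)
define_work_specification(spec_pk)        # synchronously, as in the test suite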
@@ -8,9 +8,18 @@
<table class="table table-striped table-bordered table-sm">
<tbody>
<tr><td>atdb_host</td><td>{{ atdb_host }}</td></tr>
<tr><td>atdb_host</td>
<td><ul>
{% for atdb_host in atdb_hosts %}
<li>{{ atdb_host.name }} ( <a href="{{ atdb_host.url }}">{{ atdb_host.url }}</a> )</li>
{% endfor %}
</ul>
</td>
</tr>
<tr><td>api data</td><td><a href="{% url 'dataproduct' %}">{% url 'dataproduct' %}</a></td></tr>
<tr><td>api data-location</td><td><a href="{% url 'datalocation' %}">{% url 'datalocation' %}</a></td></tr>
<tr><td>api work-specification</td><td><a href="{% url 'workspecification' %}">{% url 'workspecification' %}</a></td></tr>
<tr><td>api-schema</td><td><a href="{% url 'openapi-schema' %}">{% url 'openapi-schema' %}</a></td></tr>
</tbody>
......
import django.test as dtest
import rest_framework.test as rtest
import unittest.mock
from django.contrib.auth.models import User
import rest_framework.status as response_status
from lofardata.models import WorkSpecification, DataProduct
import lofardata.tasks as tasks
class TestWorkSpecificationRequest(rtest.APITestCase):
def setUp(self):
self.user = User.objects.create_superuser('admin')
self.client.force_authenticate(self.user)
@unittest.mock.patch('lofardata.tasks.define_work_specification.delay')
def test_insert_work_request(self, mocked_task):
# Mocking business
mocked_task.return_value = unittest.mock.MagicMock()
mocked_task.return_value.id = 'asyncresultid'
# ----------------------------------
# Insert test data
DataProduct.insert_dataproduct(12345, 'lta',
'myniceinstrument', 'ms', 'blubby', 'pipeline',
'srm://lofarlta/mynice_file.tar.gz', 123456, {'dysco_compression': False})
response = self.client.post('/ldvspec/api/v1/workspecification/',
data={'filters': {'obs_id': 12345}}, format='json')
self.assertEqual(201, response.status_code)
inserted_work_specification = response.json()
mocked_task.assert_called_with(inserted_work_specification['id'])
self.assertEqual('asyncresultid', inserted_work_specification['async_task_result'])
tasks.define_work_specification(inserted_work_specification['id'])
work_spec_object = WorkSpecification.objects.get(pk=inserted_work_specification['id'])
self.assertTrue(work_spec_object.is_ready)
self.assertEqual({'surls': ['srm://lofarlta/mynice_file.tar.gz']}, work_spec_object.inputs)
@unittest.mock.patch('lofardata.tasks.define_work_specification.delay')
def test_insert_work_request_nested_fields(self, mocked_task):
# Mocking business
mocked_task.return_value = unittest.mock.MagicMock()
mocked_task.return_value.id = 'asyncresultid'
# ----------------------------------
# Insert test data
DataProduct.insert_dataproduct(12345, 'lta',
'myniceinstrument', 'ms', 'blubby', 'pipeline',
'srm://lofarlta/mynice_file.tar.gz', 123456, {'dysco_compression': False})
DataProduct.insert_dataproduct(12345, 'lta',
'myniceinstrument', 'ms', 'blubby', 'pipeline',
'srm://lofarlta/mynice_file_with_rhythm.tar.gz', 123456,
{'dysco_compression': True})
response = self.client.post('/ldvspec/api/v1/workspecification/',
data={'filters': {'obs_id': 12345, 'additional_meta__dysco_compression': True}},
format='json')
self.assertEqual(201, response.status_code)
inserted_work_specification = response.json()
mocked_task.assert_called_with(inserted_work_specification['id'])
self.assertEqual('asyncresultid', inserted_work_specification['async_task_result'])
tasks.define_work_specification(inserted_work_specification['id'])
work_spec_object = WorkSpecification.objects.get(pk=inserted_work_specification['id'])
self.assertTrue(work_spec_object.is_ready)
self.assertEqual({'surls': ['srm://lofarlta/mynice_file_with_rhythm.tar.gz']}, work_spec_object.inputs)
@@ -17,7 +17,10 @@ urlpatterns = [
path('api/v1/insert_dataproduct/', views.InsertMultiDataproductView.as_view(), name='dataproduct-insert'),
path('api/v1/data-location/', views.DataLocationView.as_view(), name='datalocation'),
path('api/v1/data/<int:pk>/', views.DataProductDetailsView.as_view(), name='dataproduct-detail-view-api'),
path('api/v1/workspecification/', views.InsertWorkSpecification.as_view(),
name='workspecification'),
path('api/v1/uws/', include('uws.urls')),
path('api/v1/openapi/', get_schema_view(
title="LDV Specification",
description="API description",
......
import logging
from django.shortcuts import render
from django.conf import settings
from django.contrib.auth.models import User
from rest_framework import generics, pagination
from rest_framework.views import APIView
from rest_framework import status
from rest_framework.response import Response
from rest_framework import generics
from django_filters import rest_framework as filters
from rest_framework.schemas.openapi import AutoSchema
from .models import DataProduct, DataProductFilter, DataLocation
from .serializers import DataProductSerializer, DataProductFlatSerializer, DataLocationSerializer
from .models import DataProduct, DataProductFilter, DataLocation, WorkSpecification, ATDBProcessingSite
from .serializers import DataProductSerializer, \
DataProductFlatSerializer, DataLocationSerializer, \
WorkSpecificationSerializer
class DynamicFilterSet(filters.FilterSet):
@@ -44,8 +42,8 @@ class DataProductFilterSet(DynamicFilterSet):
# ---------- GUI Views -----------
def index(request):
atdb_host = settings.ATDB_HOST
return render(request, "lofardata/index.html", {'atdb_host': atdb_host})
atdb_hosts = ATDBProcessingSite.objects.values('name', 'url')
return render(request, "lofardata/index.html", {'atdb_hosts': atdb_hosts})
# ---------- REST API views ----------
@@ -72,15 +70,33 @@ class DataLocationView(generics.ListCreateAPIView):
queryset = DataLocation.objects.all().order_by('name')
class InsertMultiDataproductSchema(AutoSchema):
    def get_operation_id_base(self, path, method, action):
        return 'createDataProductMulti'
class InsertMultiDataproductView(generics.CreateAPIView):
"""
Add single DataProduct
"""
queryset = DataProduct.objects.all()
serializer_class = DataProductFlatSerializer
schema = InsertMultiDataproductSchema()
def get_serializer(self, *args, **kwargs):
""" if an array is passed, set serializer to many """
if isinstance(kwargs.get('data', {}), list):
kwargs['many'] = True
return super().get_serializer(*args, **kwargs)
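
A hedged sketch of the list-input path: posting a JSON array makes get_serializer() switch to many=True, inserting several dataproducts in one request. The host is a placeholder, and only obs_id and surl appear in this diff; the full field set comes from the DataProduct model:

import requests

records = [
    {"obs_id": 12345, "surl": "srm://lofarlta/mynice_file.tar.gz"},
    {"obs_id": 12345, "surl": "srm://lofarlta/mynice_file2.tar.gz"},
]
# A single JSON object would take the default many=False branch instead.
requests.post("http://localhost:8000/ldvspec/api/v1/insert_dataproduct/",
              json=records)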
class InsertWorkSpecification(generics.ListCreateAPIView):
queryset = WorkSpecification.objects.all()
serializer_class = WorkSpecificationSerializer
    def get_queryset(self):
        current_user: User = self.request.user
        # Staff and superusers see every specification; other users only their own.
        if not (current_user.is_staff or current_user.is_superuser):
            return self.queryset.filter(created_by=current_user.id)
        return self.queryset
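
A sketch of the per-user listing behaviour, using DRF's test client as the test suite below does (the user is illustrative):

from django.contrib.auth.models import User
from rest_framework.test import APIClient

client = APIClient()
client.force_authenticate(User.objects.create_user('alice'))
# Returns only specifications created_by alice; a staff or superuser
# account would receive all of them.
response = client.get('/ldvspec/api/v1/workspecification/')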
from uws.classes import UWSJob
from uws.client import Client
from uws.workers import Worker
from lofardata.models import DataProduct, WorkSpecification
class Echo(Worker):
"""A worker echoing all parameters"""
def __init__(self):
self._type = "echo"
def run(self, job: UWSJob, job_token: str, client: Client) -> None:
data = [{"key": p.key, "value": p.value} for p in job.parameters]
client.add_results(job.jobId, data, job_token)
class PrepareWorkSpecification(Worker):
def __init__(self):
self._type = "query_dataproducts"
    def run(self, job: UWSJob, job_token: str, client: Client) -> None:
        # Not implemented yet; commented-out sketch of the intended flow:
        # parameters = {p.key: p.value for p in job.parameters}
        # specification_id = parameters.pop('specification_id')
        # work_specification = WorkSpecification.objects.get(pk=specification_id)
        # dataproducts = DataProduct.objects.filter(**work_specification.filters)
        # client.add_results(job.jobId, [], job_token)
        pass