Skip to content
Snippets Groups Projects
Commit 629614a3 authored by Nico Vermaas's avatar Nico Vermaas
Browse files

connecting relationships with signals

parent ab93d0fe
No related branches found
No related tags found
No related merge requests found
Pipeline #8094 passed
...@@ -2,10 +2,18 @@ ...@@ -2,10 +2,18 @@
## Apertif Task Database for LOFAR Data Valorization ## Apertif Task Database for LOFAR Data Valorization
Test Environment: ### Test Environment on sdc.astron.nl:
main GUI:
* https://sdc.astron.nl:5554/atdb/ * https://sdc.astron.nl:5554/atdb/
admin interface:
* https://sdc.astron.nl:5554/atdb/admin/ * https://sdc.astron.nl:5554/atdb/admin/
REST API
* workflows: http://localhost:8000/atdb/workflows/
* tasks: http://localhost:8000/atdb/tasks/
## Micro Services (in separate repo) ## Micro Services (in separate repo)
* https://git.astron.nl/astron-sdc/ldv-services * https://git.astron.nl/astron-sdc/ldv-services
......
...@@ -7,23 +7,32 @@ from django.db.models import Sum ...@@ -7,23 +7,32 @@ from django.db.models import Sum
datetime_format_string = '%Y-%m-%dT%H:%M:%SZ' datetime_format_string = '%Y-%m-%dT%H:%M:%SZ'
class Workflow(models.Model): class Workflow(models.Model):
repository = models.CharField(max_length=100, blank=True, null=True, default="unknown") workflow_uri = models.CharField(max_length=30, blank=True, null=True)
repository = models.CharField(max_length=100, blank=True, null=True)
commit_id = models.CharField(max_length=30, blank=True, null=True) commit_id = models.CharField(max_length=30, blank=True, null=True)
path = models.CharField(max_length=100, blank=True, null=True, default="unknown") path = models.CharField(max_length=100, blank=True, null=True)
class LogEntry(models.Model): def __str__(self):
step_name = models.CharField(max_length=30, blank=True, null=True, default="unknown") return str(self.id)+' - '+str(self.workflow_uri)
class Task(models.Model): class Task(models.Model):
taskID = models.CharField(db_index=True, max_length=30, blank=True, null=True) taskID = models.CharField(db_index=True, max_length=30, blank=True, null=True)
task_type = models.CharField(max_length=20, default="task") task_type = models.CharField(max_length=20, default="task")
# note: the apparent naming reversal is intentional. Predecessors are somebody elses successors.
predecessors = models.ForeignKey('self', related_name='task_successors', on_delete=models.SET_NULL, null=True,blank=True)
successors = models.ForeignKey('self', related_name='task_predecessors', on_delete=models.SET_NULL, null=True,blank=True)
workflow = models.ForeignKey(Workflow, related_name='tasks', on_delete=models.SET_NULL, null=True)
project = models.CharField(max_length=100, blank=True, null=True, default="unknown") project = models.CharField(max_length=100, blank=True, null=True, default="unknown")
sas_id = models.CharField(max_length=30, blank=True, null=True) sas_id = models.CharField(max_length=30, blank=True, null=True)
priority = models.IntegerField(default=0) priority = models.IntegerField(default=0)
purge_policy = models.CharField(max_length=5, default="no", blank=True, null=True) purge_policy = models.CharField(max_length=5, default="no", blank=True, null=True)
workflow_id = models.CharField(max_length=12, blank=True, null=True) desired_workflow_id = models.CharField(max_length=12, blank=True, null=True)
desired_workflow_uri = models.CharField(max_length=100, blank=True, null=True)
workflow = models.ForeignKey(Workflow, related_name='tasks', on_delete=models.SET_NULL, null=True,blank=True)
inputs = models.JSONField(null=True, blank=True) inputs = models.JSONField(null=True, blank=True)
outputs = models.JSONField(null=True, blank=True) outputs = models.JSONField(null=True, blank=True)
...@@ -31,20 +40,33 @@ class Task(models.Model): ...@@ -31,20 +40,33 @@ class Task(models.Model):
skip = models.BooleanField(default=False) skip = models.BooleanField(default=False)
creationTime = models.DateTimeField(default=datetime.utcnow, blank=True) creationTime = models.DateTimeField(default=datetime.utcnow, blank=True)
new_status = models.CharField(max_length=50, default="defined", null=True) desired_status = models.CharField(max_length=50, default="defining", null=True)
my_status = models.CharField(db_index=True, max_length=50,default="defined") status = models.CharField(db_index=True, default="unknown", max_length=50,blank=True, null=True)
def __str__(self): def __str__(self):
return str(self.id) return str(self.id) + ' - ' + str(self.taskID) + ' - ' + str(self.sas_id)
# this translates a view-name (from urls.py) back to a url, to avoid hardcoded url's in the html templates # this translates a view-name (from urls.py) back to a url, to avoid hardcoded url's in the html templates
# bad : <td><a href="/atdb/observations/{{ observation.id }}/" target="_blank">{{ observation.taskID }} </a> </td> # bad : <td><a href="/atdb/taaks/{{ task.id }}/" target="_blank">{{ task.taskID }} </a> </td>
# good: <td><a href="{{ observation.get_absolute_url }}" target="_blank">{{ observation.taskID }} </a> </td> # good: <td><a href="{{ task.get_absolute_url }}" target="_blank">{{ task.taskID }} </a> </td>
def get_absolute_url(self): def get_absolute_url(self):
return reverse('task-detail-view-api', kwargs={'pk': self.pk}) return reverse('task-detail-view-api', kwargs={'pk': self.pk})
class LogEntry(models.Model):
task = models.ForeignKey(Task, related_name='tasks', on_delete=models.CASCADE, null=False)
cpu_cycles = models.IntegerField(null=True,blank=True)
wall_clock_time = models.IntegerField(null=True,blank=True)
url_to_log_file = models.CharField(max_length=100, blank=True, null=True)
step_name = models.CharField(max_length=30, blank=True, null=True)
start_time = models.DateTimeField(blank=True, null=True)
end_time = models.DateTimeField(blank=True, null=True)
desired_status = models.CharField(max_length=50, default="defined", blank=True, null=True)
status = models.CharField(max_length=50,default="defined", blank=True, null=True)
def __str__(self):
return str(self.id)+' - ('+str(self.task__taskID)+')'
class Status(models.Model): class Status(models.Model):
name = models.CharField(max_length=50, default="unknown") name = models.CharField(max_length=50, default="unknown")
timestamp = models.DateTimeField(default=datetime.utcnow, blank=True) timestamp = models.DateTimeField(default=datetime.utcnow, blank=True)
......
...@@ -12,8 +12,10 @@ class TaskSerializer(serializers.ModelSerializer): ...@@ -12,8 +12,10 @@ class TaskSerializer(serializers.ModelSerializer):
class Meta: class Meta:
model = Task model = Task
fields = ('id','task_type','taskID', fields = ('id','task_type','taskID',
'project','sas_id','priority','purge_policy','skip','workflow_id', 'predecessors','successors',
'my_status','new_status', 'project','sas_id','priority','purge_policy','skip',
'desired_workflow_id','desired_workflow_uri','workflow',
'status','desired_status','desired_workflow_uri',
'inputs','outputs','status_history') 'inputs','outputs','status_history')
...@@ -26,6 +28,11 @@ class StatusSerializer(serializers.ModelSerializer): ...@@ -26,6 +28,11 @@ class StatusSerializer(serializers.ModelSerializer):
class WorkflowSerializer(serializers.ModelSerializer): class WorkflowSerializer(serializers.ModelSerializer):
# tasks = serializers.StringRelatedField(
# many=True,
# required=False,
# )
class Meta: class Meta:
model = Workflow model = Workflow
fields = "__all__" fields = "__all__"
......
...@@ -5,7 +5,7 @@ from django.core.signals import request_started, request_finished ...@@ -5,7 +5,7 @@ from django.core.signals import request_started, request_finished
from django.contrib.auth.models import User from django.contrib.auth.models import User
from django.dispatch import receiver from django.dispatch import receiver
from django.contrib.contenttypes.models import ContentType from django.contrib.contenttypes.models import ContentType
from taskdatabase.models import Task, Status from taskdatabase.models import Task, Workflow, LogEntry, Status
from . import jobs from . import jobs
""" """
...@@ -48,17 +48,33 @@ def handle_pre_save(sender, **kwargs): ...@@ -48,17 +48,33 @@ def handle_pre_save(sender, **kwargs):
return None return None
# handle status change # handle status change
my_status = str(myTaskObject.my_status) status = str(myTaskObject.status)
new_status = str(myTaskObject.new_status) desired_status = str(myTaskObject.desired_status)
if (new_status!=None) and (my_status!=new_status):
if (desired_status!=None) and (status!=desired_status):
# set the new status # set the new status
myTaskObject.my_status = new_status myTaskObject.status = desired_status
# add the new to the status history by brewing a status object out of it # add the new to the status history by brewing a status object out of it
myStatus = Status(name=new_status, task=myTaskObject) myStatus = Status(name=desired_status, task=myTaskObject)
myStatus.save() myStatus.save()
# connect the task to a workflow after posting a (flat) task through the REST API
desired_workflow_id = myTaskObject.desired_workflow_id
desired_workflow_uri = myTaskObject.desired_workflow_uri
# first try to find the workflow by desired workflow_id
desired_workflow = Workflow.objects.get(id=desired_workflow_id)
if (desired_workflow == None):
# then try workflow_uri
desired_workflow = Workflow.objects.get(uri=desired_workflow_uri)
# first check if works needs to be done at all
if (myTaskObject.workflow != desired_workflow):
# set the new status
myTaskObject.workflow = desired_workflow
# temporarily disconnect the post_save handler to save the dataproduct (again) and avoiding recursion. # temporarily disconnect the post_save handler to save the dataproduct (again) and avoiding recursion.
# I don't use pre_save, because then the 'created' key is not available, which is the most handy way to # I don't use pre_save, because then the 'created' key is not available, which is the most handy way to
# determine if this dataproduct already exists. (I could also check the database, but this is easier). # determine if this dataproduct already exists. (I could also check the database, but this is easier).
...@@ -67,8 +83,8 @@ def handle_pre_save(sender, **kwargs): ...@@ -67,8 +83,8 @@ def handle_pre_save(sender, **kwargs):
connect_signals() connect_signals()
# dispatch a job if the status has changed. # dispatch a job if the status has changed.
if (new_status != None) and (my_status != new_status): if (desired_status != None) and (status != desired_status):
jobs.dispatchJob(myTaskObject, new_status) jobs.dispatchJob(myTaskObject, desired_status)
@receiver(post_save, sender=Task) @receiver(post_save, sender=Task)
...@@ -79,7 +95,7 @@ def post_save_task_handler(sender, **kwargs): ...@@ -79,7 +95,7 @@ def post_save_task_handler(sender, **kwargs):
def handle_post_save(sender, **kwargs): def handle_post_save(sender, **kwargs):
""" """
pre_save handler for both Task. To create and write its initial status post_save handler for Task. To create and write its initial status
:param (in) sender: The model class that sends the trigger :param (in) sender: The model class that sends the trigger
:param (in) kwargs: The instance of the object that sends the trigger. :param (in) kwargs: The instance of the object that sends the trigger.
""" """
...@@ -91,10 +107,10 @@ def handle_post_save(sender, **kwargs): ...@@ -91,10 +107,10 @@ def handle_post_save(sender, **kwargs):
logger.info("save new "+str(myTaskObject.task_type)) logger.info("save new "+str(myTaskObject.task_type))
# set status # set status
myTaskObject.my_status = myTaskObject.new_status myTaskObject.status = myTaskObject.desired_status
# add the new to the status history by brewing a status object out of it # add the new to the status history by brewing a status object out of it
myStatus = Status(name=myTaskObject.new_status, task=myTaskObject) myStatus = Status(name=myTaskObject.desired_status, task=myTaskObject)
myStatus.save() myStatus.save()
# temporarily disconnect the post_save handler to save the dataproduct (again) and avoiding recursion. # temporarily disconnect the post_save handler to save the dataproduct (again) and avoiding recursion.
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
<meta charset="utf-8"> <meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no"> <meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">
<title>{% block myBlockTitle %}ATDB{% endblock %}</title> <title>{% block myBlockTitle %}ATDB-LDV{% endblock %}</title>
<!-- loads the path to static files --> <!-- loads the path to static files -->
<link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/4.1.3/css/bootstrap.min.css"> <link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/4.1.3/css/bootstrap.min.css">
......
...@@ -44,7 +44,7 @@ ...@@ -44,7 +44,7 @@
</div> </div>
{% include 'taskdatabase/pagination.html' %} {% include 'taskdatabase/pagination.html' %}
</div> </div>
<p class="footer"> Version 1.0.0 (15 jan 2021 - 17:30) <p class="footer"> Version 1.0.0 (17 jan 2021 - 09:00)
<script type="text/javascript"> <script type="text/javascript">
(function(seconds) { (function(seconds) {
var refresh, var refresh,
......
...@@ -24,7 +24,7 @@ urlpatterns = [ ...@@ -24,7 +24,7 @@ urlpatterns = [
name='get-next-taskid-view'), name='get-next-taskid-view'),
# --- controller resources --- # --- controller resources ---
path('tasks/<int:pk>/setstatus/<new_status>/<page>', path('tasks/<int:pk>/setstatus/<desired_status>/<page>',
views.TaskSetStatus, views.TaskSetStatus,
name='task-setstatus-view'), name='task-setstatus-view'),
......
...@@ -30,7 +30,7 @@ class TaskFilter(filters.FilterSet): ...@@ -30,7 +30,7 @@ class TaskFilter(filters.FilterSet):
fields = { fields = {
'project': ['exact', 'icontains'], 'project': ['exact', 'icontains'],
'sas_id': ['exact', 'icontains'], 'sas_id': ['exact', 'icontains'],
'my_status': ['exact', 'icontains', 'in', 'startswith'], 'status': ['exact', 'icontains', 'in', 'startswith'],
'taskID': ['gt', 'lt', 'gte', 'lte','exact', 'icontains', 'startswith','in'], 'taskID': ['gt', 'lt', 'gte', 'lte','exact', 'icontains', 'startswith','in'],
'purge_policy': ['exact'], 'purge_policy': ['exact'],
'priority': ['exact'], 'priority': ['exact'],
...@@ -70,18 +70,18 @@ class IndexView(ListView): ...@@ -70,18 +70,18 @@ class IndexView(ListView):
context_object_name = 'my_tasks' context_object_name = 'my_tasks'
def get_queryset(self): def get_queryset(self):
my_status = self.request.GET.get('my_status') status = self.request.GET.get('status')
not_my_status = self.request.GET.get('not_my_status') not_status = self.request.GET.get('not_status')
search_box = self.request.GET.get('search_box', None) search_box = self.request.GET.get('search_box', None)
if (search_box is not None): if (search_box is not None):
tasks = get_searched_tasks(search_box) tasks = get_searched_tasks(search_box)
else: else:
tasks = Task.objects.order_by('-creationTime') tasks = Task.objects.order_by('-creationTime')
if (my_status is not None): if (status is not None):
tasks = get_filtered_tasks(my_status) tasks = get_filtered_tasks(status)
if (not_my_status is not None): if (not_status is not None):
tasks = get_unfiltered_tasks(not_my_status) tasks = get_unfiltered_tasks(not_status)
paginator = Paginator(tasks, config.TASKS_PER_PAGE) # Show 50 tasks per page paginator = Paginator(tasks, config.TASKS_PER_PAGE) # Show 50 tasks per page
...@@ -100,17 +100,17 @@ class IndexView(ListView): ...@@ -100,17 +100,17 @@ class IndexView(ListView):
# an attempt to get a filtering mechanism in the GUI # an attempt to get a filtering mechanism in the GUI
# filter on a single status # filter on a single status
# http://localhost:8000/atdb/query?my_status=scheduled # http://localhost:8000/atdb/query?status=scheduled
def get_filtered_tasks(my_status): def get_filtered_tasks(status):
q = Task.objects.order_by('-creationTime') q = Task.objects.order_by('-creationTime')
q = q.filter(my_status=my_status) q = q.filter(status=status)
#q = q.exclude(my_status__icontains='removed') #q = q.exclude(status__icontains='removed')
return q return q
# http://localhost:8000/atdb/query?not_my_status=removed # http://localhost:8000/atdb/query?not_status=removed
def get_unfiltered_tasks(my_status): def get_unfiltered_tasks(status):
q = Task.objects.order_by('-creationTime') q = Task.objects.order_by('-creationTime')
q = q.exclude(my_status=my_status) q = q.exclude(status=status)
return q return q
def get_searched_tasks(search): def get_searched_tasks(search):
...@@ -118,7 +118,7 @@ def get_searched_tasks(search): ...@@ -118,7 +118,7 @@ def get_searched_tasks(search):
Q(taskID__contains=search) | Q(taskID__contains=search) |
Q(sas_id__contains=search) | Q(sas_id__contains=search) |
Q(task_type__icontains=search) | Q(task_type__icontains=search) |
Q(my_status__icontains=search) | Q(status__icontains=search) |
Q(project__icontains=search)).order_by('-creationTime') Q(project__icontains=search)).order_by('-creationTime')
return tasks return tasks
...@@ -189,7 +189,7 @@ class LogEntryDetailsViewAPI(generics.RetrieveUpdateDestroyAPIView): ...@@ -189,7 +189,7 @@ class LogEntryDetailsViewAPI(generics.RetrieveUpdateDestroyAPIView):
# --- controller resources, triggered by a button in the GUI or directoy with a URL --- # --- controller resources, triggered by a button in the GUI or directoy with a URL ---
# set task status to 'new_status' - called from the GUI # set task status to 'desired_status' - called from the GUI
def Skip(request,pk,skip_it,page): def Skip(request,pk,skip_it,page):
...@@ -200,10 +200,10 @@ def Skip(request,pk,skip_it,page): ...@@ -200,10 +200,10 @@ def Skip(request,pk,skip_it,page):
return redirect('/atdb/?page='+page) return redirect('/atdb/?page='+page)
def TaskSetStatus(request,pk,new_status,page): def TaskSetStatus(request,pk,desired_status,page):
model = Task model = Task
task = Task.objects.get(pk=pk) task = Task.objects.get(pk=pk)
task.new_status = new_status task.desired_status = desired_status
task.save() task.save()
return redirect('/atdb/?page='+page) return redirect('/atdb/?page='+page)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment