Skip to content
Snippets Groups Projects
Commit 3c7a58a5 authored by Roy de Goei's avatar Roy de Goei
Browse files

TMSS-687: precalculation service created. Also added in tmss_test_environment

parent 0c4e0a1a
No related branches found
No related tags found
1 merge request!399Resolve TMSS-687
Showing
with 475 additions and 76 deletions
...@@ -6,4 +6,6 @@ lofar_add_package(TMSSPostgresListenerService tmss_postgres_listener) ...@@ -6,4 +6,6 @@ lofar_add_package(TMSSPostgresListenerService tmss_postgres_listener)
lofar_add_package(TMSSWebSocketService websocket) lofar_add_package(TMSSWebSocketService websocket)
lofar_add_package(TMSSWorkflowService workflow_service) lofar_add_package(TMSSWorkflowService workflow_service)
lofar_add_package(TMSSLTAAdapter tmss_lta_adapter) lofar_add_package(TMSSLTAAdapter tmss_lta_adapter)
lofar_add_package(TMSSPreCalculationsService precalculations_service)
lofar_package(TMSSPreCalculationsService 0.1)
lofar_find_package(PythonInterp 3.4 REQUIRED)
IF(NOT SKIP_TMSS_BUILD)
add_subdirectory(lib)
add_subdirectory(test)
ENDIF(NOT SKIP_TMSS_BUILD)
add_subdirectory(bin)
\ No newline at end of file
lofar_add_bin_scripts(tmss_precalculations_service)
# supervisord config files
lofar_add_sysconf_files(tmss_precalculations_service.ini DESTINATION supervisord.d)
#!/usr/bin/python3
# Copyright (C) 2012-2015 ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
from lofar.sas.tmss.services.precalculations_service import main
if __name__ == "__main__":
main()
[program:tmss_precalculations_service]
command=docker run --rm --net=host -u 7149:7149 -v /opt/lofar/var/log:/opt/lofar/var/log -v /tmp/tmp -v /etc/passwd:/etc/passwd:ro -v /etc/group:/etc/group:ro -v /localhome/lofarsys:/localhome/lofarsys -e HOME=/localhome/lofarsys -e USER=lofarsys nexus.cep4.control.lofar:18080/tmss_django:latest /bin/bash -c 'source ~/.lofar/.lofar_env;source $LOFARROOT/lofarinit.sh;exec tmss_websocket_service'
user=lofarsys
stopsignal=INT ; KeyboardInterrupt
stopasgroup=true ; bash does not propagate signals
stdout_logfile=%(program_name)s.log
redirect_stderr=true
stderr_logfile=NONE
stdout_logfile_maxbytes=0
lofar_find_package(PythonInterp 3.4 REQUIRED)
include(PythonInstall)
set(_py_files
precalculations_service.py
)
python_install(${_py_files}
DESTINATION lofar/sas/tmss/services)
#!/usr/bin/env python3
# Copyright (C) 2012-2015 ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
import logging
logger = logging.getLogger(__name__)
import os
import threading
import datetime
from datetime import timedelta
import time
from lofar.common.util import waitForInterrupt
# Default values of parameters
INTERVAL_TIME_SECONDS = 12 * 60 * 60 # 12 hours for now
NBR_DAYS_CALCULATE_AHEAD = 365 # 1 year
NBR_DAYS_BEFORE_TODAY = 1
def execute_populate_calculations(nbr_days_calculate_ahead, start_date):
"""
Execute the populate of calculations (sunrise/sunset) for given number of days stating at give date
:param nbr_days_calculate_ahead: Number of days to calculated
:param start_date: The date to start calculate
:return next_date: The next_date to process
"""
logger.info("execute_populate_calculations %s for %d days" % (start_date, nbr_days_calculate_ahead))
# Import here otherwise you get
# "django.core.exceptions.ImproperlyConfigured: Requested setting INSTALLED_APPS, but settings are not configured. You must either define the environment variable DJANGO_SETTINGS_MODULE or call settings.configure() before accessing settings."
from lofar.sas.tmss.tmss.tmssapp.populate import populate_calculations
populate_calculations(nbr_days=nbr_days_calculate_ahead, start_date=start_date)
# Return the next_date to process
next_date = start_date + datetime.timedelta(days=nbr_days_calculate_ahead)
return next_date
class TMSSPreCalculationsServiceJob(threading.Thread):
def __init__(self, interval, execute, *args, **kwargs):
threading.Thread.__init__(self)
self.daemon = False
self.stopped = threading.Event()
self.interval = interval
self.execute = execute
self.args = args
self.kwargs = kwargs
def __enter__(self):
pass
def __exit__(self, exc_type, exc_val, exc_tb):
pass
def stop(self):
self.stopped.set()
self.join()
def run(self):
start_time = time.time()
next_date = self.execute(*self.args, **self.kwargs)
# determine remaining time for exact heartbeat of the interval time
remaining_wait_time_in_sec = self.interval.total_seconds() - (time.time() - start_time)
while not self.stopped.wait(remaining_wait_time_in_sec):
self.kwargs["nbr_days_calculate_ahead"] = 1
self.kwargs["start_date"] = next_date
start_time = time.time()
next_date = self.execute(*self.args, **self.kwargs)
remaining_wait_time_in_sec = self.interval.total_seconds() - (time.time() - start_time)
def create_calculation_service_job(interval_time, nbr_days_calculate_ahead, nbr_days_before_today):
start_date = datetime.date.today() - datetime.timedelta(days=nbr_days_before_today)
return TMSSPreCalculationsServiceJob(interval=timedelta(seconds=interval_time),
execute=execute_populate_calculations,
nbr_days_calculate_ahead=nbr_days_calculate_ahead, start_date=start_date)
def main():
# make sure we run in UTC timezone
os.environ['TZ'] = 'UTC'
from optparse import OptionParser, OptionGroup
from lofar.common import dbcredentials
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
# Check the invocation arguments
parser = OptionParser('%prog [options]',
description='run the tmss_workflow_service which forwards TMSS events to the workflow engine.')
parser.add_option('-i', '--interval_time', dest='interval_time', type='int', default=INTERVAL_TIME_SECONDS,
help='The time between next calculation, default: %default')
parser.add_option('-d', '--nbr_days_calculate_ahead', dest='nbr_days_calculate_ahead', type='int', default=NBR_DAYS_CALCULATE_AHEAD,
help='The number of days to calculate the sunset/sunrise ahead, default: %default')
parser.add_option('-b', '--nbr_days_before_today', dest='nbr_days_before_today', type='int', default=NBR_DAYS_BEFORE_TODAY,
help='The number of days to calculate the sunset/sunrise before today (so yesterday=1), default: %default')
group = OptionGroup(parser, 'Django options')
parser.add_option_group(group)
group.add_option('-C', '--credentials', dest='dbcredentials', type='string', default=os.environ.get('TMSS_DBCREDENTIALS', 'TMSS'), help='django dbcredentials name, default: %default')
(options, args) = parser.parse_args()
from lofar.sas.tmss.tmss import setup_and_check_tmss_django_database_connection_and_exit_on_error
setup_and_check_tmss_django_database_connection_and_exit_on_error(options.dbcredentials)
job = create_calculation_service_job(options.interval_time, options.nbr_days_calculate_ahead, options.nbr_days_before_today)
job.start()
waitForInterrupt()
job.stop()
if __name__ == '__main__':
main()
# $Id: CMakeLists.txt 32679 2015-10-26 09:31:56Z schaap $
if(BUILD_TESTING)
include(LofarCTest)
lofar_add_test(t_precalculations_service)
endif()
#!/usr/bin/env python3
# Copyright (C) 2012-2015 ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
import unittest
import time
import datetime
import logging
logger = logging.getLogger('lofar.' + __name__)
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
from lofar.sas.tmss.test.test_utils import TMSSTestEnvironment
from lofar.sas.tmss.services.precalculations_service import create_calculation_service_job
from lofar.common.test_utils import integration_test, exit_with_skipped_code_if_skip_integration_tests
exit_with_skipped_code_if_skip_integration_tests()
@integration_test
class TestPreCalculationService(unittest.TestCase):
"""
Tests for the TMSSPreCalculationsServiceJob
It will check the number of items created of the StationTimeline model based on the input of the service to start
It will not check the content of the sunrise/sunset data of the StationTimeline model itself
Note that 1 day calculation will take about 6 seconds
"""
@classmethod
def setUpClass(cls) -> None:
"""
Populate schema to be able to retrieve all stations
"""
cls.tmss_test_env = TMSSTestEnvironment(populate_schemas=True)
cls.tmss_test_env.start()
cls.test_data_creator = cls.tmss_test_env.create_test_data_creator()
@classmethod
def tearDownClass(cls) -> None:
cls.tmss_test_env.stop()
def setUp(self) -> None:
"""
Start every testcase with 'clean' StationTimeline model
"""
from lofar.sas.tmss.tmss.tmssapp.models.calculations import StationTimeline
StationTimeline.objects.all().delete()
def test_all_stations_calculated_for_one_day(self):
"""
Test if creating and starting, followed by stopping the (pre)calculation service results in 'one day'
of StationTimeline data for all stations
Note that 1 day calculation will take about 6 seconds
"""
# Import here otherwise you get
# "django.core.exceptions.ImproperlyConfigured: Requested setting INSTALLED_APPS, but settings are not configured. You must either define the environment variable DJANGO_SETTINGS_MODULE or call settings.configure() before accessing settings."
from lofar.sas.tmss.tmss.tmssapp.conversions import get_all_stations
from lofar.sas.tmss.tmss.tmssapp.models.calculations import StationTimeline
nbr_stations = len(get_all_stations())
# Initially there should be no data
self.assertEqual(0, len(StationTimeline.objects.all()))
# Now we are going to create and start the calculation service with a wait time of 60 sec,
# nbr days to calculate ahead is 1 and nbr days before today 1 -> so only 'yesterday' should be created
job = create_calculation_service_job(60, 1, 1)
job.start()
job.stop()
# Check what have been created
st_objects = StationTimeline.objects.all()
self.assertEqual(nbr_stations, len(st_objects))
# lets check with the timestamp of today, that should be zero
st_objects = StationTimeline.objects.filter(timestamp=datetime.date.today())
self.assertEqual(0, len(st_objects))
# lets check with the timestamp in future, that should be zero
st_objects = StationTimeline.objects.filter(timestamp__gt=datetime.date.today())
self.assertEqual(0, len(st_objects))
# lets check with the timestamp yesterday, that should be equal to the number of all stations
st_objects = StationTimeline.objects.filter(timestamp=datetime.date.today()-datetime.timedelta(days=1))
self.assertEqual(nbr_stations, len(st_objects))
def test_all_stations_calculated_for_multiple_days_with_one_trigger(self):
"""
Test if creating and starting, followed by stopping the (pre)calculation service results in 'multiple day'
of StationTimeline data for all stations
Note that 4 days calculation will take about 30 seconds
"""
# Import here otherwise you get
# "django.core.exceptions.ImproperlyConfigured: Requested setting INSTALLED_APPS, but settings are not configured. You must either define the environment variable DJANGO_SETTINGS_MODULE or call settings.configure() before accessing settings."
from lofar.sas.tmss.tmss.tmssapp.conversions import get_all_stations
from lofar.sas.tmss.tmss.tmssapp.models.calculations import StationTimeline
nbr_stations = len(get_all_stations())
# Initially there should be no data
self.assertEqual(0, len(StationTimeline.objects.all()))
# Now we are going to create and start the calculation service with a interval of 60 sec,
# nbr days to calculate ahead is 4 and nbr days before today 2 -> so 'day before yesterday, 'yesterday',
# 'today' and 'tomorrow' should be created
job = create_calculation_service_job(60, 4, 2)
job.start()
job.stop()
# Check what have been created
st_objects = StationTimeline.objects.all()
self.assertEqual(4*nbr_stations, len(st_objects))
# lets check with the timestamp of today, that should be equal to the number of all stations
st_objects = StationTimeline.objects.filter(timestamp=datetime.date.today())
self.assertEqual(nbr_stations, len(st_objects))
# lets check with the timestamp in future, that should be equal to the number of all stations
st_objects = StationTimeline.objects.filter(timestamp__gt=datetime.date.today())
self.assertEqual(nbr_stations, len(st_objects))
# lets check with the timestamp in the past, that should be equal to the 2 times number of all stations
st_objects = StationTimeline.objects.filter(timestamp__lt=datetime.date.today())
self.assertEqual(2*nbr_stations, len(st_objects))
def test_all_stations_calculated_for_multiple_days(self):
"""
Test if creating and starting, waiting for period (25 seconds), followed by stopping the (pre)calculation service results
in 'multiple day' of StationTimeline data for all stations.
It will test the scheduler with interval of 10 seconds, so three days should be calculated
"""
# Import here otherwise you get
# "django.core.exceptions.ImproperlyConfigured: Requested setting INSTALLED_APPS, but settings are not configured. You must either define the environment variable DJANGO_SETTINGS_MODULE or call settings.configure() before accessing settings."
from lofar.sas.tmss.tmss.tmssapp.conversions import get_all_stations
from lofar.sas.tmss.tmss.tmssapp.models.calculations import StationTimeline
nbr_stations = len(get_all_stations())
# Initially there should be no data
self.assertEqual(0, len(StationTimeline.objects.all()))
# Now we are going to create and start the calculation service with a interval of 10 sec (smaller will not make sense),
# nbr days to calculate ahead is 1 and nbr days before today 0 -> so it start with 'today' and after 10 seconds
# 'tomorrow' etc..,
job = create_calculation_service_job(10, 1, 0)
job.start()
time.sleep(25)
job.stop()
# Check what have been created with interval of 10 seconds we should have three days
st_objects = StationTimeline.objects.all()
self.assertEqual(3*nbr_stations, len(st_objects))
# lets check with the timestamp of today, that should be equal to the number of all stations
st_objects = StationTimeline.objects.filter(timestamp=datetime.date.today())
self.assertEqual(nbr_stations, len(st_objects))
# lets check with the timestamp in future, that should be equal to 2 times the number of all stations
st_objects = StationTimeline.objects.filter(timestamp__gt=datetime.date.today())
self.assertEqual(2*nbr_stations, len(st_objects))
# lets check with the timestamp in the past, that should be equal to zero
st_objects = StationTimeline.objects.filter(timestamp__lt=datetime.date.today())
self.assertEqual(0, len(st_objects))
def test_all_stations_calculated_for_when_interval_time_is_too_small(self):
"""
Check that if the interval time < calculation time it does not lead to exception
Test if creating and starting, waiting for period (10 seconds), followed by stopping the (pre)calculation service results
in 'multiple day' of StationTimeline data for all stations.
It will test the scheduler with interval of 2 seconds, which smaller than ~6 seconds
Stopping after 10 seconds makes 2 days calculated
"""
# Import here otherwise you get
# "django.core.exceptions.ImproperlyConfigured: Requested setting INSTALLED_APPS, but settings are not configured. You must either define the environment variable DJANGO_SETTINGS_MODULE or call settings.configure() before accessing settings."
from lofar.sas.tmss.tmss.tmssapp.conversions import get_all_stations
from lofar.sas.tmss.tmss.tmssapp.models.calculations import StationTimeline
nbr_stations = len(get_all_stations())
# Initially there should be no data
self.assertEqual(0, len(StationTimeline.objects.all()))
# Now we are going to create and start the calculation service with an interval of 2 sec
# nbr days to calculate ahead is 1 and nbr days before today 0 -> so it start with 'today' and after ~6 seconds
# 'tomorrow' etc..
job = create_calculation_service_job(2, 1, 0)
job.start()
time.sleep(10)
job.stop()
# Check what have been created with interval of 2 seconds we should have two days
st_objects = StationTimeline.objects.all()
self.assertEqual(2 * nbr_stations, len(st_objects))
# lets check with the timestamp of today, that should be equal to the number of all stations
st_objects = StationTimeline.objects.filter(timestamp=datetime.date.today())
self.assertEqual(nbr_stations, len(st_objects))
# lets check with the timestamp in future, that should be equal to the number of all stations
st_objects = StationTimeline.objects.filter(timestamp__gt=datetime.date.today())
self.assertEqual(nbr_stations, len(st_objects))
# lets check with the timestamp in the past, that should be equal to zero
st_objects = StationTimeline.objects.filter(timestamp__lt=datetime.date.today())
self.assertEqual(0, len(st_objects))
@integration_test
class TestPreCalculationServiceWithoutSchemas(unittest.TestCase):
"""
Tests for creation/start/stop of the TMSSPreCalculationsServiceJob when NO schemas exist
Check that no exception occur
"""
@classmethod
def setUpClass(cls) -> None:
"""
Do not populate schemas
"""
cls.tmss_test_env = TMSSTestEnvironment(populate_schemas=False)
cls.tmss_test_env.start()
cls.test_data_creator = cls.tmss_test_env.create_test_data_creator()
@classmethod
def tearDownClass(cls) -> None:
cls.tmss_test_env.stop()
def test_no_stations_calculated(self):
"""
Test when no stations exist, the StationTimeLine is empty
"""
# Import here otherwise you get
# "django.core.exceptions.ImproperlyConfigured: Requested setting INSTALLED_APPS, but settings are not configured. You must either define the environment variable DJANGO_SETTINGS_MODULE or call settings.configure() before accessing settings."
from lofar.sas.tmss.tmss.tmssapp.conversions import get_all_stations
from lofar.sas.tmss.tmss.tmssapp.models.calculations import StationTimeline
nbr_stations = len(get_all_stations())
self.assertEqual(0, nbr_stations)
self.assertEqual(0, len(StationTimeline.objects.all()))
# Now we are going to create and start the calculation service with a interval time of 60 sec,
# nbr days to calculate ahead is 1 and nbr days before today 1 ->
# nothing will be created because there are no stations
job = create_calculation_service_job(60, 1, 1)
job.start()
job.stop()
nbr_stations = len(get_all_stations())
self.assertEqual(0, nbr_stations)
self.assertEqual(0, len(StationTimeline.objects.all()))
if __name__ == '__main__':
#run the unit tests
unittest.main()
\ No newline at end of file
#!/bin/bash
# Run the unit test
source python-coverage.sh
python_coverage_test "*tmss*" t_precalculations_service.py
#!/bin/sh
./runctest.sh t_precalculations_service
\ No newline at end of file
...@@ -31,6 +31,7 @@ def create_astroplan_observer_for_station(station: str) -> 'Observer': ...@@ -31,6 +31,7 @@ def create_astroplan_observer_for_station(station: str) -> 'Observer':
# default angle to the horizon at which the sunset/sunrise starts and ends, as per LOFAR definition. # default angle to the horizon at which the sunset/sunrise starts and ends, as per LOFAR definition.
SUN_SET_RISE_ANGLE_TO_HORIZON = Angle(10, unit=astropy.units.deg) SUN_SET_RISE_ANGLE_TO_HORIZON = Angle(10, unit=astropy.units.deg)
# default n_grid_points; higher is more precise but very costly; astropy defaults to 150, errors now can be in the minutes, increase if this is not good enough # default n_grid_points; higher is more precise but very costly; astropy defaults to 150, errors now can be in the minutes, increase if this is not good enough
# TODO: To be considered, now we store the sunset/sunrise data in advanced, we can increase the number of points!!
SUN_SET_RISE_PRECISION = 30 SUN_SET_RISE_PRECISION = 30
......
...@@ -498,4 +498,4 @@ def populate_calculations(nbr_days=3, start_date=date.today()): ...@@ -498,4 +498,4 @@ def populate_calculations(nbr_days=3, start_date=date.today()):
lst_timestamps.append(dt) lst_timestamps.append(dt)
timestamps_and_stations_to_sun_rise_and_set(tuple(lst_timestamps), tuple(get_all_stations()), create_when_not_found=True) timestamps_and_stations_to_sun_rise_and_set(tuple(lst_timestamps), tuple(get_all_stations()), create_when_not_found=True)
logger.info("Done in %.1fs", (datetime.utcnow()-starttime_for_logging).total_seconds()) logger.info("Populate sunrise and sunset done in %.1fs", (datetime.utcnow()-starttime_for_logging).total_seconds())
...@@ -286,6 +286,7 @@ class TMSSTestEnvironment: ...@@ -286,6 +286,7 @@ class TMSSTestEnvironment:
start_pipeline_control: bool=False, start_websocket: bool=False, start_pipeline_control: bool=False, start_websocket: bool=False,
start_feedback_service: bool=False, start_feedback_service: bool=False,
start_workflow_service: bool=False, enable_viewflow: bool=False, start_workflow_service: bool=False, enable_viewflow: bool=False,
start_precalculations_service: bool=False,
ldap_dbcreds_id: str=None, db_dbcreds_id: str=None, client_dbcreds_id: str=None): ldap_dbcreds_id: str=None, db_dbcreds_id: str=None, client_dbcreds_id: str=None):
self._exchange = exchange self._exchange = exchange
self._broker = broker self._broker = broker
...@@ -332,6 +333,9 @@ class TMSSTestEnvironment: ...@@ -332,6 +333,9 @@ class TMSSTestEnvironment:
self.workflow_service = None self.workflow_service = None
os.environ['TMSS_ENABLE_VIEWFLOW'] = str(bool(self.enable_viewflow)) os.environ['TMSS_ENABLE_VIEWFLOW'] = str(bool(self.enable_viewflow))
self._start_precalculations_service = start_precalculations_service
self.precalculations_service = None
# Check for correct Django version, should be at least 3.0 # Check for correct Django version, should be at least 3.0
if django.VERSION[0] < 3: if django.VERSION[0] < 3:
print("\nWARNING: YOU ARE USING DJANGO VERSION '%s', WHICH WILL NOT SUPPORT ALL FEATURES IN TMSS!\n" % print("\nWARNING: YOU ARE USING DJANGO VERSION '%s', WHICH WILL NOT SUPPORT ALL FEATURES IN TMSS!\n" %
...@@ -430,6 +434,8 @@ class TMSSTestEnvironment: ...@@ -430,6 +434,8 @@ class TMSSTestEnvironment:
except Exception as e: except Exception as e:
logger.exception(e) logger.exception(e)
# wait for all services to be fully started in their background threads # wait for all services to be fully started in their background threads
for thread in service_threads: for thread in service_threads:
thread.join() thread.join()
...@@ -447,6 +453,14 @@ class TMSSTestEnvironment: ...@@ -447,6 +453,14 @@ class TMSSTestEnvironment:
logger.info("started TMSSTestEnvironment ldap/database/django + services + schemas + data in %.1fs", (datetime.datetime.utcnow()-starttime).total_seconds()) logger.info("started TMSSTestEnvironment ldap/database/django + services + schemas + data in %.1fs", (datetime.datetime.utcnow()-starttime).total_seconds())
# next service does not have a buslistener, it is just a simple time scheduler and currently rely and
# populated stations schema to retrieve all stations
if self._start_precalculations_service:
from lofar.sas.tmss.services.precalculations_service import create_calculation_service_job
# For testpurposes we can use a smaller range and higher interval frequency
self.precalculations_service = \
create_calculation_service_job(wait_time_seconds=60, nbr_days_calculate_ahead=3, nbr_days_before_today=1)
self.precalculations_service.start()
def stop(self): def stop(self):
if self.workflow_service is not None: if self.workflow_service is not None:
...@@ -477,6 +491,10 @@ class TMSSTestEnvironment: ...@@ -477,6 +491,10 @@ class TMSSTestEnvironment:
self.ra_test_environment.stop() self.ra_test_environment.stop()
self.ra_test_environment = None self.ra_test_environment = None
if self.precalculations_service is not None:
self.precalculations_service.stop()
self.precalculations_service = None
self.django_server.stop() self.django_server.stop()
self.ldap_server.stop() self.ldap_server.stop()
self.database.destroy() self.database.destroy()
...@@ -518,6 +536,7 @@ class TMSSTestEnvironment: ...@@ -518,6 +536,7 @@ class TMSSTestEnvironment:
from lofar.sas.tmss.test.tmss_test_data_rest import TMSSRESTTestDataCreator from lofar.sas.tmss.test.tmss_test_data_rest import TMSSRESTTestDataCreator
return TMSSRESTTestDataCreator(self.django_server.url, (self.django_server.ldap_dbcreds.user, self.django_server.ldap_dbcreds.password)) return TMSSRESTTestDataCreator(self.django_server.url, (self.django_server.ldap_dbcreds.user, self.django_server.ldap_dbcreds.password))
def main_test_database(): def main_test_database():
"""instantiate, run and destroy a test postgress django database""" """instantiate, run and destroy a test postgress django database"""
os.environ['TZ'] = 'UTC' os.environ['TZ'] = 'UTC'
...@@ -550,6 +569,7 @@ def main_test_database(): ...@@ -550,6 +569,7 @@ def main_test_database():
print("Press Ctrl-C to exit (and remove the test database automatically)") print("Press Ctrl-C to exit (and remove the test database automatically)")
waitForInterrupt() waitForInterrupt()
def main_test_environment(): def main_test_environment():
"""instantiate, run and destroy a full tmss test environment (postgress database, ldap server, django server)""" """instantiate, run and destroy a full tmss test environment (postgress database, ldap server, django server)"""
from optparse import OptionParser, OptionGroup from optparse import OptionParser, OptionGroup
...@@ -583,9 +603,9 @@ def main_test_environment(): ...@@ -583,9 +603,9 @@ def main_test_environment():
group.add_option('-V', '--viewflow_service', dest='viewflow_service', action='store_true', help='Enable the viewflow service. Implies --viewflow_app and --eventmessages') group.add_option('-V', '--viewflow_service', dest='viewflow_service', action='store_true', help='Enable the viewflow service. Implies --viewflow_app and --eventmessages')
group.add_option('-w', '--websockets', dest='websockets', action='store_true', help='Enable json updates pushed via websockets') group.add_option('-w', '--websockets', dest='websockets', action='store_true', help='Enable json updates pushed via websockets')
group.add_option('-f', '--feedbackservice', dest='feedbackservice', action='store_true', help='Enable feedbackservice to handle feedback from observations/pipelines which comes in via the (old qpid) otdb messagebus.') group.add_option('-f', '--feedbackservice', dest='feedbackservice', action='store_true', help='Enable feedbackservice to handle feedback from observations/pipelines which comes in via the (old qpid) otdb messagebus.')
group.add_option('-C', '--precalculations_service', dest='precalculations_service', action='store_true', help='Enable the PreCalculations service')
group.add_option('--all', dest='all', action='store_true', help='Enable/Start all the services, upload schemas and testdata') group.add_option('--all', dest='all', action='store_true', help='Enable/Start all the services, upload schemas and testdata')
group.add_option('--simulate', dest='simulate', action='store_true', help='Simulate a run of the first example scheduling_unit (implies --data and --eventmessages and --ra_test_environment)') group.add_option('--simulate', dest='simulate', action='store_true', help='Simulate a run of the first example scheduling_unit (implies --data and --eventmessages and --ra_test_environment)')
group.add_option('--calculation_service', dest='calculation_service', action='store_true', help='Enable the (Pre-)Calculations service')
group = OptionGroup(parser, 'Messaging options') group = OptionGroup(parser, 'Messaging options')
parser.add_option_group(group) parser.add_option_group(group)
...@@ -623,6 +643,7 @@ def main_test_environment(): ...@@ -623,6 +643,7 @@ def main_test_environment():
start_feedback_service=options.feedbackservice or options.all, start_feedback_service=options.feedbackservice or options.all,
enable_viewflow=options.viewflow_app or options.viewflow_service or options.all, enable_viewflow=options.viewflow_app or options.viewflow_service or options.all,
start_workflow_service=options.viewflow_service or options.all, start_workflow_service=options.viewflow_service or options.all,
start_precalculations_service=options.precalculations_service or options.all,
ldap_dbcreds_id=options.LDAP_ID, db_dbcreds_id=options.DB_ID, client_dbcreds_id=options.REST_CLIENT_ID) as tmss_test_env: ldap_dbcreds_id=options.LDAP_ID, db_dbcreds_id=options.DB_ID, client_dbcreds_id=options.REST_CLIENT_ID) as tmss_test_env:
# print some nice info for the user to use the test servers... # print some nice info for the user to use the test servers...
...@@ -657,16 +678,6 @@ def main_test_environment(): ...@@ -657,16 +678,6 @@ def main_test_environment():
except KeyboardInterrupt: except KeyboardInterrupt:
return return
# This is just a 'simple' timing service
if options.calculation_service:
stop_event = threading.Event()
create_calculation_service(stop_event=stop_event)
try:
stop_event.wait()
except KeyboardInterrupt:
print("KeyboardInterrupt")
return
waitForInterrupt() waitForInterrupt()
...@@ -969,70 +980,6 @@ def main_scheduling_unit_blueprint_simulator(): ...@@ -969,70 +980,6 @@ def main_scheduling_unit_blueprint_simulator():
pass pass
def create_calculation_service(stop_event: threading.Event):
"""
First implementation of a simple time scheduler, starting with one major task and then run once a day
It created and start the populate of sunset/sunrise calculations
Should be organised a little bit differently and better!!!!!!!
"""
print("create_calculation_service")
import threading, time, signal
from datetime import timedelta
# Import here otherwise you get
# "django.core.exceptions.ImproperlyConfigured: Requested setting INSTALLED_APPS, but settings are not configured. You must either define the environment variable DJANGO_SETTINGS_MODULE or call settings.configure() before accessing settings."
from lofar.sas.tmss.tmss.tmssapp.populate import populate_calculations
# Some parameters, currently as constants
WAIT_TIME_SECONDS = 300
NBR_DAYS_CALCULATE_AHEAD = 100
NBR_DAYS_BEFORE_TODAY = 1
class ProgramKilled(Exception):
pass
def populate_calculations_once_a_day():
days_offset = NBR_DAYS_CALCULATE_AHEAD - NBR_DAYS_BEFORE_TODAY
populate_calculations(nbr_days=1, start_date=datetime.date.today() + datetime.timedelta(days=days_offset))
def signal_handler(signum, frame):
raise ProgramKilled
class Job(threading.Thread):
def __init__(self, interval, execute, *args, **kwargs):
threading.Thread.__init__(self)
self.daemon = False
self.stopped = threading.Event()
self.interval = interval
self.execute = execute
self.args = args
self.kwargs = kwargs
def stop(self):
self.stopped.set()
self.join()
def run(self):
while not self.stopped.wait(self.interval.total_seconds()):
self.execute(*self.args, **self.kwargs)
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
job = Job(interval=timedelta(seconds=WAIT_TIME_SECONDS), execute=populate_calculations_once_a_day)
job.start()
# Start one day before today
populate_calculations(nbr_days=NBR_DAYS_CALCULATE_AHEAD, start_date=datetime.date.today() - datetime.timedelta(days=NBR_DAYS_BEFORE_TODAY))
print("Execute Every %d sec" % WAIT_TIME_SECONDS )
while True:
try:
time.sleep(10)
except ProgramKilled:
print("Program killed: running cleanup code")
job.stop()
break
if __name__ == '__main__': if __name__ == '__main__':
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment