Skip to content
Snippets Groups Projects
Commit ddeec2ff authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

TMSS-493: start background services in parallel in the background (because to...

TMSS-493: start background services in parallel in the background (because to are independent of each other, and this saves startup wait time)
parent 26205b32
Branches
Tags
1 merge request!324Resolve TMSS-493
...@@ -362,17 +362,22 @@ class TMSSTestEnvironment: ...@@ -362,17 +362,22 @@ class TMSSTestEnvironment:
user.is_superuser = True user.is_superuser = True
user.save() user.save()
logger.debug("started TMSSTestEnvironment ldap/database/django in %.1fs", (datetime.datetime.utcnow()-starttime).total_seconds()) logger.info("started TMSSTestEnvironment ldap/database/django in %.1fs", (datetime.datetime.utcnow()-starttime).total_seconds())
# start all (needed) services in background threads, keep track of them.
service_threads = []
if self._start_ra_test_environment: if self._start_ra_test_environment:
self.ra_test_environment = RATestEnvironment(exchange=self._exchange, broker=self._broker) self.ra_test_environment = RATestEnvironment(exchange=self._exchange, broker=self._broker)
self.ra_test_environment.start() service_threads.append(threading.Thread(target=self.ra_test_environment.start))
service_threads[-1].start()
if self._start_postgres_listener: if self._start_postgres_listener:
# start the TMSSPGListener, so the changes in the database are posted as EventMessages on the bus # start the TMSSPGListener, so the changes in the database are posted as EventMessages on the bus
from lofar.sas.tmss.services.tmss_postgres_listener import TMSSPGListener from lofar.sas.tmss.services.tmss_postgres_listener import TMSSPGListener
self.postgres_listener = TMSSPGListener(exchange=self._exchange, broker=self._broker, dbcreds=self.database.dbcreds) self.postgres_listener = TMSSPGListener(exchange=self._exchange, broker=self._broker, dbcreds=self.database.dbcreds)
self.postgres_listener.start() service_threads.append(threading.Thread(target=self.postgres_listener.start))
service_threads[-1].start()
if self._start_websocket: if self._start_websocket:
# start the websocket service, so the changes in the database are posted (via the messagebus) to an http web socket # start the websocket service, so the changes in the database are posted (via the messagebus) to an http web socket
...@@ -380,12 +385,15 @@ class TMSSTestEnvironment: ...@@ -380,12 +385,15 @@ class TMSSTestEnvironment:
self._start_pg_listener = True self._start_pg_listener = True
from lofar.sas.tmss.services.websocket_service import create_service from lofar.sas.tmss.services.websocket_service import create_service
self.websocket_service = create_service(exchange=self._exchange, broker=self._broker) self.websocket_service = create_service(exchange=self._exchange, broker=self._broker)
self.websocket_service.start_listening() service_threads.append(threading.Thread(target=self.websocket_service.start_listening))
service_threads[-1].start()
if self._start_subtask_scheduler: if self._start_subtask_scheduler:
from lofar.sas.tmss.services.scheduling.subtask_scheduling import create_subtask_scheduling_service from lofar.sas.tmss.services.scheduling.subtask_scheduling import create_subtask_scheduling_service
self.subtask_scheduler = create_subtask_scheduling_service(exchange=self._exchange, broker=self._broker) self.subtask_scheduler = create_subtask_scheduling_service(exchange=self._exchange, broker=self._broker)
self.subtask_scheduler.start_listening() service_threads.append(threading.Thread(target=self.subtask_scheduler.start_listening()))
service_threads[-1].start()
if self._start_dynamic_scheduler: if self._start_dynamic_scheduler:
from lofar.sas.tmss.services.scheduling.dynamic_scheduling import create_dynamic_scheduling_service, models from lofar.sas.tmss.services.scheduling.dynamic_scheduling import create_dynamic_scheduling_service, models
...@@ -395,27 +403,38 @@ class TMSSTestEnvironment: ...@@ -395,27 +403,38 @@ class TMSSTestEnvironment:
setting.value = True setting.value = True
setting.save() setting.save()
self.dynamic_scheduler = create_dynamic_scheduling_service(exchange=self._exchange, broker=self._broker) self.dynamic_scheduler = create_dynamic_scheduling_service(exchange=self._exchange, broker=self._broker)
self.dynamic_scheduler.start_listening() service_threads.append(threading.Thread(target=self.dynamic_scheduler.start_listening))
service_threads[-1].start()
if self._start_workflow_service: if self._start_workflow_service:
from lofar.sas.tmss.services.workflow_service import create_workflow_service from lofar.sas.tmss.services.workflow_service import create_workflow_service
self.workflow_service = create_workflow_service(exchange=self._exchange, broker=self._broker) self.workflow_service = create_workflow_service(exchange=self._exchange, broker=self._broker)
self.workflow_service.start_listening() service_threads.append(threading.Thread(target=self.workflow_service.start_listening))
service_threads[-1].start()
if self._start_feedback_service: if self._start_feedback_service:
try: try:
from lofar.sas.tmss.services.feedback_handling import TMSSFeedbackListener from lofar.sas.tmss.services.feedback_handling import TMSSFeedbackListener
self.feedback_service = TMSSFeedbackListener() self.feedback_service = TMSSFeedbackListener()
self.feedback_service.start_handling() service_threads.append(threading.Thread(target=self.feedback_service.start_handling))
service_threads[-1].start()
except Exception as e: except Exception as e:
logger.exception(e) logger.exception(e)
# wait for all services to be fully started in their background threads
for thread in service_threads:
thread.join()
logger.info("started TMSSTestEnvironment ldap/database/django + services in %.1fs", (datetime.datetime.utcnow()-starttime).total_seconds())
if self._populate_schemas or self._populate_test_data: if self._populate_schemas or self._populate_test_data:
self.populate_schemas() self.populate_schemas()
if self._populate_test_data: if self._populate_test_data:
self.populate_test_data() self.populate_test_data()
logger.info("started TMSSTestEnvironment ldap/database/django + services + schemas + data in %.1fs", (datetime.datetime.utcnow()-starttime).total_seconds())
def stop(self): def stop(self):
if self.workflow_service is not None: if self.workflow_service is not None:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment