Skip to content
Snippets Groups Projects
Commit 85d36643 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

TMSS-61: use methods from TMSSsession to get parset and set state

parent 46a62c9f
No related branches found
No related tags found
1 merge request!154Resolve TMSS-60 and TMSS-171 and TMSS-198
...@@ -83,7 +83,6 @@ import os ...@@ -83,7 +83,6 @@ import os
import re import re
from socket import getfqdn from socket import getfqdn
import logging import logging
import requests
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -95,12 +94,6 @@ NUMBER_OF_CORES_PER_NODE = 24 ...@@ -95,12 +94,6 @@ NUMBER_OF_CORES_PER_NODE = 24
# We /4 because we can then run 4 pipelines, and -2 to reserve cores for TBBwriter # We /4 because we can then run 4 pipelines, and -2 to reserve cores for TBBwriter
DEFAULT_NUMBER_OF_TASKS = (NUMBER_OF_NODES // 4) * (NUMBER_OF_CORES_PER_NODE - 2) // DEFAULT_NUMBER_OF_CORES_PER_TASK DEFAULT_NUMBER_OF_TASKS = (NUMBER_OF_NODES // 4) * (NUMBER_OF_CORES_PER_NODE - 2) // DEFAULT_NUMBER_OF_CORES_PER_TASK
# todo: config file?
TMSS_URL = 'http://localhost:80/'
TMSS_USER = 'paulus'
TMSS_PASS = 'pauluspass'
TMSS_AUTHENTICATION_METHOD = TMSSsession.BASICAUTH
def runCommand(cmdline, input=None): def runCommand(cmdline, input=None):
logger.info("runCommand starting: %s", cmdline) logger.info("runCommand starting: %s", cmdline)
...@@ -353,20 +346,13 @@ class PipelineControlTMSSHandler(TMSSSubTaskEventMessageHandler): ...@@ -353,20 +346,13 @@ class PipelineControlTMSSHandler(TMSSSubTaskEventMessageHandler):
logger.info('PipelineControlTMSS busname=%s', exchange) logger.info('PipelineControlTMSS busname=%s', exchange)
self.exchange = exchange self.exchange = exchange
self.slurm = Slurm() self.slurm = Slurm()
self.tmss_client = TMSSsession.create_from_dbcreds_for_ldap()
def _setStatus(self, subtask_id, status): def start_handling(self):
with TMSSsession(TMSS_USER, TMSS_PASS, TMSS_URL, TMSS_AUTHENTICATION_METHOD) as session: self.tmss_client.open()
session.patch(url='%s/api/subtask/%s/' % (TMSS_URL, subtask_id),
data={'state': "%s/api/subtask_state/%s/" % (TMSS_URL, status)})
def _getParset(self, subtask_id): def stop_handling(self):
try: self.tmss_client.close()
with TMSSsession(TMSS_USER, TMSS_PASS, TMSS_URL, TMSS_AUTHENTICATION_METHOD) as session:
r = session.get(TMSS_URL + 'api/subtask/' + subtask_id + '/parset/')
return Parset(r)
except Exception as e:
logger.error("Cannot retrieve parset of task %s from TMSS: %s", subtask_id, e)
return None
def check_scheduled_pipelines(self): def check_scheduled_pipelines(self):
""" """
...@@ -377,10 +363,9 @@ class PipelineControlTMSSHandler(TMSSSubTaskEventMessageHandler): ...@@ -377,10 +363,9 @@ class PipelineControlTMSSHandler(TMSSSubTaskEventMessageHandler):
try: try:
logger.info("Checking for already scheduled pipelines in TMSS...") logger.info("Checking for already scheduled pipelines in TMSS...")
with TMSSsession(TMSS_USER, TMSS_PASS, TMSS_URL, TMSS_AUTHENTICATION_METHOD) as session: scheduled_subtasks = self.tmss_client.get_subtasks(state="scheduled")
r = session.get(TMSS_URL + 'api/subtask/?state__value=scheduled&format=json')
scheduled_pipelines = [] scheduled_pipelines = []
for subtask in r['results']: for subtask in scheduled_subtasks:
bits = subtask['url'].split['/'] bits = subtask['url'].split['/']
scheduled_pipelines.append(bits[bits.index("subtask") + 1]) scheduled_pipelines.append(bits[bits.index("subtask") + 1])
...@@ -389,7 +374,7 @@ class PipelineControlTMSSHandler(TMSSSubTaskEventMessageHandler): ...@@ -389,7 +374,7 @@ class PipelineControlTMSSHandler(TMSSSubTaskEventMessageHandler):
for subtask_id in scheduled_pipelines: for subtask_id in scheduled_pipelines:
logger.info("Checking if scheduled pipeline subtask_id=%s can start.", subtask_id) logger.info("Checking if scheduled pipeline subtask_id=%s can start.", subtask_id)
try: try:
parset = self._getParset(subtask_id) parset = Parset(self.tmss_client.get_subtask_parset(subtask_id))
if not parset or not self._shouldHandle(parset): if not parset or not self._shouldHandle(parset):
continue continue
self._startPipeline(subtask_id, parset) self._startPipeline(subtask_id, parset)
...@@ -400,7 +385,7 @@ class PipelineControlTMSSHandler(TMSSSubTaskEventMessageHandler): ...@@ -400,7 +385,7 @@ class PipelineControlTMSSHandler(TMSSSubTaskEventMessageHandler):
def onSubTaskScheduled(self, subtask_id: int, old_state: str, new_state: str): def onSubTaskScheduled(self, subtask_id: int, old_state: str, new_state: str):
try: try:
parset = self._getParset(subtask_id) parset = Parset(self.tmss_client.get_subtask_parset(subtask_id))
if parset and self._shouldHandle(parset): if parset and self._shouldHandle(parset):
self._startPipeline(subtask_id, parset) self._startPipeline(subtask_id, parset)
except Exception as e: except Exception as e:
...@@ -604,10 +589,10 @@ class PipelineControlTMSSHandler(TMSSSubTaskEventMessageHandler): ...@@ -604,10 +589,10 @@ class PipelineControlTMSSHandler(TMSSSubTaskEventMessageHandler):
logger.info("Scheduled SLURM job %s for abort trigger for subtask_id=%s", slurm_cancel_job_id, subtask_id) logger.info("Scheduled SLURM job %s for abort trigger for subtask_id=%s", slurm_cancel_job_id, subtask_id)
logger.info("Handed over pipeline %s to SLURM, setting status to QUEUED", subtask_id) logger.info("Handed over pipeline %s to SLURM, setting status to QUEUED", subtask_id)
self._setStatus(subtask_id, "queued") self.tmss_client.set_subtask_status(subtask_id, "queued")
except Exception as e: except Exception as e:
logger.error(str(e)) logger.error(str(e))
self._setStatus(subtask_id, "finished") self.tmss_client.set_subtask_status(subtask_id, "finished")
class PipelineControlHandler( OTDBEventMessageHandler): class PipelineControlHandler( OTDBEventMessageHandler):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment