diff --git a/MAC/Services/src/PipelineControl.py b/MAC/Services/src/PipelineControl.py index eca0cc35376b26c3d248716aefd8ed1fc1b63600..180ad921e29b6135e22213bfcce2938ca6528355 100755 --- a/MAC/Services/src/PipelineControl.py +++ b/MAC/Services/src/PipelineControl.py @@ -83,7 +83,6 @@ import os import re from socket import getfqdn import logging -import requests logger = logging.getLogger(__name__) @@ -95,12 +94,6 @@ NUMBER_OF_CORES_PER_NODE = 24 # We /4 because we can then run 4 pipelines, and -2 to reserve cores for TBBwriter DEFAULT_NUMBER_OF_TASKS = (NUMBER_OF_NODES // 4) * (NUMBER_OF_CORES_PER_NODE - 2) // DEFAULT_NUMBER_OF_CORES_PER_TASK -# todo: config file? -TMSS_URL = 'http://localhost:80/' -TMSS_USER = 'paulus' -TMSS_PASS = 'pauluspass' -TMSS_AUTHENTICATION_METHOD = TMSSsession.BASICAUTH - def runCommand(cmdline, input=None): logger.info("runCommand starting: %s", cmdline) @@ -353,20 +346,13 @@ class PipelineControlTMSSHandler(TMSSSubTaskEventMessageHandler): logger.info('PipelineControlTMSS busname=%s', exchange) self.exchange = exchange self.slurm = Slurm() + self.tmss_client = TMSSsession.create_from_dbcreds_for_ldap() - def _setStatus(self, subtask_id, status): - with TMSSsession(TMSS_USER, TMSS_PASS, TMSS_URL, TMSS_AUTHENTICATION_METHOD) as session: - session.patch(url='%s/api/subtask/%s/' % (TMSS_URL, subtask_id), - data={'state': "%s/api/subtask_state/%s/" % (TMSS_URL, status)}) + def start_handling(self): + self.tmss_client.open() - def _getParset(self, subtask_id): - try: - with TMSSsession(TMSS_USER, TMSS_PASS, TMSS_URL, TMSS_AUTHENTICATION_METHOD) as session: - r = session.get(TMSS_URL + 'api/subtask/' + subtask_id + '/parset/') - return Parset(r) - except Exception as e: - logger.error("Cannot retrieve parset of task %s from TMSS: %s", subtask_id, e) - return None + def stop_handling(self): + self.tmss_client.close() def check_scheduled_pipelines(self): """ @@ -377,10 +363,9 @@ class PipelineControlTMSSHandler(TMSSSubTaskEventMessageHandler): try: logger.info("Checking for already scheduled pipelines in TMSS...") - with TMSSsession(TMSS_USER, TMSS_PASS, TMSS_URL, TMSS_AUTHENTICATION_METHOD) as session: - r = session.get(TMSS_URL + 'api/subtask/?state__value=scheduled&format=json') + scheduled_subtasks = self.tmss_client.get_subtasks(state="scheduled") scheduled_pipelines = [] - for subtask in r['results']: + for subtask in scheduled_subtasks: bits = subtask['url'].split['/'] scheduled_pipelines.append(bits[bits.index("subtask") + 1]) @@ -389,7 +374,7 @@ class PipelineControlTMSSHandler(TMSSSubTaskEventMessageHandler): for subtask_id in scheduled_pipelines: logger.info("Checking if scheduled pipeline subtask_id=%s can start.", subtask_id) try: - parset = self._getParset(subtask_id) + parset = Parset(self.tmss_client.get_subtask_parset(subtask_id)) if not parset or not self._shouldHandle(parset): continue self._startPipeline(subtask_id, parset) @@ -400,7 +385,7 @@ class PipelineControlTMSSHandler(TMSSSubTaskEventMessageHandler): def onSubTaskScheduled(self, subtask_id: int, old_state: str, new_state: str): try: - parset = self._getParset(subtask_id) + parset = Parset(self.tmss_client.get_subtask_parset(subtask_id)) if parset and self._shouldHandle(parset): self._startPipeline(subtask_id, parset) except Exception as e: @@ -604,10 +589,10 @@ class PipelineControlTMSSHandler(TMSSSubTaskEventMessageHandler): logger.info("Scheduled SLURM job %s for abort trigger for subtask_id=%s", slurm_cancel_job_id, subtask_id) logger.info("Handed over pipeline %s to SLURM, setting status to QUEUED", subtask_id) - self._setStatus(subtask_id, "queued") + self.tmss_client.set_subtask_status(subtask_id, "queued") except Exception as e: logger.error(str(e)) - self._setStatus(subtask_id, "finished") + self.tmss_client.set_subtask_status(subtask_id, "finished") class PipelineControlHandler( OTDBEventMessageHandler):