Skip to content
Snippets Groups Projects

Resolve TMSS-2829

Merged Jorrit Schaap requested to merge TMSS-2829 into master
4 files
+ 139
41
Compare changes
  • Side-by-side
  • Inline
Files
4
  • 1339b2a0
    TMSS-2829: implemented copy service and test to prove its workings · 1339b2a0
    Jorrit Schaap authored
@@ -25,6 +25,8 @@ logger = logging.getLogger(__name__)
from lofar.sas.tmss.client.tmssbuslistener import *
from lofar.sas.tmss.client.tmss_http_rest_client import TMSSsession
from subprocess import call
from lofar.common.cep4_utils import *
class TMSSCopyServiceEventMessageHandler(TMSSEventMessageHandler):
@@ -44,7 +46,52 @@ class TMSSCopyServiceEventMessageHandler(TMSSEventMessageHandler):
if status == 'scheduled':
subtask = self.tmss_client.get_subtask(id)
if subtask['subtask_type'] == 'copy':
logger.info("TODO: implement copy action")
try:
self.tmss_client.set_subtask_status(id, 'starting')
self.tmss_client.set_subtask_status(id, 'started')
# cache producer -> cluster lookups to reduce REST calls
producer2cluster = {}
for input_dataproduct in self.tmss_client.get_subtask_input_dataproducts(subtask['id']):
if subtask['specifications_doc'].get('managed_output', False):
output_dataproduct = self.tmss_client.get_subtask_transformed_output_dataproduct(subtask['id'], input_dataproduct['id'])
destination_path = output_dataproduct['filepath']
else:
destination_path = os.path.join(subtask['specifications_doc']['destination'].rstrip('/'), input_dataproduct['filename'])
# Strip a leading localhost/loopback host prefix from the destination to save on ssh wrapping.
# NOTE: str.lstrip(chars) removes any leading characters contained in `chars` (a character set),
# not a literal prefix, so it would also mangle real hosts such as 'cep4:' or '10.0.0.5:'.
# Hence match and remove each exact prefix explicitly, in the same order as before.
for local_host_prefix in ('localhost:', '127.0.0.1:'):
    if destination_path.startswith(local_host_prefix):
        destination_path = destination_path[len(local_host_prefix):]
# prepare the actual copy command
cmd = ['rsync', '-a', '--mkpath', input_dataproduct['filepath'], destination_path]
# check which cluster we run on
if input_dataproduct['producer'] not in producer2cluster:
producer = self.tmss_client.get_url_as_json_object(input_dataproduct['producer'])
filesystem = self.tmss_client.get_url_as_json_object(producer['filesystem'])
cluster = self.tmss_client.get_url_as_json_object(filesystem['cluster'])
producer2cluster[input_dataproduct['producer']] = cluster
# wrap in cep4 ssh call if cep4
cluster = producer2cluster[input_dataproduct['producer']]
if cluster['name'].lower() == 'cep4':
cmd = wrap_command_in_cep4_available_node_with_lowest_load_ssh_call(cmd, via_head=True)
logger.info("copying dataproduct id=%s for copy subtask id=%s, executing: %s", input_dataproduct['id'], subtask['id'], ' '.join(cmd))
if call(cmd) == 0:
logger.info("copied dataproduct id=%s for copy subtask id=%s to '%s'", input_dataproduct['id'], subtask['id'], destination_path)
else:
msg = "could not copy dataproduct id=%s for copy subtask id=%s to '%s'" % (input_dataproduct['id'], subtask['id'], destination_path)
logger.error(msg)
self.tmss_client.set_subtask_status(id, 'error', error_reason=msg)
return
self.tmss_client.set_subtask_status(id, 'finishing')
self.tmss_client.set_subtask_status(id, 'finished')
except Exception as e:
self.tmss_client.set_subtask_status(id, 'error', error_reason=str(e))
Loading