#!/usr/bin/env python3
# Copyright (C) 2012-2015 ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
import os
from optparse import OptionParser, OptionGroup
import logging
logger = logging.getLogger(__name__)
from lofar.sas.tmss.client.tmssbuslistener import *
from lofar.sas.tmss.client.tmss_http_rest_client import TMSSsession
from subprocess import call
from lofar.common.cep4_utils import *

from lofar.common.subprocess_utils import check_output_returning_strings
class TMSSCopyServiceEventMessageHandler(TMSSEventMessageHandler):
    '''Handles TMSS subtask status-change events for copy-subtasks: queues
    scheduled copy-subtasks, and runs queued ones (rsync'ing each input
    dataproduct to the specified destination) once the destination filesystem
    reports enough free space.'''

    def __init__(self, tmss_client_credentials_id: str="TMSSClient"):
        super().__init__()
        self.tmss_client = TMSSsession.create_from_dbcreds_for_ldap(tmss_client_credentials_id)
        # timestamp of the last free-disk-space poll; datetime.min makes the first poll run immediately
        self._last_df_check_timestamp = datetime.min

    def start_handling(self):
        self.tmss_client.open()

    def stop_handling(self):
        self.tmss_client.close()

    def before_receive_message(self):
        # use 1-sec event loop to poll queued subtasks (rate limited at 60sec)
        super().before_receive_message()
        if datetime.utcnow() - self._last_df_check_timestamp > timedelta(seconds=60):
            self._last_df_check_timestamp = datetime.utcnow()
            self.check_and_run_queued_copy_subtask_if_enough_disk_space()

    def onSubTaskStatusChanged(self, id: int, status: str):
        '''Dispatch on a subtask status change: queue a scheduled copy-subtask, run a queued one.'''
        if status in ('scheduled', 'queued'):
            subtask = self.tmss_client.get_subtask(id)
            if subtask['subtask_type'] == 'copy':
                if status == 'scheduled':
                    self.queue_copy_subtask(subtask)
                elif status == 'queued':
                    self.run_copy_subtask_if_enough_disk_space(subtask)
            else:
                logger.info("skipping subtask id=%s status=%s type=%s", subtask['id'], subtask['state_value'], subtask['subtask_type'])

    def queue_copy_subtask(self, subtask):
        '''Advance the given scheduled copy-subtask to the queued state. No-op for other types/states.'''
        if subtask['subtask_type'] != 'copy':
            return
        if subtask['state_value'] != 'scheduled':
            return

        self.tmss_client.set_subtask_status(subtask['id'], 'queueing')
        self.tmss_client.set_subtask_status(subtask['id'], 'queued')

    def check_and_run_queued_copy_subtask_if_enough_disk_space(self):
        '''Poll all queued copy-subtasks and run each one whose destination has enough free space.'''
        subtasks = self.tmss_client.get_subtasks(subtask_type='copy', state='queued')
        for subtask in subtasks:
            self.run_copy_subtask_if_enough_disk_space(subtask)

    def run_copy_subtask_if_enough_disk_space(self, subtask):
        '''Run the given queued copy-subtask if 'df' on the destination reports enough free space
        for the summed input dataproduct sizes. On any error during the check the subtask is run
        anyway (best-effort): a genuine disk-space problem then surfaces as a subtask error.'''
        if subtask['subtask_type'] != 'copy':
            return
        if subtask['state_value'] != 'queued':
            return

        try:
            # determine destination host and root_dir from the '[host:]/path' destination spec
            destination = subtask['specifications_doc']['destination']
            dst_host = destination[:destination.find(':')] if ':' in destination else ''
            dst_root_dir = '/'+destination.replace(dst_host + ':', '').split('/')[1]

            # remove unneeded localhost to prevent unneeded ssh calls
            dst_host = dst_host.replace('localhost:','').replace('127.0.0.1:','')

            df_cmd = ['df', dst_root_dir]
            if dst_host:
                df_cmd = wrap_command_in_ssh_call(df_cmd, dst_host)

            logger.info("checking free disk space for copy-subtask id=%s, executing: %s", subtask['id'], ' '.join(df_cmd))

            # run df cmd, and parse output for total free disk space
            # NOTE(review): plain 'df' reports 1K blocks, not bytes, whereas the dataproduct
            # sizes summed below are presumably bytes — confirm the units actually match.
            df_result = check_output_returning_strings(df_cmd)
            df_result_line = df_result.splitlines()[-1]
            df_result_line_parts = df_result_line.split()
            df_bytes = int(df_result_line_parts[3])

            input_dp_sizes = self.tmss_client.get_url_as_json_object(subtask['url'].rstrip('/')+'/input_dataproducts?fields=size')
            total_size = sum(x['size'] for x in input_dp_sizes)

            if df_bytes > total_size:
                logger.info("enough free disk space available for copy-subtask id=%s destination=%s df=%d needed=%d", subtask['id'], destination, df_bytes, total_size)
                # # clear previously set "not enough free disk space available" error_reason if set
                # if subtask.get('error_reason'):
                #     self.tmss_client.do_request_and_get_result_as_string('PATCH', subtask['url'], {'error_reason': None})

                # run it
                self.run_copy_subtask(subtask)
            else:
                msg = "not enough free disk space available to start copy-subtask id=%s df=%d needed=%d" % (subtask['id'], df_bytes, total_size)
                logger.warning(msg)
                # self.tmss_client.do_request_and_get_result_as_string('PATCH', subtask['url'], {'error_reason': msg})
        except Exception as e:
            logger.exception(str(e))
            # try to run it anyway, maybe it fails on not enough disk space.
            # if it fails while running, then it results in an error status, and the user can take appropriate action.
            self.run_copy_subtask(subtask)

    def run_copy_subtask(self, subtask):
        '''Execute the given queued copy-subtask: rsync each input dataproduct to its destination
        path (creating parent dirs once per directory), advancing the subtask through
        starting/started to finishing/finished, or to error (with a reason) on the first failure.'''
        if subtask['subtask_type'] != 'copy':
            return
        if subtask['state_value'] != 'queued':
            return

        try:
            self.tmss_client.set_subtask_status(subtask['id'], 'starting')
            self.tmss_client.set_subtask_status(subtask['id'], 'started')

            # cache to reduced rest-calls
            # maps producer_id to tuple of producing subtask id and cluster name
            _cache = {}

            # cache to reduce (ssh) mkdir calls. Only create parent dirs once.
            _created_dir_cache = set()

            # ToDo: maybe parallelize this? Are multiple parallel rsync's faster?
            for input_dataproduct in self.tmss_client.get_subtask_input_dataproducts(subtask['id']):
                # fetch producing subtask id and cluster name for cache if needed
                if input_dataproduct['producer'] not in _cache:
                    producer = self.tmss_client.get_url_as_json_object(input_dataproduct['producer'])
                    filesystem = self.tmss_client.get_url_as_json_object(producer['filesystem'])
                    cluster = self.tmss_client.get_url_as_json_object(filesystem['cluster'])
                    _cache[input_dataproduct['producer']] = {'producing_subtask_id': producer['subtask_id'], 'cluster_name': cluster['name'] }

                if subtask['specifications_doc'].get('managed_output', False):
                    # TMSS manages the output dataproduct; its filepath is the destination
                    output_dataproduct = self.tmss_client.get_subtask_transformed_output_dataproduct(subtask['id'], input_dataproduct['id'])
                    output_dp_path = output_dataproduct['filepath']
                else:
                    # unmanaged: construct destination path, optionally grouped per producing subtask in an 'L<id>' subdir
                    output_dp_path = os.path.join(subtask['specifications_doc']['destination'].rstrip('/'),
                                                  ('L' + str(_cache[input_dataproduct['producer']]['producing_subtask_id'])) if subtask['specifications_doc'].get('group_by_id', True) else '',
                                                  input_dataproduct['filename'])

                # split in host & parent_dir_path
                dst_host = output_dp_path[:output_dp_path.find(':')] if ':' in output_dp_path else ''
                dst_parent_dir_path = output_dp_path.replace(dst_host + ':','').replace(input_dataproduct['filename'],'')

                # replace unneeded localhost to prevent under-the-hood ssh wrapping in rsync, and to prevent unneed ssh calls
                dst_host = dst_host.replace('localhost:','').replace('127.0.0.1:', '')
                dst_parent_dir_path = dst_parent_dir_path.replace('localhost:','').replace('127.0.0.1:','')

                if dst_parent_dir_path not in _created_dir_cache:
                    # create dst_parent_dir_path directories if needed, prepend them to the cmd
                    mkdir_cmd = ['mkdir', '-p', dst_parent_dir_path]
                    if dst_host:
                        mkdir_cmd = wrap_command_in_ssh_call(mkdir_cmd, dst_host)

                    logger.info("creating parent destination dir if needed for copy-subtask id=%s, executing: %s",subtask['id'], ' '.join(mkdir_cmd))

                    if call(mkdir_cmd) == 0:
                        _created_dir_cache.add(dst_parent_dir_path)
                    else:
                        msg = "could not create parent destination dir '%s' for copy-subtask id=%s" % (dst_parent_dir_path, subtask['id'])
                        logger.error(msg)
                        self.tmss_client.set_subtask_status(subtask['id'], 'error', error_reason=msg)
                        return

                # prepare the actual copy command
                cmd = ['rsync', '-a', input_dataproduct['filepath'].rstrip('/'), dst_parent_dir_path]

                # wrap in cep4 ssh call if cep4
                cluster_name = _cache[input_dataproduct['producer']]['cluster_name']
                if cluster_name.lower() == 'cep4':
                    cmd = wrap_command_in_cep4_available_node_with_lowest_load_ssh_call(cmd, via_head=True)

                logger.info("copying dataproduct id=%s for copy-subtask id=%s, executing: %s", input_dataproduct['id'], subtask['id'], ' '.join(cmd))

                if call(cmd) == 0:
                    logger.info("copied dataproduct id=%s for copy-subtask id=%s to '%s'", input_dataproduct['id'], subtask['id'], dst_parent_dir_path)
                else:
                    # copy failed: mark the subtask as error and stop copying further dataproducts
                    msg = "could not copy dataproduct id=%s for copy-subtask id=%s to '%s'" % (input_dataproduct['id'], subtask['id'], dst_parent_dir_path)
                    logger.error(msg)
                    self.tmss_client.set_subtask_status(subtask['id'], 'error', error_reason=msg)
                    return

            self.tmss_client.set_subtask_status(subtask['id'], 'finishing')
            self.tmss_client.set_subtask_status(subtask['id'], 'finished')
        except Exception as e:
            logger.exception(str(e))
            self.tmss_client.set_subtask_status(subtask['id'], 'error', error_reason=str(e))
def create_copy_service(exchange: str=DEFAULT_BUSNAME, broker: str=DEFAULT_BROKER, tmss_client_credentials_id: str="TMSSClient"):
    '''Construct a TMSSBusListener that dispatches TMSS events on the given exchange/broker
    to a TMSSCopyServiceEventMessageHandler using the given TMSS REST credentials.'''
    handler_kwargs = {'tmss_client_credentials_id': tmss_client_credentials_id}
    return TMSSBusListener(handler_type=TMSSCopyServiceEventMessageHandler,
                           handler_kwargs=handler_kwargs,
                           exchange=exchange,
                           broker=broker)
def main():
    '''Entry point: parse commandline options, verify TMSS connectivity, and run the
    tmss_copy_service (which runs the copy-pipeline for scheduled copy-subtasks) until interrupted.'''
    # make sure we run in UTC timezone
    os.environ['TZ'] = 'UTC'
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)

    # Check the invocation arguments
    parser = OptionParser('%prog [options]',
                          description='run the tmss_copy_service which runs the copy-pipeline for scheduled copy-subtasks')

    group = OptionGroup(parser, 'Messaging options')
    group.add_option('-b', '--broker', dest='broker', type='string', default=DEFAULT_BROKER, help='Address of the message broker, default: %default')
    group.add_option('-e', "--exchange", dest="exchange", type="string", default=DEFAULT_BUSNAME, help="exchange where the TMSS event messages are published. [default: %default]")
    parser.add_option_group(group)

    # add the option to the group before registering the group (was registered first)
    group = OptionGroup(parser, 'Django options')
    group.add_option('-R', '--tmss_client_credentials_id', dest='tmss_client_credentials_id', type='string', default='TMSSClient', help='TMSS django REST API credentials name, default: %default')
    parser.add_option_group(group)

    (options, args) = parser.parse_args()

    # check TMSS is up and running via the client
    TMSSsession.check_connection_and_exit_on_error(options.tmss_client_credentials_id)

    from lofar.common.util import waitForInterrupt

    with create_copy_service(options.exchange, options.broker, options.tmss_client_credentials_id):
        waitForInterrupt()

if __name__ == '__main__':
    main()