Commit 5a9077e5 authored by Jorrit Schaap

Merged LOFAR-Release-4_0 into master. Saved production edits from post-stop-day tweaks and fixes.

parents 3246e12c 8d943635
......@@ -19,12 +19,12 @@
try:
import proton
import proton.utils
-import uuid
MESSAGING_ENABLED = True
except ImportError:
from . import noqpidfallback as proton
MESSAGING_ENABLED = False
+import uuid
import xml.dom.minidom as xml
import xml.parsers.expat as expat
from xml.sax.saxutils import escape
......@@ -223,7 +223,7 @@ class MessageContent(object):
def _property_list(self):
""" List of XML elements that are exposed as properties. """
-return {
+return {
"system": "message.header.system",
"headerVersion": "message.header.version",
"protocol": "message.header.protocol.name",
......@@ -288,4 +288,3 @@ if __name__ == "__main__":
m = MessageContent("FROM", "FORUSER", "SUMMARY", "PROTOCOL", "1.2.3", "11111", "22222")
print(str(m))
print(m.content())
......@@ -24,6 +24,7 @@ set(_py_files
postgres.py
datetimeutils.py
flask_utils.py
+h5_utils.py
subprocess_utils.py
xmlparse.py
json_utils.py
......
# Copyright (C) 2012-2015 ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
import os.path
from datetime import datetime, timedelta
from time import sleep
import errno
import os
# prevent annoying future/deprecation warnings when loading h5 files
# (note: TF_CPP_MIN_LOG_LEVEL is an env var read by TensorFlow, if it is imported downstream)
os.environ["TF_CPP_MIN_LOG_LEVEL"]="3"
import h5py
import logging
logger = logging.getLogger(__name__)
class SharedH5File():
"""
Wrapper class around h5py.File to open an hdf5 file in read, write, or read/write mode safely,
even when the file might be used simultaneously by other processes.
It waits for <timeout> seconds until the file becomes available.
Example usage:
with SharedH5File("foo.h5", 'r+') as file:
file["foo"] = "bar"
"""
def __init__(self, path, mode='r', timeout=900):
self._path = path
self._mode = mode
self._timeout = timeout
self._file = None
def open(self):
start_timestamp = datetime.utcnow()
while self._file is None:
try:
self._file = h5py.File(self._path, self._mode)
return self._file
except IOError:
if self._path.startswith('~'):
# try again with tilde replaced
self._path = os.path.expanduser(self._path)
continue
if not os.path.exists(self._path):
raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), self._path)
logger.warning("Cannot open file '%s' with mode '%s'. Trying again in 1 sec...",
self._path, self._mode)
sleep(max(0, min(1, self._timeout)))
if datetime.utcnow() - start_timestamp > timedelta(seconds=self._timeout):
logger.error("Cannot open file '%s' with mode '%s', even after trying for %s seconds",
self._path, self._mode, self._timeout)
raise
def close(self):
self._file.close()
self._file = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
return self.close()
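For illustration, a minimal usage sketch (file name hypothetical): a second process that wants the same file simply retries inside open(), once per second, until the first process releases it or the timeout expires.

# process A: create the file and hold it open for writing
with SharedH5File("/data/qa/L123456.h5", 'w') as f:
    f.create_dataset("example", data=[1, 2, 3])

# process B: open() retries once per second while process A holds the file,
# and raises after 60 seconds if the file never becomes available
with SharedH5File("/data/qa/L123456.h5", 'r', timeout=60) as f:
    print(f["example"][:])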
class WorkingDirContext():
"""
A WorkingDirContext ensures that the raw sibling file of an h5 file can be read:
h5py requires the current working directory to be the directory of the h5 file.
When leaving this context, the working dir is restored to what it was before the context was entered.
Example usage:
with h5py.File("foo.h5", "r") as my_h5_file:
with WorkingDirContext(my_h5_file):
my_h5_file["foo"] = "bar"
"""
def __init__(self, h5_file: h5py.File):
self._h5_file = h5_file
self._original_cwd = os.getcwd()
def change_dir_to_h5_file(self):
self._original_cwd = os.getcwd()
os.chdir(os.path.dirname(self._h5_file.filename))
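# note: this assumes the h5 file was opened via a path with a directory component;
# os.path.dirname() of a bare relative filename is '' and os.chdir('') would fail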
def change_dir_to_original(self):
os.chdir(self._original_cwd)
def __enter__(self):
self.change_dir_to_h5_file()
return self._h5_file
def __exit__(self, exc_type, exc_val, exc_tb):
self.change_dir_to_original()
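A minimal read-side sketch (file and dataset names hypothetical), for the common case where a dataset references a raw sibling file that h5py resolves relative to the current working directory:

with h5py.File("/data/qa/L123456.h5", "r") as f:
    with WorkingDirContext(f):
        data = f["measurement/data"][:]  # raw sibling file is found next to the h5 file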
......@@ -48,4 +48,4 @@ def assertEqualXML(test, expected):
integration_test = unittest.skipIf(os.environ.get('SKIP_INTEGRATION_TESTS', default='False').lower() in ['1', 'true'],
'Integration tests are disabled via env SKIP_INTEGRATION_TESTS')
unit_test = unittest.skipIf(os.environ.get('SKIP_UNIT_TESTS', default='False').lower() in ['1', 'true'],
-'Unit tests are disabled via env SKIP_UNIT_TESTS')
\ No newline at end of file
+'Unit tests are disabled via env SKIP_UNIT_TESTS')
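Both names are plain unittest.skipIf decorators, so they can be applied to a whole TestCase class or to a single test method; a minimal sketch:

import unittest

@unit_test
class UtilsTest(unittest.TestCase):
    def test_something(self):
        self.assertTrue(True)

# running with SKIP_UNIT_TESTS=1 (or =true) in the environment skips UtilsTest entirely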
......@@ -55,10 +55,11 @@ class ObservationControlHandler(ServiceMessageHandler):
killed = False
-pid_line = self.connection.run('pidof ObservationControl').stdout
+pid_line = self.connection.run('/usr/sbin/pidof ObservationControl').stdout.strip('\n')
pids = pid_line.split(' ')
for pid in pids:
logger.info("Running: ps -p %s --no-heading -o command | awk -F[{}] '{ printf $2; }'", pid)
pid_sas_id = self.connection.run(
"ps -p %s --no-heading -o command | awk -F[{}] '{ printf $2; }'" % pid).stdout
if str(pid_sas_id) == str(sas_id):
......
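The awk filter above splits each ps command line on '{' and '}' and prints the field in between, which assumes, as this handler does, that the ObservationControl command line carries the SAS id in braces:

# hypothetical ps output:   ObservationControl ... {123456}
# awk -F[{}] '{ printf $2; }' then prints:  123456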
......@@ -19,4 +19,5 @@
lofar_add_bin_scripts(show_hdf5_info
find_hdf5
+add_parset_to_hdf5
create_test_hypercube)
......@@ -45,7 +45,6 @@ def main():
if len(args) != 1:
print('Please provide a file name for the h5 file which you want to create...\n')
parser.print_help()
exit(1)
......
......@@ -25,7 +25,7 @@ if __name__ == '__main__':
import os.path
import sys
import fnmatch
-import glob
+from pathlib import Path
from optparse import OptionParser, OptionGroup
from datetime import datetime, timedelta
......@@ -99,10 +99,10 @@ if __name__ == '__main__':
path = os.path.dirname(os.path.expanduser(args[0]) if len(args) == 1 else os.getcwd())
-files = glob.glob(os.path.join(path, '*.h*5'))
+files = [str(p) for p in Path(path).glob('*.h*5')]
if path == os.getcwd():
files = [os.path.basename(file) for file in files]
if not files:
print("Could not find any h5/hdf5 files in", path)
files = sorted(files)
......@@ -174,9 +174,9 @@ if __name__ == '__main__':
if options.info:
for file in files:
-print read_info_from_hdf5(file, read_data_info=False)
+print(read_info_from_hdf5(file, read_data_info=False))
else:
-print '\n'.join(files)
+print('\n'.join(files))
......@@ -26,7 +26,6 @@ if __name__ == '__main__':
from optparse import OptionParser
from lofar.qa.hdf5_io import *
from lofar.parameterset import *
# make sure we run in UTC timezone
os.environ['TZ'] = 'UTC'
......@@ -40,7 +39,7 @@ if __name__ == '__main__':
(options, args) = parser.parse_args()
if len(args) != 1:
-print parser.print_help()
+parser.print_help()
exit(-1)
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
......@@ -48,4 +47,10 @@ if __name__ == '__main__':
hdf_path = os.path.expanduser(args[0])
-print read_info_from_hdf5(hdf_path, read_data_info=options.data, read_parset_info=True)
+info_string = read_info_from_hdf5(hdf_path, read_data_info=options.data, read_parset_info=True)
+if info_string:
+print(info_string)
+else:
+print("Could not find any info in", hdf_path)
+exit(1)
......@@ -116,7 +116,7 @@ def create_hypercube(num_saps=3, num_stations=5, num_timestamps=11, num_subbands
# generate 'ticks' along the central_frequencies-axes
# fill the HBA frequency range of 120-240MHz
central_frequencies = [120e6+i*120e6/max(1,num_subbands-1) for i in range(num_subbands)]
-sb_offset = sum([len(sap['subbands']) for sap in list(data.values())])
+sb_offset = sum([len(sap['subbands']) for sap in data.values()])
subbands = [i for i in range(sb_offset, sb_offset+num_subbands)]
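For example, with num_subbands=5 the list comprehension above yields ticks spanning the HBA range end to end:

# [120e6 + i*120e6/4 for i in range(5)]  ->  120, 150, 180, 210, 240 MHz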
# create some synthetic antenna locations
......
......@@ -20,36 +20,57 @@
import logging
logger = logging.getLogger(__name__)
+from lofar.common.cep4_utils import *
+from subprocess import call
+from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RADBRPC
from lofar.messaging import ToBus, EventMessage, DEFAULT_BROKER, adaptNameToEnvironment
from optparse import OptionParser, OptionGroup
-from datetime import datetime
+from datetime import datetime, timedelta
if __name__ == '__main__':
# Check the invocation arguments
parser = OptionParser("%prog <otdb_id>",description='enqueue a conversion from MS to hdf5, cluster the baselines, and create the default plots in the qa_service.')
parser = OptionParser("%prog -o <otdb_id>",description='enqueue a conversion from MS to hdf5, cluster the baselines, and create the default plots in the qa_service.')
group = OptionGroup(parser, 'Messaging options')
group.add_option('-b', '--broker', dest='broker', type='string', default=DEFAULT_BROKER,
help='Address of the messaging broker, default: %default')
group.add_option('-q', "--queue", dest="queue", type="string",
-default=adaptNameToEnvironment('lofar.queue.for.qa_service.QAService.on.OTDB.notification.TaskStatus'),
+default=adaptNameToEnvironment('lofar.queue.for.qa_service.QAFilteredOTDBBusListener.on.filtered.OTDB.notification.TaskStatus'),
help="queue where the QAService listens for the notification messages. [default: %default]")
+group.add_option('-o', '--otdb_id', dest="otdb_id", type="int",
+help="the otdb_id of the task which needs to be enqueued for qa plots")
+group.add_option('-a', '--all', dest="all", action='store_true',
+help="enqueue all tasks which were not converted yet in the past two weeks. (pipelines are prioritized 2 below observations)")
+group.add_option('-p', '--priority', dest="priority", type="int", default=4,
+help="priority of the enqueued task. (low=0, normal=4, high=9) [default: %default]")
parser.add_option_group(group)
(options, args) = parser.parse_args()
-if len(args) != 1:
+if (not options.otdb_id and not options.all) or (options.otdb_id and options.all):
print(parser.print_help())
exit(-1)
-otdb_id = int(args[0])
+options.priority = min(9, max(1, options.priority))
+if options.all:
+with RADBRPC.create(broker=options.broker) as rpc:
+tasks = rpc.getTasks(lower_bound=datetime.utcnow()-timedelta(days=14), task_type=['observation','pipeline'], task_status=['finished'])
+otdb_id_priorities = [(t['otdb_id'], options.priority if t['type']=='observation' else max(1, options.priority-2))
+for t in tasks]
+else:
+otdb_id_priorities = [(options.otdb_id, options.priority)]
#config logging
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
with ToBus(exchange='', broker=options.broker) as tobus:
-for status in ['completing', 'finished']:
-content = {"treeID": otdb_id, "state": status, "time_of_change": datetime.utcnow() }
-msg = EventMessage(subject=options.queue, content=content, priority=options.priority)
-logging.info("sending: %s", msg)
-tobus.send(msg)
+for otdb_id, priority in otdb_id_priorities:
+plots_path = '/qa/plots/L%s' % otdb_id
+cmd = ['ls', plots_path, '>&', '/dev/null']
+cmd = wrap_command_in_cep4_head_node_ssh_call(cmd)
+if call(cmd, stdout=None, stderr=None) != 0:
+for status in ['completing', 'finished']:
+content = {"treeID": otdb_id, "state": status, "time_of_change": datetime.utcnow() }
+msg = EventMessage(subject=options.queue, content=content, priority=priority)
+logging.info("sending: %s", msg)
+tobus.send(msg)
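A note on the ls-check above: '>&' is passed as a literal list element and only acts as a redirect because wrap_command_in_cep4_head_node_ssh_call joins the list into one command line that the remote shell parses; the exit status of call() then tells whether the plots directory already exists (host name hypothetical):

# effective remote invocation:  ssh lofarsys@head01 'ls /qa/plots/L123456 >& /dev/null'
# exit code 0  -> plots exist already -> task is skipped
# exit code !=0 -> no plots yet -> the completing/finished events are re-sent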
......@@ -34,7 +34,8 @@ from lofar.sas.otdb.otdbrpc import OTDBRPC
logger = logging.getLogger(__name__)
-QA_BASE_DIR = '/data/qa'
+QA_LUSTRE_BASE_DIR = '/data/qa'
+QA_NFS_BASE_DIR = '/qa'
DEFAULT_FILTERED_OTDB_NOTIFICATION_SUBJECT = "filtered.%s" % (DEFAULT_OTDB_NOTIFICATION_SUBJECT,)
#TODO: idea: convert periodically while observing?
......@@ -83,9 +84,8 @@ class QAFilteredOTDBBusListener(BusListener):
upon observation/pipeline completion. The qa processes convert MS (measurement sets) to hdf5 qa files,
and then start generating plots from the hdf5 file.
'''
-def __init__(self, qa_base_dir: str=QA_BASE_DIR):
+def __init__(self):
super().__init__()
-self.qa_base_dir = qa_base_dir
self._unfinished_otdb_id_map = {}
def onObservationCompleting(self, otdb_id, modificationTime):
......@@ -117,13 +117,13 @@ class QAFilteredOTDBBusListener(BusListener):
del self._unfinished_otdb_id_map[otdb_id]
try:
-# import here and not at top of file
-# because on some systems h5py is not available
-# and then only this method fails, which is less bad than the whole service failing.
-from lofar.qa.hdf5_io import add_parset_to_hypercube
+cmd = ['add_parset_to_hdf5', hdf5_file_path]
+cmd = wrap_command_for_docker(cmd, 'adder', 'latest')
+cmd = wrap_command_in_cep4_random_node_ssh_call(cmd, partition=SLURM_CPU_PARTITION, via_head=True)
-with OTDBRPC.create(exchange=self.exchange, broker=self.broker) as otdbrpc:
-add_parset_to_hypercube(hdf5_file_path, otdbrpc)
+logger.info(' '.join(cmd))
+if call(cmd) == 0:
+self._copy_hdf5_to_nfs_dir(hdf5_file_path)
except Exception as e:
logger.warning("Cannot add parset with feedback for otdb=%s. error: %s", otdb_id, e)
else:
......@@ -161,7 +161,10 @@ class QAFilteredOTDBBusListener(BusListener):
# cluster it
self._cluster_h5_file(hdf5_file_path, otdb_id)
+self._copy_hdf5_to_nfs_dir(hdf5_file_path)
plot_dir_path = self._create_plots_for_h5_file(hdf5_file_path, otdb_id)
+plot_dir_path = self._move_plots_to_nfs_dir(plot_dir_path)
# and notify that we're finished
self._send_event_message('Finished', {'otdb_id': otdb_id,
......@@ -189,7 +192,7 @@ class QAFilteredOTDBBusListener(BusListener):
try:
# define default h5 filename use default cep4 qa output dir
h5_filename = 'L%s.MS_extract.h5' % otdb_id
-h5_dir_path = os.path.join(self.qa_base_dir, 'ms_extract')
+h5_dir_path = os.path.join(QA_LUSTRE_BASE_DIR, 'ms_extract')
hdf5_path = os.path.join(h5_dir_path, h5_filename)
cmd = ['ls', hdf5_path]
......@@ -234,7 +237,7 @@ class QAFilteredOTDBBusListener(BusListener):
'''
try:
#use default cep4 qa output dir.
-plot_dir_path = os.path.join(self.qa_base_dir, 'inspectionplots')
+plot_dir_path = os.path.join(QA_LUSTRE_BASE_DIR, 'plots')
task_plot_dir_path = ''
all_plots_succeeded = True
......@@ -261,11 +264,12 @@ class QAFilteredOTDBBusListener(BusListener):
logger.error(msg)
self._send_event_message('Error', {'otdb_id': otdb_id,
'message': msg})
-if all_plots_succeeded:
-self._send_event_message('CreatedInspectionPlots', {'otdb_id': otdb_id,
-'hdf5_file_path': hdf5_path,
-'plot_dir_path': task_plot_dir_path})
-return task_plot_dir_path
+self._send_event_message('CreatedInspectionPlots', {'otdb_id': otdb_id,
+'hdf5_file_path': hdf5_path,
+'plot_dir_path': task_plot_dir_path})
+return task_plot_dir_path
except Exception as e:
logging.exception('error in _create_plots_for_h5_file: %s', e)
self._send_event_message('Error', {'otdb_id': otdb_id, 'message': str(e)})
......@@ -282,7 +286,7 @@ class QAFilteredOTDBBusListener(BusListener):
try:
# define default h5 filename use default cep4 qa output dir
h5_filename = 'L%s.MS_extract.h5' % otdb_id
-h5_dir_path = os.path.join(self.qa_base_dir, 'ms_extract')
+h5_dir_path = os.path.join(QA_LUSTRE_BASE_DIR, 'ms_extract')
hdf5_path = os.path.join(h5_dir_path, h5_filename)
cmd = ['ls', hdf5_path]
......@@ -319,6 +323,40 @@ class QAFilteredOTDBBusListener(BusListener):
self._send_event_message('Error', {'otdb_id': otdb_id, 'message': str(e)})
return None
def _copy_hdf5_to_nfs_dir(self, hdf5_path):
try:
hdf5_filename = os.path.basename(hdf5_path)
hdf5_nfs_path = os.path.join(QA_NFS_BASE_DIR, 'h5', hdf5_filename)
cmd = ['cp', hdf5_path, hdf5_nfs_path]
cmd = wrap_command_in_cep4_head_node_ssh_call(cmd)
logger.debug('copying h5 file to nfs dir: %s', ' '.join(cmd))
if call(cmd) == 0:
logger.info('copied h5 file to nfs dir: %s -> %s', hdf5_path, hdf5_nfs_path)
return hdf5_nfs_path
except Exception as e:
logging.exception('error in _copy_hdf5_to_nfs_dir: %s', e)
def _move_plots_to_nfs_dir(self, plot_dir_path):
try:
plot_dir_name = os.path.basename(plot_dir_path)
plot_nfs_path = os.path.join(QA_NFS_BASE_DIR, 'plots', plot_dir_name)
cmd = ['cp', '-rf', plot_dir_path, plot_nfs_path]
cmd = wrap_command_in_cep4_head_node_ssh_call(cmd)
logger.info('copying plots: %s', ' '.join(cmd))
if call(cmd) == 0:
logger.info('copied plots from %s to nfs dir: %s', plot_dir_path, plot_nfs_path)
cmd = ['rm', '-rf', plot_dir_path]
cmd = wrap_command_in_cep4_head_node_ssh_call(cmd)
logger.debug('removing plots: %s', ' '.join(cmd))
if call(cmd) == 0:
logger.info('removed plots from %s after they were copied to nfs dir %s', plot_dir_path, plot_nfs_path)
return plot_nfs_path
except Exception as e:
logging.exception('error in _move_plots_to_nfs_dir: %s', e)
def _cluster_h5_file(self, hdf5_path, otdb_id=None):
'''
......@@ -363,22 +401,22 @@ class QAFilteredOTDBBusListener(BusListener):
logging.exception('error in _cluster_h5_file: %s', e)
self._send_event_message('Error', {'otdb_id': otdb_id, 'message': str(e)})
-def __init__(self, qa_base_dir: str=QA_BASE_DIR, exchange: str = DEFAULT_BUSNAME, broker: str = DEFAULT_BROKER):
+def __init__(self, exchange: str = DEFAULT_BUSNAME, broker: str = DEFAULT_BROKER):
super().__init__(handler_type=QAFilteredOTDBBusListener.QAFilteredOTDBEventMessageHandler,
-handler_kwargs={'qa_base_dir': qa_base_dir},
+handler_kwargs={},
exchange=exchange,
routing_key="%s.#" % (DEFAULT_FILTERED_OTDB_NOTIFICATION_SUBJECT,),
num_threads=1,
broker=broker)
class QAService:
-def __init__(self, qa_base_dir: str=QA_BASE_DIR, exchange: str=DEFAULT_BUSNAME, broker: str=DEFAULT_BROKER):
+def __init__(self, exchange: str=DEFAULT_BUSNAME, broker: str=DEFAULT_BROKER):
"""
:param exchange: valid message exchange address
:param broker: valid broker host (default: None, which means localhost)
"""
self.filtering_buslistener = QAFilteringOTDBBusListener(exchange = exchange, broker = broker)
-self.filtered_buslistener = QAFilteredOTDBBusListener(qa_base_dir=qa_base_dir, exchange = exchange, broker = broker)
+self.filtered_buslistener = QAFilteredOTDBBusListener(exchange = exchange, broker = broker)
def __enter__(self):
self.filtering_buslistener.start_listening()
......
......@@ -232,7 +232,7 @@ class TestQAService(unittest.TestCase):
self.wrap_command_for_docker_mock.side_effect = mocked_wrap_command_for_docker
# start the QAService (the object under test)
-qaservice = QAService(exchange=self.tmp_exchange.address, qa_base_dir=self.TEST_DIR)
+qaservice = QAService(exchange=self.tmp_exchange.address)
with qaservice, BusListenerJanitor(qaservice.filtering_buslistener), BusListenerJanitor(qaservice.filtered_buslistener):
# start listening for QA event messages from the QAService
......@@ -334,7 +334,7 @@ class TestQAService(unittest.TestCase):
self.wrap_command_for_docker_mock.side_effect = mocked_wrap_command_for_docker
# start the QAService (the object under test)
-qaservice = QAService(exchange=self.tmp_exchange.address, qa_base_dir=self.TEST_DIR)
+qaservice = QAService(exchange=self.tmp_exchange.address)
with qaservice, BusListenerJanitor(qaservice.filtering_buslistener), BusListenerJanitor(qaservice.filtered_buslistener):
# start listening for QA event messages from the QAService
......@@ -404,7 +404,7 @@ class TestQAService(unittest.TestCase):
self.wrap_command_for_docker_mock.side_effect = mocked_wrap_command_for_docker
# start the QAService (the object under test)
-qaservice = QAService(exchange=self.tmp_exchange.address, qa_base_dir=self.TEST_DIR)
+qaservice = QAService(exchange=self.tmp_exchange.address)
with qaservice, BusListenerJanitor(qaservice.filtering_buslistener), BusListenerJanitor(qaservice.filtered_buslistener):
# start listening for QA event messages from the QAService
......@@ -457,7 +457,7 @@ class TestQAService(unittest.TestCase):
self.wrap_command_in_cep4_node_ssh_call_mock.side_effect = mocked_wrap_command_in_cep4_node_ssh_call
# start the QAService (the object under test)
-qaservice = QAService(exchange=self.tmp_exchange.address, qa_base_dir=self.TEST_DIR)
+qaservice = QAService(exchange=self.tmp_exchange.address)
with qaservice, BusListenerJanitor(qaservice.filtering_buslistener), BusListenerJanitor(qaservice.filtered_buslistener):
# start listening for QA event messages from the QAService
......
......@@ -50,35 +50,46 @@ def autocleanup_all_finished_ingested_pipelines(do_submit_to_autocleanup: bool,
finished_pipelines = radbrpc.getTasks(task_type='pipeline', task_status='finished', otdb_ids=otdb_ids_with_data_on_disk)
finished_pipeline_otdb_ids = [pipeline['otdb_id'] for pipeline in finished_pipelines if 'otdb_id' in pipeline]
finished_pipeline_mom_ids = [pipeline['mom_id'] for pipeline in finished_pipelines if 'mom_id' in pipeline]
-dps_dict = momrpc.getDataProducts(finished_pipeline_mom_ids)
-mom_details_dict = momrpc.getObjectDetails(finished_pipeline_mom_ids)
-for pipeline in sorted(finished_pipelines, key=lambda t: t['endtime']):
+for pipeline in sorted(finished_pipelines, key=lambda t: t['endtime'], reverse=False):
otdb_id = pipeline['otdb_id']
logger.debug("checking if finished pipeline with otdb_id %d has been fully ingested...", otdb_id)
mom2id = pipeline.get('mom_id')
if mom2id is None:
continue
-dps = dps_dict.get(mom2id)
+dps = momrpc.getDataProducts(mom2id).get(mom2id)
#import pprint
#pprint.pprint(dps)
if dps is None or len(dps) <= 0:
logger.debug("could not find dataproducts for otdb_id=%d mom2id=%s to check if they are all ingested...", otdb_id, mom2id)
continue
-ingested_dps = [dp for dp in dps if dp['status'] == 'ingested']
+ingestable_dps = [dp for dp in dps if dp['status'] is not None and dp['fileformat'] != 'none']
+ingested_dps = [dp for dp in ingestable_dps if dp['status'] == 'ingested']
+not_ingested_dps = [dp for dp in ingestable_dps if dp['status'] != 'ingested']
+#pprint.pprint(ingestable_dps)
#pprint.pprint(dps)
-is_ingested = len(ingested_dps) == len(dps)
+is_ingested = len(ingested_dps)>0 and len(ingested_dps) == len(ingestable_dps)
+is_partially_ingested = len(ingested_dps) > 0 and len(ingested_dps) < len(ingestable_dps)
logger.debug("checking diskusage for finished %singested pipeline with otdb_id %d ...", ("fully " if is_ingested else "not-"), otdb_id)
#if not is_ingested:
#logger.info("finished pipeline with otdb_id %d was %singested. Not deleting anything for this pipeline and/or its predecessors.",
# otdb_id, "partially " if is_partially_ingested else "not ")
#continue
try:
logger.debug("finished pipeline with otdb_id %d was fully ingested. checking diskusage...", otdb_id)
du_result = sqrpc.getDiskUsageForOTDBId(otdb_id)
if du_result.get('needs_update'):
du_result = sqrpc.getDiskUsageForOTDBId(otdb_id, force_update=True)
if not du_result.get('found') or (du_result.get('disk_usage', 0) or 0) == 0:
logger.debug(du_result)
continue
-mom_details = mom_details_dict.get(mom2id)
+mom_details = momrpc.getObjectDetails(mom2id).get(mom2id)
except Exception as e:
logger.warning(e)
continue
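To make the ingestedness test above concrete, a sketch with hypothetical dataproduct records:

# dps = [{'status': 'ingested', 'fileformat': 'AIPS++/CASA'},
#        {'status': 'ingested', 'fileformat': 'AIPS++/CASA'},
#        {'status': None,       'fileformat': 'none'}]
# -> ingestable_dps: 2 entries, ingested_dps: 2, not_ingested_dps: 0
# -> is_ingested == True, is_partially_ingested == False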
......@@ -90,11 +101,12 @@ def autocleanup_all_finished_ingested_pipelines(do_submit_to_autocleanup: bool,
du_result.get('disk_usage_readable'), du_result.get('disk_usage'))
if is_ingested and do_submit_to_autocleanup:
-tobus.send(EventMessage(subject="%s.TaskFinished" % INGEST_NOTIFICATION_PREFIX,
-content={'type': 'MoM',
-'otdb_id': otdb_id,
-'message': 'resubmit of TaskFinished event for autocleanupservice'}))
+msg = EventMessage(subject="%s.TaskFinished" % INGEST_NOTIFICATION_PREFIX,
+content={'type': 'MoM',
+'otdb_id': otdb_id,
+'message': 'resubmit of TaskFinished event for autocleanupservice'})
+logger.info("sending msg to %s: %s", tobus.exchange, msg)
+tobus.send(msg)
def main():
from optparse import OptionParser
......@@ -118,4 +130,3 @@ def main():
if __name__ == '__main__':
main()
......@@ -12,6 +12,7 @@ from datetime import datetime
from optparse import OptionParser
from lofar.messaging import RPCService, ServiceMessageHandler
from lofar.messaging import EventMessage, ToBus, DEFAULT_BROKER, DEFAULT_BUSNAME
+from lofar.messaging.rpc import RPCTimeoutException
from lofar.common.util import waitForInterrupt, humanreadablesize
from lofar.common.subprocess_utils import communicate_returning_strings
......@@ -241,14 +242,12 @@ class CleanupHandler(ServiceMessageHandler):
claims = radbrpc.getResourceClaims(task_ids=task['id'], resource_type='storage')
cep4_storage_claim_ids = [c['id'] for c in claims if c['resource_id'] == cep4_storage_resource['id']]
for claim_id in cep4_storage_claim_ids:
logger.info("ending the storage claim %s on resource %s %s for task radb_id=%s otdb_id=%s",
claim_id, cep4_storage_resource['id'], cep4_storage_resource['name'],
task['id'], task['otdb_id'])
radbrpc.deleteResourceClaim(claim_id)
logger.info("setting endtime for claim %s on resource %s %s to now", claim_id, cep4_storage_resource['id'], cep4_storage_resource['name'])
radbrpc.updateResourceClaim(claim_id, endtime=datetime.utcnow())
except Exception as e:
logger.error(str(e))
-def _removePath(self, path, do_recurse=True):
+def _removePath(self, path, do_recurse=False):
logger.info("Remove path: %s" % (path,))
# do various sanity checking to prevent accidental deletes
......@@ -289,7 +288,10 @@ class CleanupHandler(ServiceMessageHandler):
logger.warn(message)
return {'deleted': True, 'message': message, 'path': path}
-du_result = self._sqrpc.getDiskUsageForPath(path) if do_recurse else {}
+try:
+du_result = self._sqrpc.getDiskUsageForPath(path) if do_recurse else {}
+except RPCTimeoutException:
+du_result = {}
if du_result.get('found'):
logger.info("Attempting to delete %s in %s", du_result.get('disk_usage_readable', '?B'), path)
......
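The try/except around getDiskUsageForPath makes the delete path degrade gracefully: when the storagequery service does not answer in time, du_result stays empty, the 'found' check is falsy, and the surrounding removal logic can proceed without reporting the amount of disk space involved; a sketch of both outcomes:

# RPC answered:   du_result = {'found': True, 'disk_usage_readable': '1.2TB', ...}
# RPC timed out:  du_result = {}   -> du_result.get('found') is falsy, no size is logged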
......@@ -423,6 +423,10 @@ class CacheManager:
def onTaskDeleted(self, otdb_id, deleted, paths, message=''):
self._onDiskActivityForOTDBId(otdb_id)
+with self._cacheLock:
+if deleted and otdb_id != None and otdb_id in self._cache['otdb_id2path']:
+del self._cache['otdb_id2path'][otdb_id]
def _onDiskActivityForOTDBId(self, otdb_id):
result = self.disk_usage.getDiskUsageForOTDBId(otdb_id)
self._updateCache(result)
......@@ -493,6 +497,15 @@ class CacheManager:
return task_du_result
+# still no path(s) found for otdb_id, now try from cache and ignore possible scratch paths
+if otdb_id != None:
+with self._cacheLock:
+path = self._cache['otdb_id2path'].get(otdb_id)
+if path:
+logger.info('Using path from cache for otdb_id %s %s (ignoring possible scratch/share paths)', otdb_id, path)
+return self.getDiskUsageForPath(path, force_update=force_update)
return {'found': False, 'path': path_result['path']}
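The effective lookup order in getDiskUsageForOTDBId thus becomes: the task's registered path(s) first, then this otdb_id2path cache (deliberately ignoring scratch/share paths), and only then a not-found result; a condensed sketch (caller name and values hypothetical):

# du = cache_manager.getDiskUsageForOTDBId(123456)
# -> {'found': True, 'disk_usage': 1234567890, ...}   via the cached path
# -> {'found': False, 'path': '...'}                  when nothing is known at all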
def getDiskUsageForTasks(self, radb_ids=None, mom_ids=None, otdb_ids=None, include_scratch_paths=True, force_update=False):
......
......@@ -40,7 +40,7 @@ def createService(exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, cache_manager
handler_kwargs={'cache_manager':cache_manager},
exchange=exchange,
broker=broker,
-num_threads=6)
+num_threads=12)
def main():
# make sure we run in UTC timezone
......
<!doctype html>
-<!-- $Id$ -->
+<!-- $Id: index.html 38233 2017-08-29 15:23:10Z schoenmakers $ -->
<html lang='en' ng-app="raeApp">
<head>
<meta http-equiv='Content-Type' content='text/html; charset=utf-8'/>
......@@ -104,6 +104,10 @@
<div style="float:left; min-width:50px; padding-top:30px; font-size:14px;">
<a href="/projects" target="_blank">Projects</a>
</div>
<div style="float:left; min-width:50px; padding-top:30px; padding-left:16px; padding-right:16px; font-size:14px;">|</div>
<div style="float:left; min-width:50px; padding-top:30px; font-size:14px;">
<a href="https://proxy.lofar.eu/qa" target="_blank">QA-plots</a>
</div>
</div>
<uib-progress style="float:left; width:100%; height:6px; margin:0px;"
...