Commit 29dc9f45 authored by Jorrit Schaap

SW-857: manually merged in the changes for the qa_service from the adder-branch via LOFAR-Release-4_0 to master
parents 81982444 778faf0e
......@@ -3,6 +3,8 @@
#
FROM lofar-base:${LOFAR_TAG}
RUN apt-get update && apt-get install -y git python python3 g++ make
#
# *******************
# Blitz
......@@ -14,8 +16,7 @@ FROM lofar-base:${LOFAR_TAG}
RUN apt-get update && apt-get install -y git python && \
mkdir /src && cd /src && git clone --branch=1.0.1 https://github.com/blitzpp/blitz.git && \
cd /src/blitz && ./configure && make install && \
rm -rf /src/blitz && \
apt-get purge git python
rm -rf /src/blitz
#
# *******************
......@@ -53,16 +54,14 @@ RUN export BUILD_PACKAGES="git cmake g++ swig3.0 python3-setuptools python3-dev
RUN apt-get update && apt-get install -y binutils liblog4cplus-1.1-9 libxml2 libboost-thread${BOOST_VERSION}.1 libboost-filesystem${BOOST_VERSION}.1 libboost-date-time${BOOST_VERSION}.1 libpng16-16 libsigc++-2.0-dev libxml++2.6-2v5 libboost-regex${BOOST_VERSION}.1
# Tell image build information
ENV LOFAR_BRANCH=${LOFAR_BRANCH} \
LOFAR_REVISION=${LOFAR_REVISION} \
LOFAR_BUILDVARIANT=gnucxx11_2018_optarch
ENV LOFAR_BRANCH=${LOFAR_VERSION} \
LOFAR_BUILDVARIANT=gnucxx11_opt
# Install
RUN apt-get update && apt-get install -y subversion cmake g++ gfortran bison flex autogen liblog4cplus-dev libhdf5-dev libboost-dev boost-python${BOOST_VERSION}-dev libxml2-dev pkg-config libpng-dev libfftw3-dev libunittest++-dev libxml++2.6-dev libboost-filesystem${BOOST_VERSION}-dev libboost-date-time${BOOST_VERSION}-dev libboost-thread${BOOST_VERSION}-dev libboost-regex${BOOST_VERSION}-dev binutils-dev libopenblas-dev libcfitsio-dev wcslib-dev && \
RUN apt-get update && apt-get install -y git cmake g++ gfortran bison flex autogen liblog4cplus-dev libhdf5-dev libboost-dev boost-python${BOOST_VERSION}-dev libxml2-dev pkg-config libpng-dev libfftw3-dev libunittest++-dev libxml++2.6-dev libboost-filesystem${BOOST_VERSION}-dev libboost-date-time${BOOST_VERSION}-dev libboost-thread${BOOST_VERSION}-dev libboost-regex${BOOST_VERSION}-dev binutils-dev libopenblas-dev libcfitsio-dev wcslib-dev libcap2-bin && \
mkdir -p ${INSTALLDIR}/lofar/build/${LOFAR_BUILDVARIANT} && \
cd ${INSTALLDIR}/lofar && \
svn --non-interactive -q co -r ${LOFAR_REVISION} -N ${LOFAR_BRANCH_URL} src; \
svn --non-interactive -q up src/CMake && \
cd ${INSTALLDIR}/lofar && git clone https://git.astron.nl/ro/lofar.git ${INSTALLDIR}/lofar/src && \
cd ${INSTALLDIR}/lofar/src && git checkout ${LOFAR_VERSION} && \
cd ${INSTALLDIR}/lofar/build/${LOFAR_BUILDVARIANT} && cmake -DBUILD_PACKAGES=Online_OutputProc -DBUILD_TESTING=OFF -DCMAKE_INSTALL_PREFIX=${INSTALLDIR}/lofar/ -DCASACORE_ROOT_DIR=${INSTALLDIR}/casacore/ -DQPID_ROOT_DIR=/opt/qpid/ -DDAL_ROOT_DIR=${INSTALLDIR}/DAL -DUSE_OPENMP=True ${INSTALLDIR}/lofar/src/ && \
cd ${INSTALLDIR}/lofar/build/${LOFAR_BUILDVARIANT} && sed -i '29,31d' include/ApplCommon/PosixTime.h && \
cd ${INSTALLDIR}/lofar/build/${LOFAR_BUILDVARIANT} && make -j ${J} && \
......
This diff is collapsed.
......@@ -11,7 +11,7 @@ except ImportError:
logger = logging.getLogger()
class SubprocessTimoutError(RuntimeError):
class SubprocessTimoutError(TimeoutError):
'''an error class indicating that the running subprocess took longer than expected to complete'''
pass
......@@ -49,10 +49,12 @@ def check_output_returning_strings(*popenargs, timeout=None, **kwargs):
return output.decode('UTF-8')
return output
def execute_in_parallel(cmd_lists, timeout=3600, max_parallel=32):
def execute_in_parallel(cmd_lists, gather_stdout_stderr: bool=True, timeout=3600, max_parallel=32):
"""
Execute all commands in the cmd_lists in parallel, limited to max_parallel concurrent processes.
:param list cmd_lists: a list of subprocess-cmd-list's
:param bool gather_stdout_stderr: when True, gather the output from stdout and stderr;
otherwise stdout and stderr are mixed into this program's stdout and stderr.
:param int timeout: time out after this many seconds
:param int max_parallel: maximum number of concurrent executed commands.
:raises SubprocessTimoutError: if any of the commands times out
......@@ -73,7 +75,10 @@ def execute_in_parallel(cmd_lists, timeout=3600, max_parallel=32):
# not limited by the number of parallel procs (anymore)
# so, start a new proc
logger.info('Executing %s', ' '.join(cmd_list))
proc = Popen(cmd_list, stdout=PIPE, stderr=PIPE)
if gather_stdout_stderr:
proc = Popen(cmd_list, stdout=PIPE, stderr=PIPE)
else:
proc = Popen(cmd_list, bufsize=-1)
procs.append(proc)
sleep(0.01)
......@@ -92,13 +97,16 @@ def execute_in_parallel(cmd_lists, timeout=3600, max_parallel=32):
PopenResult = namedtuple('PopenResult', ['returncode', 'stdout', 'stderr'])
results = [PopenResult(p.returncode, p.stdout.read(), p.stderr.read()) for p in procs]
results = [PopenResult(p.returncode,
p.stdout.read().decode('utf-8').strip() if gather_stdout_stderr else None,
p.stderr.read().decode('utf-8').strip() if gather_stdout_stderr else None)
for p in procs]
# log results of commands
for cmd_list, result in zip(cmd_lists, results):
logger.info("Results for cmd: %s\n returncode=%s\n stdout=%s\n stderr=%s",
logger.debug("Results for cmd: %s\n returncode=%s\n stdout=%s\n stderr=%s",
" ".join(cmd_list),
result.returncode, result.stdout.rstrip(), result.stderr.rstrip())
result.returncode, result.stdout, result.stderr)
return results
class PipeReader:
......
......@@ -29,7 +29,8 @@ logger = logging.getLogger(__name__)
@integration_test
class TestCep4Utils(unittest.TestCase):
def setUpClass(self):
@classmethod
def setUpClass(cls):
try:
cep4_true_cmd = wrap_command_in_cep4_head_node_ssh_call(['true'])
if call(cep4_true_cmd) == 0:
......@@ -46,27 +47,27 @@ class TestCep4Utils(unittest.TestCase):
self.assertEqual(0, call(cmd))
def test_02_get_cep4_available_cpu_nodes(self):
node_nrs = get_cep4_available_cpu_nodes()
node_nrs = get_cep4_available_nodes()
self.assertTrue(isinstance(node_nrs, list))
self.assertTrue(len(node_nrs) > 0)
def test_03_wrap_command_in_cep4_random_cpu_node_ssh_call(self):
def test_03_wrap_command_in_cep4_random_node_ssh_call(self):
'''
this test calls and tests the functionality of the following methods via
wrap_command_in_cep4_random_cpu_node_ssh_call: get_cep4_available_cpu_nodes, wrap_command_in_cep4_cpu_node_ssh_call
wrap_command_in_cep4_random_node_ssh_call: get_cep4_available_nodes, _wrap_command_in_cep4_node_ssh_call
'''
cmd = wrap_command_in_cep4_random_cpu_node_ssh_call(['true'], via_head=True)
cmd = wrap_command_in_cep4_random_node_ssh_call(['true'], via_head=True)
logger.info('executing command: %s', ' '.join(cmd))
self.assertEqual(0, call(cmd))
def test_04_wrap_command_in_cep4_available_cpu_node_with_lowest_load_ssh_call(self):
def test_04_wrap_command_in_cep4_available_node_with_lowest_load_ssh_call(self):
'''
this test calls and tests the functionality of the following methods via
wrap_command_in_cep4_random_cpu_node_ssh_call:
get_cep4_available_cpu_nodes, get_cep4_cpu_nodes_loads,
get_cep4_available_cpu_nodes_sorted_ascending_by_load, wrap_command_in_cep4_cpu_node_ssh_call
wrap_command_in_cep4_random_node_ssh_call:
get_cep4_available_nodes, get_cep4_nodes_loads,
get_cep4_available_cpu_nodes_sorted_ascending_by_load, _wrap_command_in_cep4_node_ssh_call
'''
cmd = wrap_command_in_cep4_available_cpu_node_with_lowest_load_ssh_call(['true'], via_head=True)
cmd = wrap_command_in_cep4_available_node_with_lowest_load_ssh_call(['true'], via_head=True)
logger.info('executing command: %s', ' '.join(cmd))
self.assertEqual(0, call(cmd))
......
......@@ -195,7 +195,8 @@ class MoMClient(BaseMoMClient):
mom_id = archive_id - 1000000 # stupid mom one million archive_id offset
# logger.info('%s: GetSip call: %s %s', log_prefix, self.__momURLgetSIP, data)
response = self.session.get(self.__momURLgetSIP, params={"command" : "GETSIP", "id" : mom_id})
params={"command" : "GETSIP", "id" : mom_id}
response = self.session.get(self.__momURLgetSIP, params=params)
result = response.text
if 'DOCTYPE HTML PUBLIC' in result:
......
......@@ -16,11 +16,11 @@ from lofar.common.lcu_utils import translate_user_station_string_into_station_li
def restart_tbb_recording(stations):
logging.info("Restarting TBB recording")
stations = translate_user_station_string_into_station_list(stations)
station_hostname_csv_string = ','.join(stationname2hostname(s) for s in stations)
logging.info("Restarting TBB recording on stations %s", stations)
relay = lcurun_command + [station_hostname_csv_string]
cmd = relay + [tbb_command, "--record"]
logging.info("Executing %s" % " ".join(cmd))
......@@ -35,3 +35,7 @@ def parse_args():
def main():
args = parse_args()
restart_tbb_recording(args.stations)
if __name__ == '__main__':
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
main()
\ No newline at end of file
......@@ -17,7 +17,7 @@
# $Id$
lofar_add_bin_scripts(qa_service qa_webservice)
lofar_add_bin_scripts(qa_service qa_webservice qa_enqueue_task)
# supervisord config files
install(FILES
......
#!/usr/bin/env python3
# Copyright (C) 2012-2015 ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
import logging
logger = logging.getLogger(__name__)
from lofar.messaging import ToBus, EventMessage, DEFAULT_BROKER, adaptNameToEnvironment
from optparse import OptionParser, OptionGroup
from datetime import datetime
if __name__ == '__main__':
# Check the invocation arguments
parser = OptionParser("%prog <otdb_id>", description='enqueue a conversion from MS to hdf5, cluster the baselines, and create the default plots in the qa_service.')
group = OptionGroup(parser, 'Messaging options')
group.add_option('-b', '--broker', dest='broker', type='string', default=DEFAULT_BROKER,
help='Address of the messaging broker, default: %default')
group.add_option('-q', "--queue", dest="queue", type="string",
default=adaptNameToEnvironment('lofar.queue.for.qa_service.QAService.on.OTDB.notification.TaskStatus'),
help="queue where the QAService listens for the notification messages. [default: %default]")
group.add_option('-p', '--priority', dest="priority", type="int", default=4,
help="priority of the enqueued task. (low=0, normal=4, high=9) [default: %default]")
parser.add_option_group(group)
(options, args) = parser.parse_args()
if len(args) != 1:
parser.print_help()
exit(-1)
otdb_id = int(args[0])
# configure logging
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
with ToBus(exchange='', broker=options.broker) as tobus:
for status in ['completing', 'finished']:
content = {"treeID": otdb_id, "state": status, "time_of_change": datetime.utcnow() }
msg = EventMessage(subject=options.queue, content=content, priority=options.priority)
logging.info("sending: %s", msg)
tobus.send(msg)
......@@ -6,4 +6,3 @@ stopasgroup=true ; bash does not propagate signals
stdout_logfile=%(program_name)s.log
redirect_stderr=true
stderr_logfile=NONE
stdout_logfile_maxbytes=0
......@@ -18,11 +18,13 @@
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
from lofar.common.cep4_utils import *
from lofar.common import isProductionEnvironment, isTestEnvironment
from subprocess import call
import socket
import logging
import signal
from lofar.common import isProductionEnvironment, isTestEnvironment
logger = logging.getLogger(__name__)
def kill_zombies():
......
......@@ -8,4 +8,3 @@ stopasgroup=true ; bash does not propagate signals
stdout_logfile=%(program_name)s.log
redirect_stderr=true
stderr_logfile=NONE
stdout_logfile_maxbytes=0
......@@ -51,6 +51,8 @@ class QAEventMessageHandler(AbstractMessageHandler):
if stripped_subject == 'ConvertedMS2Hdf5':
self.onConvertedMS2Hdf5(msg.content)
elif stripped_subject == 'ConvertedBF2Hdf5':
self.onConvertedBF2Hdf5(msg.content)
elif stripped_subject == 'CreatedInspectionPlots':
self.onCreatedInspectionPlots(msg.content)
elif stripped_subject == 'Clustered':
......@@ -65,6 +67,9 @@ class QAEventMessageHandler(AbstractMessageHandler):
def onConvertedMS2Hdf5(self, msg_content):
logger.info("%s.onConvertedMS2Hdf5(%s)", self.__class__.__name__, single_line_with_single_spaces(msg_content))
def onConvertedBF2Hdf5(self, msg_content):
logger.info("%s.onConvertedBF2Hdf5(%s)", self.__class__.__name__, single_line_with_single_spaces(msg_content))
def onClustered(self, msg_content):
logger.info("%s.onClustered(%s)", self.__class__.__name__, single_line_with_single_spaces(msg_content))
......
This diff is collapsed.
This diff is collapsed.
......@@ -58,7 +58,7 @@ namespace LOFAR
static void pwrite(int fd, const void *ptr, size_t size, off64_t offset)
{
LOG_INFO_STR("TBB pwrite fd=" << fd << \
LOG_DEBUG_STR("TBB pwrite fd=" << fd << \
" ptr=" << ptr << \
" size=" << size << \
" offset=" << offset);
......@@ -657,7 +657,7 @@ namespace LOFAR
+ " crc32: " + boost::lexical_cast<string>(crc32));
}
LOG_INFO_STR("TBB: offset in samples calculated for the HDF5 data set "
LOG_DEBUG_STR("TBB: offset in samples calculated for the HDF5 data set "
"= "
<< offset
<< " for sub-band #"
......
......@@ -56,7 +56,7 @@ namespace LOFAR
{
//TODO: implement like https://github.com/liamconnor/tbb-tools/blob/master/read_tbb_data.py#L157
//for now, just return 0 to test the signal path
LOG_INFO("TBB_Header::getFirstBandSelNr returning band=0 for testing");
LOG_DEBUG("TBB_Header::getFirstBandSelNr returning band=0 for testing");
return 0;
// 64 bit scans would be ~8x faster, but require fixes for band order (and endian)
......
......@@ -83,7 +83,7 @@ namespace LOFAR
void TBB_Station::processPayload(const TBB_Frame& frame)
{
LOG_INFO_STR("TBB_Station::processPayload for station "
LOG_DEBUG_STR("TBB_Station::processPayload for station "
<< static_cast< uint32_t >(frame.header.stationID));
// Guard against bogus incoming rsp/rcu IDs with at().
......@@ -91,7 +91,7 @@ namespace LOFAR
// Each dipole stream is sent to a single port (thread), so no need to grab a mutex here to avoid double init.
if (!dipole.isInitialized()) {
LOG_INFO_STR("TBB_Station::processPayload for station "
LOG_DEBUG_STR("TBB_Station::processPayload for station "
<< static_cast< uint32_t >(frame.header.stationID)
<< " dipole init");
// Do pass a ref to the h5 mutex for when writing into the HDF5 file.
......@@ -103,7 +103,7 @@ namespace LOFAR
if (doTransient()) {
dipole.processTransientFrameData(frame);
} else { // spectral mode
LOG_INFO_STR("TBB_Station::processPayload in spectral mode for station "
LOG_DEBUG_STR("TBB_Station::processPayload in spectral mode for station "
<< static_cast< uint32_t >(frame.header.stationID));
dipole.processSpectralFrameData(frame);
......
......@@ -246,7 +246,7 @@ namespace LOFAR
LOG_WARN(itsLogPrefix + exc.text());
return;
}
LOG_INFO(itsLogPrefix + "reading incoming data from " + itsInputStreamName);
LOG_DEBUG(itsLogPrefix + "reading incoming data from " + itsInputStreamName);
const unsigned expectedFrameSize = doTransient() ? TBB_Frame::transientFrameSize : TBB_Frame::spectralFrameSize;
while (true) {
......@@ -313,7 +313,7 @@ namespace LOFAR
break;
}
LOG_INFO_STR("mainOutputLoop: we have a frame: " << frame->header.to_string());
LOG_DEBUG_STR("mainOutputLoop: we have a frame: " << frame->header.to_string());
#ifdef TBB_PRINT_QUEUE_LEN
LOG_INFO(itsLogPrefix + "recvqsz=" + boost::lexical_cast<string>(itsReceiveQueue.size()));
......
......@@ -110,7 +110,7 @@ namespace LOFAR
ScopedLock sl(itsStationsMutex); // protect against insert below
map<unsigned, SmartPtr<TBB_Station> >::iterator stIt(itsStations.find(header.stationID));
if (stIt != itsStations.end()) {
LOG_INFO_STR("TBB_Writer::getStation: found known station for id "
LOG_DEBUG_STR("TBB_Writer::getStation: found known station for id "
<< static_cast< uint32_t >(header.stationID));
return stIt->second.get(); // common case
......@@ -130,7 +130,7 @@ namespace LOFAR
itsAllSubbandCentralFreqs, stMetaData, h5Filename, subbandSize);
}
LOG_INFO_STR("TBB_Writer::getStation: returning new station for id "
LOG_DEBUG_STR("TBB_Writer::getStation: returning new station for id "
<< static_cast< uint32_t >(header.stationID));
return itsStations.insert(make_pair(header.stationID, station)).first->second.get();
}
......
......@@ -241,8 +241,10 @@ class CleanupHandler(ServiceMessageHandler):
claims = radbrpc.getResourceClaims(task_ids=task['id'], resource_type='storage')
cep4_storage_claim_ids = [c['id'] for c in claims if c['resource_id'] == cep4_storage_resource['id']]
for claim_id in cep4_storage_claim_ids:
logger.info("setting endtime for claim %s on resource %s %s to now", claim_id, cep4_storage_resource['id'], cep4_storage_resource['name'])
radbrpc.updateResourceClaim(claim_id, endtime=datetime.utcnow())
logger.info("ending the storage claim %s on resource %s %s for task radb_id=%s otdb_id=%s",
claim_id, cep4_storage_resource['id'], cep4_storage_resource['name'],
task['id'], task['otdb_id'])
radbrpc.deleteResourceClaim(claim_id)
except Exception as e:
logger.error(str(e))
......
......@@ -2898,7 +2898,105 @@ class ResourceAssignmentDatabaseTest(RADBCommonTestMixin, unittest.TestCase):
self.assertEqual(capacities[resource_id], resource_with_claimable_capacity['claimable_capacity'])
def test_bugfix_SW_833_removed_finished_claimes_and_usages(self):
'''
See: https://support.astron.nl/jira/browse/SW-833
'''
now = datetime.utcnow()
now -= timedelta(minutes=now.minute, seconds=now.second, microseconds=now.microsecond) # round to full hour
# we should start with a clean usage table
usages = self.radb.executeQuery("SELECT * from resource_allocation.resource_usage;", fetch=FETCH_ALL)
usage_deltas = self.radb.executeQuery("SELECT * from resource_allocation.resource_usage_delta;", fetch=FETCH_ALL)
self.assertEqual(0, len(usages))
self.assertEqual(0, len(usage_deltas))
task_id = self.radb.insertOrUpdateSpecificationAndTask(mom_id=0, otdb_id=0, task_status='approved', task_type='observation',
starttime=now+timedelta(hours=1), endtime=now+timedelta(hours=2),
content="", cluster="CEP4")['task_id']
task = self.radb.getTask(task_id)
self.radb.insertResourceClaim(resource_id=117, task_id=task_id,
starttime=task['starttime'], endtime=task['endtime'], claim_size=100,
username="", user_id=0)
self.radb.insertResourceClaim(resource_id=118, task_id=task_id,
starttime=task['starttime'], endtime=task['endtime']+timedelta(days=1), claim_size=100,
username="", user_id=0)
claims = self.radb.getResourceClaims(task_ids=task_id)
self.assertEqual(2, len(claims))
for claim in claims:
self.assertEqual('tentative', claim['status'])
self.assertEqual(0, self.radb.get_resource_usage_at_or_before(claim['resource_id'], claim['starttime'] - timedelta(minutes=10), claim['status'])['usage'])
self.assertEqual(100, self.radb.get_resource_usage_at_or_before(claim['resource_id'], claim['starttime'], claim['status'])['usage'])
self.assertEqual(100, self.radb.get_resource_usage_at_or_before(claim['resource_id'], claim['endtime'] - timedelta(seconds=1), claim['status'])['usage'])
self.assertEqual(0, self.radb.get_resource_usage_at_or_before(claim['resource_id'], claim['endtime'] + timedelta(minutes=10), claim['status'])['usage'])
# resource usages tables should be filled
usages = self.radb.executeQuery("SELECT * from resource_allocation.resource_usage;", fetch=FETCH_ALL)
usage_deltas = self.radb.executeQuery("SELECT * from resource_allocation.resource_usage_delta;", fetch=FETCH_ALL)
self.assertGreater(len(usages), 0)
self.assertGreater(len(usage_deltas), 0)
self.radb.updateTaskAndResourceClaims(task_id, claim_status='claimed', task_status='prescheduled')
self.radb.updateTaskAndResourceClaims(task_id, task_status='scheduled')
# check claims
# should be claimed and 'using' the resources in the claim's time windows
claims = self.radb.getResourceClaims(task_ids=task_id)
self.assertEqual(2, len(claims))
for claim in claims:
self.assertEqual('claimed', claim['status'])
self.assertEqual(0, self.radb.get_resource_usage_at_or_before(claim['resource_id'], claim['starttime'] - timedelta(minutes=10), claim['status'])['usage'])
self.assertEqual(100, self.radb.get_resource_usage_at_or_before(claim['resource_id'], claim['starttime'], claim['status'])['usage'])
self.assertEqual(100, self.radb.get_resource_usage_at_or_before(claim['resource_id'], claim['endtime'] - timedelta(seconds=1), claim['status'])['usage'])
self.assertEqual(0, self.radb.get_resource_usage_at_or_before(claim['resource_id'], claim['endtime'] + timedelta(minutes=10), claim['status'])['usage'])
usages = self.radb.getResourceUsages()
# let the task 'run'...
self.radb.updateTaskAndResourceClaims(task_id, task_status='queued')
self.radb.updateTaskAndResourceClaims(task_id, task_status='active')
self.radb.updateTaskAndResourceClaims(task_id, task_status='completing')
# check claims
# should still be claimed and 'using' the resources
claims = self.radb.getResourceClaims(task_ids=task_id)
self.assertEqual(2, len(claims))
for claim in claims:
self.assertEqual('claimed', claim['status'])
self.assertEqual(0, self.radb.get_resource_usage_at_or_before(claim['resource_id'], claim['starttime'] - timedelta(minutes=10), claim['status'])['usage'])
self.assertEqual(100, self.radb.get_resource_usage_at_or_before(claim['resource_id'], claim['starttime'], claim['status'])['usage'])
self.assertEqual(100, self.radb.get_resource_usage_at_or_before(claim['resource_id'], claim['endtime'] - timedelta(seconds=1), claim['status'])['usage'])
self.assertEqual(0, self.radb.get_resource_usage_at_or_before(claim['resource_id'], claim['endtime'] + timedelta(minutes=10), claim['status'])['usage'])
# allow the task to finish...
# this should result in a trigger removing finished claims as well
self.radb.updateTaskAndResourceClaims(task_id, task_status='finished')
# check claims
# one claim should be gone, the other ends in the future and should still be there
claims = self.radb.getResourceClaims(task_ids=task_id)
self.assertEqual(1, len(claims))
claim = claims[0]
self.assertEqual('claimed', claim['status'])
self.assertGreater(claim['endtime'], task['endtime'])
self.assertEqual(0, self.radb.get_resource_usage_at_or_before(claim['resource_id'], claim['starttime'] - timedelta(minutes=10), claim['status'])['usage'])
self.assertEqual(100, self.radb.get_resource_usage_at_or_before(claim['resource_id'], claim['starttime'], claim['status'])['usage'])
self.assertEqual(100, self.radb.get_resource_usage_at_or_before(claim['resource_id'], claim['endtime'] - timedelta(seconds=1), claim['status'])['usage'])
self.assertEqual(0, self.radb.get_resource_usage_at_or_before(claim['resource_id'], claim['endtime'] + timedelta(minutes=10), claim['status'])['usage'])
# later, after data was ingested and cleaned-up, the storage-claim is deleted
# this should result in a trigger removing the resource_usages as well
self.radb.deleteResourceClaim(claim['id'])
claims = self.radb.getResourceClaims(task_ids=task_id)
self.assertEqual(0, len(claims))
# in the end we should have a clean usage table again
usages = self.radb.executeQuery("SELECT * from resource_allocation.resource_usage;", fetch=FETCH_ALL)
usage_deltas = self.radb.executeQuery("SELECT * from resource_allocation.resource_usage_delta;", fetch=FETCH_ALL)
self.assertEqual(0, len(usages))
self.assertEqual(0, len(usage_deltas))
os.environ['TZ'] = 'UTC'
logging.basicConfig(format='%(asctime)s %(levelname)s %(process)s %(threadName)s %(message)s', level=logging.DEBUG)
......
......@@ -479,6 +479,9 @@ def putTask(task_id):
# ...then, handle status update which might trigger resource assignment,
# for which the above updated times are needed
if 'status' in updatedTask:
if isProductionEnvironment() and task['type'] == 'observation' and updatedTask['status'] == 'prescheduled':
abort(403, 'Scheduling of observations via the webscheduler by users is not (yet) allowed')
try:
#update status in otdb only
#the status change will propagate automatically into radb via other services (by design)
......