diff --git a/CMake/LofarMacros.cmake b/CMake/LofarMacros.cmake index 7995cc094c79f6ee0607e7df796869b1224f2767..9b6fd9db69b2fe8ef90d36ab54312855e77d2d7d 100644 --- a/CMake/LofarMacros.cmake +++ b/CMake/LofarMacros.cmake @@ -178,22 +178,40 @@ if(NOT DEFINED LOFAR_MACROS_INCLUDED) # -------------------------------------------------------------------------- - # lofar_add_sysconf_files([name1 [name2 ..]]) + # lofar_add_sysconf_files([name1 [name2 ..]] [DESTINATION subdir]) # # Add system configuration files (read-only single machine data) that need # to be installed into the <prefix>/etc directory. Also create a symbolic # link in <build-dir>/etc to each of these files. The file names may contain # a relative(!) path. + # + # The mentioned files are installed in the same relative path as provided, + # that is: + # lofar_add_sysconf_files(foo/bar) + # installs "etc/foo/bar". To override this behaviour use: + # lofar_add_sysconf_files(foo/bar DESTINATION .) + # installs "etc/bar". # -------------------------------------------------------------------------- macro(lofar_add_sysconf_files) - foreach(_name ${ARGN}) - get_filename_component(_path ${_name} PATH) - get_filename_component(_abs_name ${_name} ABSOLUTE) - file(MAKE_DIRECTORY ${CMAKE_BINARY_DIR}/etc/${_path}) + string(REGEX REPLACE ";?DESTINATION.*" "" _src_names "${ARGN}") + string(REGEX MATCH "DESTINATION;.*" _destination "${ARGN}") + string(REGEX REPLACE "^DESTINATION;" "" _destination "${_destination}") + foreach(_src_name ${_src_names}) + if(_destination MATCHES ".+") + get_filename_component(_src_filename ${_src_name} NAME) + set(_dest_name ${_destination}/${_src_filename}) + else(_destination MATCHES ".+") + set(_dest_name ${_src_name}) + endif(_destination MATCHES ".+") + + get_filename_component(_abs_name ${_src_name} ABSOLUTE) + get_filename_component(_dest_path ${_dest_name} PATH) + + file(MAKE_DIRECTORY ${CMAKE_BINARY_DIR}/etc/${_dest_path}) execute_process(COMMAND ${CMAKE_COMMAND} -E create_symlink - ${_abs_name} ${CMAKE_BINARY_DIR}/etc/${_name}) - install(FILES ${_name} DESTINATION etc/${_path}) - endforeach(_name ${ARGN}) + ${_abs_name} ${CMAKE_BINARY_DIR}/etc/${_dest_name}) + install(FILES ${_src_name} DESTINATION etc/${_dest_path}) + endforeach(_src_name ${_src_names}) endmacro(lofar_add_sysconf_files) diff --git a/LCU/PPSTune/CMakeLists.txt b/LCU/PPSTune/CMakeLists.txt index e70eac3a4392d43a2c914667d75ff52bdc246b86..0337b6dccebed187584b034908b3db8d5f158464 100644 --- a/LCU/PPSTune/CMakeLists.txt +++ b/LCU/PPSTune/CMakeLists.txt @@ -5,9 +5,12 @@ lofar_package(PPSTune 1.0) add_subdirectory(ppstune) # Install files matching regex pattern in current directory and below +# Omit directories test and testdata from install + install(DIRECTORY . DESTINATION sbin USE_SOURCE_PERMISSIONS FILES_MATCHING REGEX "\\.(py|sh)$" - PATTERN ".svn" EXCLUDE) + PATTERN ".svn" EXCLUDE + PATTERN "test*" EXCLUDE) diff --git a/LTA/LTAIngest/LTAIngestClient/lib/ingestbuslistener.py b/LTA/LTAIngest/LTAIngestClient/lib/ingestbuslistener.py index a5244e59071d08451656168ca89a0d2305ed8af4..706c025c762c7b0d2096367920b6ded4105b99fa 100644 --- a/LTA/LTAIngest/LTAIngestClient/lib/ingestbuslistener.py +++ b/LTA/LTAIngest/LTAIngestClient/lib/ingestbuslistener.py @@ -114,14 +114,19 @@ class IngestBusListener(AbstractBusListener): :param task_dict: dictionary with the finished task''' pass - def _logJobNotification(self, status, job_dict, level=logging.INFO): - msg = 'job %s: %s export_id: %s type: %s server: %s'% (status, - job_dict.get('project'), - job_dict.get('export_id'), - job_dict.get('type'), - job_dict.get('ingest_server')) try: + msg = 'job ' + + if status == 'progress': + msg += 'is transferring. ' + else: + msg += 'status changed to %s. ' % (status,) + + msg += 'project: %s export_id: %s type: %s server: %s'% (job_dict.get('project'), + job_dict.get('export_id'), + job_dict.get('type'), + job_dict.get('ingest_server')) if job_dict.get('archive_id'): msg += ' archive_id: %s' % job_dict.get('archive_id') @@ -142,11 +147,11 @@ class IngestBusListener(AbstractBusListener): if job_dict.get('message'): msg += ' message: %s' % job_dict.get('message') + + logger.log(level, msg) except Exception as e: logger.error(e) - logger.log(level, msg) - class JobsMonitor(IngestBusListener): def __init__(self, busname=DEFAULT_INGEST_NOTIFICATION_BUSNAME, subjects=DEFAULT_INGEST_NOTIFICATION_SUBJECTS, broker=DEFAULT_BROKER, listen_for_all_jobs=True, **kwargs): diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py index 1e1e831971da8f9df34dd096d064674da6811257..21882c173043e9dc0b4dfce6252bde515fa038e1 100644 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py @@ -31,6 +31,9 @@ from lofar.messaging import CommandMessage, EventMessage, FromBus, ToBus from lofar.messaging import Service from lofar.messaging.Service import MessageHandlerInterface from lofar.common.util import convertIntKeysToString +from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC +from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME +from lofar.common import isProductionEnvironment import os import os.path @@ -65,7 +68,10 @@ class IngestJobManager: jobs_for_transfer_queue_name=DEFAULT_INGEST_JOBS_FOR_TRANSER_QUEUENAME, jobs_dir=JOBS_DIR, max_num_retries=MAX_NR_OF_RETRIES, + mom_busname=DEFAULT_MOMQUERY_BUSNAME, + mom_servicename=DEFAULT_MOMQUERY_SERVICENAME, broker=None, + mom_broker=None, **kwargs): self.__notification_listener = IngestBusListener(busname=notification_listen_queue_name, subjects=notification_prefix+'*', broker=broker) @@ -83,6 +89,9 @@ class IngestJobManager: self.notification_prefix = notification_prefix self.event_bus = ToBus(notification_busname, broker=broker) + if not mom_broker: + mom_broker = broker + self.__momrpc = MoMQueryRPC(servicename=mom_servicename, busname=mom_busname, broker=mom_broker, timeout=180) self.service = Service(servicename, IngestServiceMessageHandler, @@ -1004,6 +1013,23 @@ Total Files: %(total)i submitters = [j['Submitter'] for j in done_group_jobs if 'Submitter' in j] extra_mail_addresses = [j['email'] for j in done_group_jobs if 'email' in j] + try: + if len(unfinished_group_jads) == 0: + # only for successful ingests, + # try to get the PI's email address for this export's projects + # and add these to the extra_mail_addresses + done_group_mom_jobs = [job for job in done_group_jobs if job.get('type','').lower() == 'mom'] + mom_export_ids = set([job['JobId'].split('_')[1] for job in done_group_mom_jobs if 'JobId' in job]) + project_mom2ids = set([self.__momrpc.getObjectDetails(mom_export_id).get('project_mom2id') for mom_export_id in mom_export_ids]) + project_mom2ids = [x for x in project_mom2ids if x is not None] + + for project_mom2id in project_mom2ids: + project_details = self.__momrpc.get_project_details(project_mom2id) + if project_details and 'pi_email' in project_details: + extra_mail_addresses.append(project_details['pi_email']) + except Exception as e: + logger.error('error while trying to get PI\'s email address for %s: %s', job_group_id, e) + #submitters might contain comma seperated strings #join all sumbitterstrings in one long csv string, split it, and get the unique submitters submitters = list(set([s.strip() for s in ','.join(submitters).split(',') if '@' in s])) @@ -1081,6 +1107,15 @@ def main(): type='string', default=DEFAULT_INGEST_JOBS_FOR_TRANSER_QUEUENAME, help='name of the managed job queue (which jobs are handled by the ingestservices), default: %default') + parser.add_option('--mom_query_busname', dest='mom_query_busname', type='string', + default=DEFAULT_MOMQUERY_BUSNAME, + help='Name of the bus exchange on the qpid broker on which the momqueryservice listens, default: %default') + parser.add_option('--mom_query_servicename', dest='mom_query_servicename', type='string', + default=DEFAULT_MOMQUERY_SERVICENAME, + help='Name of the momqueryservice, default: %default') + parser.add_option('--mom_query_service_broker', dest='mom_query_service_broker', type='string', + default='scu001.control.lofar' if isProductionEnvironment() else 'scu199.control.lofar', + help='Address of the qpid broker where the mom query service runs, default: %default') (options, args) = parser.parse_args() setQpidLogLevel(logging.INFO) @@ -1100,7 +1135,10 @@ def main(): jobs_for_transfer_queue_name=options.jobs_for_transfer_queue_name, jobs_dir=options.jobs_dir, max_num_retries=options.max_num_retries, - broker=options.broker) + mom_busname=options.mom_query_busname, + mom_servicename=options.mom_query_servicename, + broker=options.broker, + mom_broker=options.mom_query_service_broker) manager.run() if __name__ == '__main__': diff --git a/MAC/Deployment/data/StaticMetaData/CMakeLists.txt b/MAC/Deployment/data/StaticMetaData/CMakeLists.txt index aa733c417f2e49ae031c393e2415ac3d22729706..71266e4b37742f126b5199dc2a608deb0495e90d 100644 --- a/MAC/Deployment/data/StaticMetaData/CMakeLists.txt +++ b/MAC/Deployment/data/StaticMetaData/CMakeLists.txt @@ -16,13 +16,13 @@ lofar_add_sysconf_files( # relative dir as in the source, hence we've put a symlink # StaticMetaData -> . in this directory. file(GLOB staticmeta_data RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" - StaticMetaData/*.tmpl - StaticMetaData/*.test - StaticMetaData/*.dat - StaticMetaData/AntennaArrays/*.conf* - StaticMetaData/AntennaPos/*.conf* - StaticMetaData/CableDelays/*.conf* - StaticMetaData/iHBADeltas/*.conf* - StaticMetaData/AntennaFields/*.conf* - StaticMetaData/Attenuation/*.conf*) -lofar_add_sysconf_files(${staticmeta_data}) + *.tmpl + *.test + *.dat + AntennaArrays/*.conf* + AntennaPos/*.conf* + CableDelays/*.conf* + iHBADeltas/*.conf* + AntennaFields/*.conf* + Attenuation/*.conf*) +lofar_add_sysconf_files(${staticmeta_data} DESTINATION StaticMetaData) diff --git a/MAC/Services/src/PipelineControl.py b/MAC/Services/src/PipelineControl.py index 19edb2ca30d6eb249446850825f7bc211e2e222d..64a5e9877ed8978cd95ccae4fd7fd7caa5a41640 100755 --- a/MAC/Services/src/PipelineControl.py +++ b/MAC/Services/src/PipelineControl.py @@ -72,6 +72,7 @@ from lofar.sas.otdb.OTDBBusListener import OTDBBusListener from lofar.sas.otdb.config import DEFAULT_OTDB_NOTIFICATION_BUSNAME, DEFAULT_OTDB_SERVICE_BUSNAME from lofar.sas.otdb.otdbrpc import OTDBRPC from lofar.common.util import waitForInterrupt +from lofar.common import isProductionEnvironment from lofar.messaging.RPC import RPCTimeoutException, RPCException from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as DEFAULT_RAS_SERVICE_BUSNAME @@ -173,18 +174,24 @@ class Parset(dict): @staticmethod def defaultDockerImage(): - return "lofar-pipeline:latest" + return "lofar-pipeline" @staticmethod def defaultDockerTag(): - return "latest" + if isProductionEnvironment(): + # "latest" refers to the current /production/ image + return "latest" + else: + # test/dev environments want to use their specific version, since they + # share images with the production environment + return runCommand("docker-template", "${LOFAR_TAG}") def dockerImage(self): # Return the version set in the parset, and fall back to our own version. image = self[PARSET_PREFIX + "Observation.ObservationControl.PythonControl.softwareVersion"] if not image: - return self.defaultDockerImage() + image = self.defaultDockerImage() if ":" in image: return image diff --git a/RTCP/Cobalt/GPUProc/etc/parset-additions.d/default/StationCalibration.parset b/RTCP/Cobalt/GPUProc/etc/parset-additions.d/default/StationCalibration.parset index 7bca4a649a4d4e2a30f8716dce228070164d494e..8fe274086055b3bbf4f5112380027872f0339bff 100644 --- a/RTCP/Cobalt/GPUProc/etc/parset-additions.d/default/StationCalibration.parset +++ b/RTCP/Cobalt/GPUProc/etc/parset-additions.d/default/StationCalibration.parset @@ -4072,96 +4072,96 @@ PIC.Core.RS509HBA.HBA_DUAL_INNER.HBA_210_250.delay.X = 0.000000e+00 PIC.Core.RS509HBA.HBA_DUAL_INNER.HBA_210_250.delay.Y = 0.000000e+00 PIC.Core.DE601LBA.LBA_INNER.LBA_10_70.phase0.X = 0.000000e+00 PIC.Core.DE601LBA.LBA_INNER.LBA_10_70.phase0.Y = 0.000000e+00 -PIC.Core.DE601LBA.LBA_INNER.LBA_10_70.delay.X = 1.100000e-06 -PIC.Core.DE601LBA.LBA_INNER.LBA_10_70.delay.Y = 1.100000e-06 +PIC.Core.DE601LBA.LBA_INNER.LBA_10_70.delay.X = 0.000000e+00 +PIC.Core.DE601LBA.LBA_INNER.LBA_10_70.delay.Y = 0.000000e+00 PIC.Core.DE601LBA.LBA_INNER.LBA_30_70.phase0.X = 0.000000e+00 PIC.Core.DE601LBA.LBA_INNER.LBA_30_70.phase0.Y = 0.000000e+00 -PIC.Core.DE601LBA.LBA_INNER.LBA_30_70.delay.X = 1.100000e-06 -PIC.Core.DE601LBA.LBA_INNER.LBA_30_70.delay.Y = 1.100000e-06 +PIC.Core.DE601LBA.LBA_INNER.LBA_30_70.delay.X = 0.000000e+00 +PIC.Core.DE601LBA.LBA_INNER.LBA_30_70.delay.Y = 0.000000e+00 PIC.Core.DE601LBA.LBA_INNER.LBA_10_90.phase0.X = 0.000000e+00 PIC.Core.DE601LBA.LBA_INNER.LBA_10_90.phase0.Y = 0.000000e+00 -PIC.Core.DE601LBA.LBA_INNER.LBA_10_90.delay.X = 1.100000e-06 -PIC.Core.DE601LBA.LBA_INNER.LBA_10_90.delay.Y = 1.100000e-06 +PIC.Core.DE601LBA.LBA_INNER.LBA_10_90.delay.X = 0.000000e+00 +PIC.Core.DE601LBA.LBA_INNER.LBA_10_90.delay.Y = 0.000000e+00 PIC.Core.DE601LBA.LBA_INNER.LBA_30_90.phase0.X = 0.000000e+00 PIC.Core.DE601LBA.LBA_INNER.LBA_30_90.phase0.Y = 0.000000e+00 -PIC.Core.DE601LBA.LBA_INNER.LBA_30_90.delay.X = 1.100000e-06 -PIC.Core.DE601LBA.LBA_INNER.LBA_30_90.delay.Y = 1.100000e-06 +PIC.Core.DE601LBA.LBA_INNER.LBA_30_90.delay.X = 0.000000e+00 +PIC.Core.DE601LBA.LBA_INNER.LBA_30_90.delay.Y = 0.000000e+00 PIC.Core.DE601LBA.LBA_OUTER.LBA_10_70.phase0.X = 0.000000e+00 PIC.Core.DE601LBA.LBA_OUTER.LBA_10_70.phase0.Y = 0.000000e+00 -PIC.Core.DE601LBA.LBA_OUTER.LBA_10_70.delay.X = 1.100000e-06 -PIC.Core.DE601LBA.LBA_OUTER.LBA_10_70.delay.Y = 1.100000e-06 +PIC.Core.DE601LBA.LBA_OUTER.LBA_10_70.delay.X = 0.000000e+00 +PIC.Core.DE601LBA.LBA_OUTER.LBA_10_70.delay.Y = 0.000000e+00 PIC.Core.DE601LBA.LBA_OUTER.LBA_30_70.phase0.X = 0.000000e+00 PIC.Core.DE601LBA.LBA_OUTER.LBA_30_70.phase0.Y = 0.000000e+00 -PIC.Core.DE601LBA.LBA_OUTER.LBA_30_70.delay.X = 1.100000e-06 -PIC.Core.DE601LBA.LBA_OUTER.LBA_30_70.delay.Y = 1.100000e-06 +PIC.Core.DE601LBA.LBA_OUTER.LBA_30_70.delay.X = 0.000000e+00 +PIC.Core.DE601LBA.LBA_OUTER.LBA_30_70.delay.Y = 0.000000e+00 PIC.Core.DE601LBA.LBA_OUTER.LBA_10_90.phase0.X = 0.000000e+00 PIC.Core.DE601LBA.LBA_OUTER.LBA_10_90.phase0.Y = 0.000000e+00 -PIC.Core.DE601LBA.LBA_OUTER.LBA_10_90.delay.X = 1.100000e-06 -PIC.Core.DE601LBA.LBA_OUTER.LBA_10_90.delay.Y = 1.100000e-06 +PIC.Core.DE601LBA.LBA_OUTER.LBA_10_90.delay.X = 0.000000e+00 +PIC.Core.DE601LBA.LBA_OUTER.LBA_10_90.delay.Y = 0.000000e+00 PIC.Core.DE601LBA.LBA_OUTER.LBA_30_90.phase0.X = 0.000000e+00 PIC.Core.DE601LBA.LBA_OUTER.LBA_30_90.phase0.Y = 0.000000e+00 -PIC.Core.DE601LBA.LBA_OUTER.LBA_30_90.delay.X = 1.100000e-06 -PIC.Core.DE601LBA.LBA_OUTER.LBA_30_90.delay.Y = 1.100000e-06 +PIC.Core.DE601LBA.LBA_OUTER.LBA_30_90.delay.X = 0.000000e+00 +PIC.Core.DE601LBA.LBA_OUTER.LBA_30_90.delay.Y = 0.000000e+00 PIC.Core.DE601HBA.HBA_JOINED.HBA_110_190.phase0.X = 0.000000e+00 PIC.Core.DE601HBA.HBA_JOINED.HBA_110_190.phase0.Y = 0.000000e+00 -PIC.Core.DE601HBA.HBA_JOINED.HBA_110_190.delay.X = 1.100000e-06 -PIC.Core.DE601HBA.HBA_JOINED.HBA_110_190.delay.Y = 1.100000e-06 +PIC.Core.DE601HBA.HBA_JOINED.HBA_110_190.delay.X = 0.000000e+00 +PIC.Core.DE601HBA.HBA_JOINED.HBA_110_190.delay.Y = 0.000000e+00 PIC.Core.DE601HBA.HBA_JOINED.HBA_170_230.phase0.X = 0.000000e+00 PIC.Core.DE601HBA.HBA_JOINED.HBA_170_230.phase0.Y = 0.000000e+00 -PIC.Core.DE601HBA.HBA_JOINED.HBA_170_230.delay.X = 1.100000e-06 -PIC.Core.DE601HBA.HBA_JOINED.HBA_170_230.delay.Y = 1.100000e-06 +PIC.Core.DE601HBA.HBA_JOINED.HBA_170_230.delay.X = 0.000000e+00 +PIC.Core.DE601HBA.HBA_JOINED.HBA_170_230.delay.Y = 0.000000e+00 PIC.Core.DE601HBA.HBA_JOINED.HBA_210_250.phase0.X = 0.000000e+00 PIC.Core.DE601HBA.HBA_JOINED.HBA_210_250.phase0.Y = 0.000000e+00 -PIC.Core.DE601HBA.HBA_JOINED.HBA_210_250.delay.X = 1.100000e-06 -PIC.Core.DE601HBA.HBA_JOINED.HBA_210_250.delay.Y = 1.100000e-06 +PIC.Core.DE601HBA.HBA_JOINED.HBA_210_250.delay.X = 0.000000e+00 +PIC.Core.DE601HBA.HBA_JOINED.HBA_210_250.delay.Y = 0.000000e+00 PIC.Core.DE601HBA.HBA_DUAL.HBA_110_190.phase0.X = 0.000000e+00 PIC.Core.DE601HBA.HBA_DUAL.HBA_110_190.phase0.Y = 0.000000e+00 -PIC.Core.DE601HBA.HBA_DUAL.HBA_110_190.delay.X = 1.100000e-06 -PIC.Core.DE601HBA.HBA_DUAL.HBA_110_190.delay.Y = 1.100000e-06 +PIC.Core.DE601HBA.HBA_DUAL.HBA_110_190.delay.X = 0.000000e+00 +PIC.Core.DE601HBA.HBA_DUAL.HBA_110_190.delay.Y = 0.000000e+00 PIC.Core.DE601HBA.HBA_DUAL.HBA_170_230.phase0.X = 0.000000e+00 PIC.Core.DE601HBA.HBA_DUAL.HBA_170_230.phase0.Y = 0.000000e+00 -PIC.Core.DE601HBA.HBA_DUAL.HBA_170_230.delay.X = 1.100000e-06 -PIC.Core.DE601HBA.HBA_DUAL.HBA_170_230.delay.Y = 1.100000e-06 +PIC.Core.DE601HBA.HBA_DUAL.HBA_170_230.delay.X = 0.000000e+00 +PIC.Core.DE601HBA.HBA_DUAL.HBA_170_230.delay.Y = 0.000000e+00 PIC.Core.DE601HBA.HBA_DUAL.HBA_210_250.phase0.X = 0.000000e+00 PIC.Core.DE601HBA.HBA_DUAL.HBA_210_250.phase0.Y = 0.000000e+00 -PIC.Core.DE601HBA.HBA_DUAL.HBA_210_250.delay.X = 1.100000e-06 -PIC.Core.DE601HBA.HBA_DUAL.HBA_210_250.delay.Y = 1.100000e-06 +PIC.Core.DE601HBA.HBA_DUAL.HBA_210_250.delay.X = 0.000000e+00 +PIC.Core.DE601HBA.HBA_DUAL.HBA_210_250.delay.Y = 0.000000e+00 PIC.Core.DE601HBA.HBA_ZERO.HBA_110_190.phase0.X = 0.000000e+00 PIC.Core.DE601HBA.HBA_ZERO.HBA_110_190.phase0.Y = 0.000000e+00 -PIC.Core.DE601HBA.HBA_ZERO.HBA_110_190.delay.X = 1.100000e-06 -PIC.Core.DE601HBA.HBA_ZERO.HBA_110_190.delay.Y = 1.100000e-06 +PIC.Core.DE601HBA.HBA_ZERO.HBA_110_190.delay.X = 0.000000e+00 +PIC.Core.DE601HBA.HBA_ZERO.HBA_110_190.delay.Y = 0.000000e+00 PIC.Core.DE601HBA.HBA_ZERO.HBA_170_230.phase0.X = 0.000000e+00 PIC.Core.DE601HBA.HBA_ZERO.HBA_170_230.phase0.Y = 0.000000e+00 -PIC.Core.DE601HBA.HBA_ZERO.HBA_170_230.delay.X = 1.100000e-06 -PIC.Core.DE601HBA.HBA_ZERO.HBA_170_230.delay.Y = 1.100000e-06 +PIC.Core.DE601HBA.HBA_ZERO.HBA_170_230.delay.X = 0.000000e+00 +PIC.Core.DE601HBA.HBA_ZERO.HBA_170_230.delay.Y = 0.000000e+00 PIC.Core.DE601HBA.HBA_ZERO.HBA_210_250.phase0.X = 0.000000e+00 PIC.Core.DE601HBA.HBA_ZERO.HBA_210_250.phase0.Y = 0.000000e+00 -PIC.Core.DE601HBA.HBA_ZERO.HBA_210_250.delay.X = 1.100000e-06 -PIC.Core.DE601HBA.HBA_ZERO.HBA_210_250.delay.Y = 1.100000e-06 +PIC.Core.DE601HBA.HBA_ZERO.HBA_210_250.delay.X = 0.000000e+00 +PIC.Core.DE601HBA.HBA_ZERO.HBA_210_250.delay.Y = 0.000000e+00 PIC.Core.DE601HBA.HBA_ONE.HBA_110_190.phase0.X = 0.000000e+00 PIC.Core.DE601HBA.HBA_ONE.HBA_110_190.phase0.Y = 0.000000e+00 -PIC.Core.DE601HBA.HBA_ONE.HBA_110_190.delay.X = 1.100000e-06 -PIC.Core.DE601HBA.HBA_ONE.HBA_110_190.delay.Y = 1.100000e-06 +PIC.Core.DE601HBA.HBA_ONE.HBA_110_190.delay.X = 0.000000e+00 +PIC.Core.DE601HBA.HBA_ONE.HBA_110_190.delay.Y = 0.000000e+00 PIC.Core.DE601HBA.HBA_ONE.HBA_170_230.phase0.X = 0.000000e+00 PIC.Core.DE601HBA.HBA_ONE.HBA_170_230.phase0.Y = 0.000000e+00 -PIC.Core.DE601HBA.HBA_ONE.HBA_170_230.delay.X = 1.100000e-06 -PIC.Core.DE601HBA.HBA_ONE.HBA_170_230.delay.Y = 1.100000e-06 +PIC.Core.DE601HBA.HBA_ONE.HBA_170_230.delay.X = 0.000000e+00 +PIC.Core.DE601HBA.HBA_ONE.HBA_170_230.delay.Y = 0.000000e+00 PIC.Core.DE601HBA.HBA_ONE.HBA_210_250.phase0.X = 0.000000e+00 PIC.Core.DE601HBA.HBA_ONE.HBA_210_250.phase0.Y = 0.000000e+00 -PIC.Core.DE601HBA.HBA_ONE.HBA_210_250.delay.X = 1.100000e-06 -PIC.Core.DE601HBA.HBA_ONE.HBA_210_250.delay.Y = 1.100000e-06 +PIC.Core.DE601HBA.HBA_ONE.HBA_210_250.delay.X = 0.000000e+00 +PIC.Core.DE601HBA.HBA_ONE.HBA_210_250.delay.Y = 0.000000e+00 PIC.Core.DE601HBA.HBA_DUAL_INNER.HBA_110_190.phase0.X = 0.000000e+00 PIC.Core.DE601HBA.HBA_DUAL_INNER.HBA_110_190.phase0.Y = 0.000000e+00 -PIC.Core.DE601HBA.HBA_DUAL_INNER.HBA_110_190.delay.X = 1.100000e-06 -PIC.Core.DE601HBA.HBA_DUAL_INNER.HBA_110_190.delay.Y = 1.100000e-06 +PIC.Core.DE601HBA.HBA_DUAL_INNER.HBA_110_190.delay.X = 0.000000e+00 +PIC.Core.DE601HBA.HBA_DUAL_INNER.HBA_110_190.delay.Y = 0.000000e+00 PIC.Core.DE601HBA.HBA_DUAL_INNER.HBA_170_230.phase0.X = 0.000000e+00 PIC.Core.DE601HBA.HBA_DUAL_INNER.HBA_170_230.phase0.Y = 0.000000e+00 -PIC.Core.DE601HBA.HBA_DUAL_INNER.HBA_170_230.delay.X = 1.100000e-06 -PIC.Core.DE601HBA.HBA_DUAL_INNER.HBA_170_230.delay.Y = 1.100000e-06 +PIC.Core.DE601HBA.HBA_DUAL_INNER.HBA_170_230.delay.X = 0.000000e+00 +PIC.Core.DE601HBA.HBA_DUAL_INNER.HBA_170_230.delay.Y = 0.000000e+00 PIC.Core.DE601HBA.HBA_DUAL_INNER.HBA_210_250.phase0.X = 0.000000e+00 PIC.Core.DE601HBA.HBA_DUAL_INNER.HBA_210_250.phase0.Y = 0.000000e+00 -PIC.Core.DE601HBA.HBA_DUAL_INNER.HBA_210_250.delay.X = 1.100000e-06 -PIC.Core.DE601HBA.HBA_DUAL_INNER.HBA_210_250.delay.Y = 1.100000e-06 +PIC.Core.DE601HBA.HBA_DUAL_INNER.HBA_210_250.delay.X = 0.000000e+00 +PIC.Core.DE601HBA.HBA_DUAL_INNER.HBA_210_250.delay.Y = 0.000000e+00 PIC.Core.DE602LBA.LBA_INNER.LBA_10_70.phase0.X = 0.000000e+00 PIC.Core.DE602LBA.LBA_INNER.LBA_10_70.phase0.Y = 0.000000e+00 PIC.Core.DE602LBA.LBA_INNER.LBA_10_70.delay.X = 0.000000e+00 diff --git a/RTCP/Cobalt/GPUProc/src/scripts/cobalt_functions.sh b/RTCP/Cobalt/GPUProc/src/scripts/cobalt_functions.sh index 5530dbd94508cc989c8dd481a4b40ba34fba35dc..cc1c9ff3a4a4c76db4ccdcff894f71f04dbc9598 100755 --- a/RTCP/Cobalt/GPUProc/src/scripts/cobalt_functions.sh +++ b/RTCP/Cobalt/GPUProc/src/scripts/cobalt_functions.sh @@ -53,10 +53,10 @@ function read_cluster_model { CEP4) HEADNODE=head01.cep4.control.lofar SLURM_PARTITION=cpu - COMPUTENODES="`ssh $HEADNODE sinfo --responding --states=idle,mixed,alloc --format=%n.cep4,%T --noheader --partition=$SLURM_PARTITION --sort=N | fgrep -v ,draining | cut -f1 -d,`" + COMPUTENODES="`ssh $HEADNODE sinfo --responding --states=idle,mixed,alloc --format=%n.cep4.infiniband.lofar,%T --noheader --partition=$SLURM_PARTITION --sort=N | fgrep -v ,draining | cut -f1 -d,`" if [ -z "$COMPUTENODES" ]; then echo "ERROR: Could not obtain list of available CEP4 nodes. Defaulting to all." - COMPUTENODES="`seq -f "cpu%02.0f.cep4" 1 50`" + COMPUTENODES="`seq -f "cpu%02.0f.cep4.infiniband.lofar" 1 50`" fi GLOBALFS_DIR=/data @@ -71,10 +71,10 @@ function read_cluster_model { DRAGNET) HEADNODE=dragnet.control.lofar SLURM_PARTITION=lofarobs # NOTE: sinfo (without -a) only displays this partition for members of the lofarsys group (+ slurm,root) - COMPUTENODES="`ssh $HEADNODE sinfo --responding --states=idle,mixed,alloc --format=%n.control.lofar,%T --noheader --partition=$SLURM_PARTITION --sort=N | fgrep -v ,draining | cut -f1 -d,`" + COMPUTENODES="`ssh $HEADNODE sinfo --responding --states=idle,mixed,alloc --format=%n-ib.dragnet.infiniband.lofar,%T --noheader --partition=$SLURM_PARTITION --sort=N | fgrep -v ,draining | cut -f1 -d,`" if [ -z "$COMPUTENODES" ]; then - echo "ERROR: Could not obtain list of available DRAGNET nodes. Defaulting to drg01 - drg20 .control.lofar" - COMPUTENODES="`seq -f "drg%02.0f.control.lofar" 1 20`" + echo "ERROR: Could not obtain list of available DRAGNET nodes. Defaulting to drg01 - drg20 -ib.dragnet.infiniband.lofar" + COMPUTENODES="`seq -f "drg%02-ib.dragnet.infiniband.lofar" 1 20`" fi #SLURM=true diff --git a/SAS/OTDB_Services/OTDBBusListener.py b/SAS/OTDB_Services/OTDBBusListener.py index bd2bf4367a468105648cb41d3da2fd9723d43d7c..99ef88fcc33bca222acd0e20b00876ca37ab9e8d 100644 --- a/SAS/OTDB_Services/OTDBBusListener.py +++ b/SAS/OTDB_Services/OTDBBusListener.py @@ -100,6 +100,14 @@ class OTDBBusListener(AbstractBusListener): else: logger.info("OTDBBusListener.handleMessage - handled unknown state: %s", msg.content['state']) + # apart from calling the above methods for known predefined states, + # also always call plain onObservationStatusChanged + # so subclasses can act on any status in this generic method. + self.onObservationStatusChanged(treeId, msg.content['state'], modificationTime) + + def onObservationStatusChanged(self, treeId, new_status, modificationTime): + pass + def onObservationDescribed(self, treeId, modificationTime): pass diff --git a/SAS/ResourceAssignment/OTDBtoRATaskStatusPropagator/propagator.py b/SAS/ResourceAssignment/OTDBtoRATaskStatusPropagator/propagator.py index 3e6b8a1264687c6d46b39593b45abcc23fde180d..8e1a4112c3dd95835d6b8f6c0f737438f91058d0 100644 --- a/SAS/ResourceAssignment/OTDBtoRATaskStatusPropagator/propagator.py +++ b/SAS/ResourceAssignment/OTDBtoRATaskStatusPropagator/propagator.py @@ -18,6 +18,8 @@ from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC logger = logging.getLogger(__name__) +STORAGE_CLAIM_EXTENSION=timedelta(days=365) + class OTDBtoRATaskStatusPropagator(OTDBBusListener): def __init__(self, otdb_notification_busname=DEFAULT_OTDB_NOTIFICATION_BUSNAME, @@ -104,6 +106,9 @@ class OTDBtoRATaskStatusPropagator(OTDBBusListener): def onObservationConflict(self, treeId, modificationTime): self._update_radb_task_status(treeId, 'conflict') + def onObservationError(self, treeId, modificationTime): + self._update_radb_task_status(treeId, 'error') + def onObservationObsolete(self, treeId, modificationTime): self._update_radb_task_status(treeId, 'obsolete') @@ -144,8 +149,27 @@ class OTDBtoRATaskStatusPropagator(OTDBBusListener): new_startime = max([max_pred_endtime, datetime.utcnow()]) new_endtime = new_startime + timedelta(seconds=task['duration']) - logger.info("Updating task %s (otdb_id=%s, status=queued) startime to \'%s\' and endtime to \'%s\'", task['id'], treeId, new_startime, new_endtime) - self.radb.updateTaskAndResourceClaims(task['id'], starttime=new_startime, endtime=new_endtime) + logger.info("Updating task %s (otdb_id=%s, status=active) startime to \'%s\' and endtime to \'%s\' except for storage claims", + task['id'], treeId, new_startime, new_endtime) + + #update task and all claim start/endtimes, except for storage claims. + non_storage_resource_type_ids = [rt['id'] for rt in self.radb.getResourceTypes() if rt['name'] != 'storage'] + self.radb.updateTaskAndResourceClaims(task['id'], + where_resource_types=non_storage_resource_type_ids, + starttime=new_startime, + endtime=new_endtime) + + #get remaining storage claims... + #and update storage start/end times (including 1 year extra) + logger.info("Updating storage claims for task %s (otdb_id=%s, status=active) startime to \'%s\' and endtime to \'%s\' (with extra storage claim time)", + task['id'], treeId, new_startime, new_endtime+STORAGE_CLAIM_EXTENSION) + + storage_claims = self.radb.getResourceClaims(task_ids=task['id'], resource_type='storage') + storage_claim_ids = [c['id'] for c in storage_claims] + self.radb.updateResourceClaims(where_resource_claim_ids=storage_claim_ids, + starttime=new_startime, + endtime=new_endtime+STORAGE_CLAIM_EXTENSION) + except Exception as e: logger.error(e) @@ -161,8 +185,26 @@ class OTDBtoRATaskStatusPropagator(OTDBBusListener): new_startime = otdb_task['starttime'] new_endtime = new_startime + timedelta(seconds=radb_task['duration']) - logger.info("Updating task %s (otdb_id=%s, status=active) startime to \'%s\' and endtime to \'%s\'", radb_task['id'], treeId, new_startime, new_endtime) - self.radb.updateTaskAndResourceClaims(radb_task['id'], starttime=new_startime, endtime=new_endtime) + logger.info("Updating task %s (otdb_id=%s, status=active) startime to \'%s\' and endtime to \'%s\' except for storage claims", + radb_task['id'], treeId, new_startime, new_endtime) + + #update task and all claim start/endtimes, except for storage claims. + non_storage_resource_type_ids = [rt['id'] for rt in self.radb.getResourceTypes() if rt['name'] != 'storage'] + self.radb.updateTaskAndResourceClaims(radb_task['id'], + where_resource_types=non_storage_resource_type_ids, + starttime=new_startime, + endtime=new_endtime) + + #get remaining storage claims... + #and update storage start/end times (including 1 year extra) + logger.info("Updating storage claims for task %s (otdb_id=%s, status=active) startime to \'%s\' and endtime to \'%s\' with extra claim time", + radb_task['id'], treeId, new_startime, new_endtime) + + storage_claims = self.radb.getResourceClaims(task_ids=radb_task['id'], resource_type='storage') + storage_claim_ids = [c['id'] for c in storage_claims] + self.radb.updateResourceClaims(where_resource_claim_ids=storage_claim_ids, + starttime=new_startime, + endtime=new_endtime+STORAGE_CLAIM_EXTENSION) def onObservationCompleting(self, treeId, modificationTime): self._update_radb_task_status(treeId, 'completing') @@ -180,8 +222,23 @@ class OTDBtoRATaskStatusPropagator(OTDBBusListener): if otdb_task and (otdb_task['starttime'] != radb_task['starttime'] or otdb_task['stoptime'] != radb_task['endtime']): new_endtime = otdb_task['stoptime'] - logger.info("Updating task %s (otdb_id=%s, status=%s) endtime to \'%s\'", radb_task['id'], treeId, radb_task['status'], new_endtime) - self.radb.updateTaskAndResourceClaims(radb_task['id'], endtime=new_endtime) + logger.info("Updating task %s (otdb_id=%s, status=%s) endtime to \'%s\' except for storage resource claims", radb_task['id'], treeId, radb_task['status'], new_endtime) + + #update task and all claim endtimes, except for storage claims. + non_storage_resource_type_ids = [rt['id'] for rt in self.radb.getResourceTypes() if rt['name'] != 'storage'] + self.radb.updateTaskAndResourceClaims(radb_task['id'], + where_resource_types=non_storage_resource_type_ids, + endtime=new_endtime) + + #get remaining storage claims... + #and extend storage end time + logger.info("Updating storage claims for task %s (otdb_id=%s, status=%s) endtime to \'%s\' (with extra storage claim time)", + radb_task['id'], treeId, radb_task['status'], new_endtime+STORAGE_CLAIM_EXTENSION) + + storage_claims = self.radb.getResourceClaims(task_ids=radb_task['id'], resource_type='storage') + storage_claim_ids = [c['id'] for c in storage_claims] + self.radb.updateResourceClaims(where_resource_claim_ids=storage_claim_ids, + endtime=new_endtime+STORAGE_CLAIM_EXTENSION) def onObservationFinished(self, treeId, modificationTime): self._update_radb_task_status(treeId, 'finished') diff --git a/SAS/ResourceAssignment/RATaskSpecifiedService/lib/RATaskSpecified.py b/SAS/ResourceAssignment/RATaskSpecifiedService/lib/RATaskSpecified.py index 91a16fc1f5982a1c3543c7c84e28e6de4ea40a71..048b624d22b3fae9fad8e0b5e758b6c74a3c33dd 100755 --- a/SAS/ResourceAssignment/RATaskSpecifiedService/lib/RATaskSpecified.py +++ b/SAS/ResourceAssignment/RATaskSpecifiedService/lib/RATaskSpecified.py @@ -121,17 +121,21 @@ class RATaskSpecified(OTDBBusListener): add("Observation.startTime") add("Observation.stopTime") + # ===================================== + # Task settings + # ===================================== + add("Observation.momID") + add("Observation.startTime") + add("Observation.stopTime") + # ===================================== # Observation settings # ===================================== if radb_type == "observation": - add("Observation.momID") add("Observation.sampleClock", as_int) add("Observation.nrBitsPerSample", as_int) add("Observation.antennaSet") add("Observation.VirtualInstrument.stationList", as_strvector) - add("Observation.startTime") - add("Observation.stopTime") add("Observation.nrBeams", as_int) nrSAPs = subset.get("Observation.nrBeams", 0) diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py b/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py index cb5f64eb58a88c3aec019362c89da38068de4c4e..c2ef36f5dce30312818b654bb79ca98ed0627249 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py @@ -338,7 +338,7 @@ class ResourceAssigner(): logger.info('doAssignment: %d claims were inserted in the radb' % len(claim_ids)) if len(claim_ids) != len(claims): logger.error('doAssignment: too few claims were inserted in the radb') - self._announceStateChange(task, 'conflict') + self._announceStateChange(task, 'error') return conflictingClaims = self.radbrpc.getResourceClaims(task_ids=taskId, status='conflict') diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py index ed3d7ba68e196cc9ce5fe28f28d3351d564e05b8..cdbec51e431e6f2d26f92de7ec46edb891090da8 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py +++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py @@ -1266,10 +1266,11 @@ class RADatabase: def updateResourceClaim(self, resource_claim_id, resource_id=None, task_id=None, starttime=None, endtime=None, status=None, claim_size=None, username=None, used_rcus=None, user_id=None, commit=True): - return self.updateResourceClaims([resource_claim_id], None, resource_id, task_id, starttime, endtime, status, + return self.updateResourceClaims([resource_claim_id], None, None, resource_id, task_id, starttime, endtime, status, claim_size, username, used_rcus, user_id, commit) - def updateResourceClaims(self, where_resource_claim_ids=None, where_task_ids=None, resource_id=None, task_id=None, starttime=None, endtime=None, + def updateResourceClaims(self, where_resource_claim_ids=None, where_task_ids=None, where_resource_types=None, + resource_id=None, task_id=None, starttime=None, endtime=None, status=None, claim_size=None, username=None, used_rcus=None, user_id=None, commit=True): '''Update the given paramenters on all resource claims given/delimited by where_resource_claim_ids and/or where_task_ids. @@ -1353,8 +1354,27 @@ class RADatabase: conditions.append('task_id in %s') values.append(tuple(where_task_ids)) + if where_resource_types is not None: + if isinstance(where_resource_types, basestring) or isinstance(where_resource_types, int): + where_resource_types = [where_resource_types] + elif not isinstance(where_resource_types, collections.Iterable): + where_resource_types = [where_resource_types] + + # convert any resource_type name to id + resource_type_names = set([x for x in where_resource_types if isinstance(x, basestring)]) + if resource_type_names: + resource_type_name_to_id = {x['name']:x['id'] for x in self.getResourceTypes()} + where_resource_type_ids = [resource_type_name_to_id[x] if isinstance(x, basestring) else x + for x in where_resource_types] + else: + where_resource_type_ids = [x for x in where_resource_types] + + conditions.append('resource_type_id in %s') + values.append(tuple(where_resource_type_ids)) + query += ' WHERE ' + ' AND '.join(conditions) + self._executeQuery(query, values) if commit: @@ -1363,7 +1383,7 @@ class RADatabase: return self.cursor.rowcount > 0 - def updateTaskAndResourceClaims(self, task_id, starttime=None, endtime=None, task_status=None, claim_status=None, username=None, used_rcus=None, user_id=None, commit=True): + def updateTaskAndResourceClaims(self, task_id, starttime=None, endtime=None, task_status=None, claim_status=None, username=None, used_rcus=None, user_id=None, where_resource_types=None, commit=True): '''combination of updateResourceClaims and updateTask in one transaction''' updated = True @@ -1371,6 +1391,7 @@ class RADatabase: username is not None or used_rcus is not None or user_id is not None): # update the claims as well updated &= self.updateResourceClaims(where_task_ids=task_id, + where_resource_types=where_resource_types, starttime=starttime, endtime=endtime, status=claim_status, @@ -1613,6 +1634,8 @@ class RADatabase: resource_type_name_to_id = {x['name']:x['id'] for x in self.getResourceTypes()} resource_type_ids = [resource_type_name_to_id[x] if isinstance(x, basestring) else x for x in resource_types] + else: + resource_type_ids = [x for x in resource_types] conditions.append('type_id in %s') qargs.append(tuple(resource_type_ids)) diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb/sql/add_functions_and_triggers.sql b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb/sql/add_functions_and_triggers.sql index eabae3893ccaafc09d95e67f17935f7eb27daa30..e8389aeea5d35fe4362529e576e6c3dacf847f90 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb/sql/add_functions_and_triggers.sql +++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb/sql/add_functions_and_triggers.sql @@ -485,7 +485,7 @@ $$ DECLARE claimed_status_id int := 1; --beware: hard coded instead of lookup for performance max_resource_usage resource_allocation.resource_usage; - max_resource_usage_value int; + max_resource_usage_value bigint; available_capacity bigint; BEGIN SELECT * FROM resource_allocation.get_max_resource_usage_between(_resource_id, claimed_status_id, _lower, _upper) INTO max_resource_usage; @@ -521,6 +521,7 @@ BEGIN --get all overlapping_claims, check whether they cause a conflict or not. SET LOCAL client_min_messages=warning; --prevent "table overlapping_claims does not exist, skipping" message DROP TABLE IF EXISTS overlapping_claims; -- TODO: use CREATE TEMPORARY TABLE IF NOT EXISTS when we will use postgres 9.5+ + SET LOCAL client_min_messages=info; --back to normal log level CREATE TEMPORARY TABLE overlapping_claims ON COMMIT DROP AS SELECT * FROM resource_allocation.resource_claim rc @@ -608,6 +609,9 @@ DECLARE claim_conflict_status_id int := 2; --beware: hard coded instead of lookup for performance task_approved_status_id int := 300; --beware: hard coded instead of lookup for performance task_conflict_status_id int := 335; --beware: hard coded instead of lookup for performance + task_prescheduled_status_id int := 350; --beware: hard coded instead of lookup for performance + task_scheduled_status_id int := 400; --beware: hard coded instead of lookup for performance + task_queued_status_id int := 500; --beware: hard coded instead of lookup for performance claim_has_conflicts boolean; BEGIN --order of following steps is important, do not reorder the steps @@ -621,34 +625,51 @@ BEGIN IF TG_OP = 'UPDATE' THEN -- bounce any updated claim which has conflict state, but is tried to be updated to claimed + -- only this function can 'reset' the conflict state back to tentative! IF NEW.status_id = claim_claimed_status_id AND OLD.status_id = claim_conflict_status_id THEN RAISE EXCEPTION 'cannot update claim-in-conflict to status claimed; old:% new:%', OLD, NEW; END IF; - -- bounce any updated claim which has claimed state onto which other properties are changed - IF NEW.status_id = claim_claimed_status_id AND OLD.status_id = NEW.status_id THEN - RAISE EXCEPTION 'updates on properties of claimed claim are not allowed; old:% new:%', OLD, NEW; + -- bounce any claim_size updates on claimed claims + IF NEW.status_id = claim_claimed_status_id AND OLD.claim_size <> NEW.claim_size THEN + RAISE EXCEPTION 'cannot update claim size on claimed claim; old:% new:%', OLD, NEW; + END IF; + + -- bounce any task_id updates + IF OLD.task_id <> NEW.task_id THEN + RAISE EXCEPTION 'cannot change the task to which a claim belongs; old:% new:%', OLD, NEW; + END IF; + + -- bounce any resource_id updates + IF OLD.resource_id <> NEW.resource_id THEN + RAISE EXCEPTION 'cannot change the resource to which a claim belongs; old:% new:%', OLD, NEW; END IF; END IF; - IF TG_OP = 'INSERT' OR TG_OP = 'UPDATE' THEN - --only check claim if in tentative/conflict status, and status or claim_size or start/end time changed - IF NEW.status_id = claim_tentative_status_id OR NEW.status_id = claim_conflict_status_id THEN - IF TG_OP = 'INSERT' OR (TG_OP = 'UPDATE' AND (OLD.status_id <> NEW.status_id OR - OLD.claim_size <> NEW.claim_size OR - OLD.starttime <> NEW.starttime OR - OLD.endtime <> NEW.endtime)) THEN - --check if claim fits or has conflicts - SELECT * FROM resource_allocation.has_conflict_with_overlapping_claims(NEW) INTO claim_has_conflicts; - - IF claim_has_conflicts AND NEW.status_id <> claim_conflict_status_id THEN + --only check claim if status and/or claim_size and/or start/end time changed + IF TG_OP = 'INSERT' OR (TG_OP = 'UPDATE' AND (OLD.status_id <> NEW.status_id OR + OLD.claim_size <> NEW.claim_size OR + OLD.starttime <> NEW.starttime OR + OLD.endtime <> NEW.endtime)) THEN + --check if claim fits or has conflicts + SELECT * FROM resource_allocation.has_conflict_with_overlapping_claims(NEW) INTO claim_has_conflicts; + + IF claim_has_conflicts THEN + IF NEW.status_id <> claim_conflict_status_id THEN + -- only set claims to conflict if task status <= queued + -- when a claim goes to conflict, then so does it's task, and we don't want that for running/finished/aborted tasks + IF EXISTS (SELECT 1 FROM resource_allocation.task + WHERE id=NEW.task_id + AND status_id = ANY(ARRAY[300, 335, 350, 400, 500])) THEN -- hardcoded tasks statuses <= queued -- conflict with others, so set claim status to conflict NEW.status_id := claim_conflict_status_id; - ELSIF NOT claim_has_conflicts AND NEW.status_id <> claim_tentative_status_id THEN - -- no conflict (anymore) with others, so set claim status to tentative - NEW.status_id := claim_tentative_status_id; END IF; END IF; + ELSE + -- no conflict (anymore) with others, so set claim status to tentative if currently in conflict + IF NEW.status_id = claim_conflict_status_id THEN + NEW.status_id := claim_tentative_status_id; + END IF; END IF; END IF; diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/t_radb.py b/SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/t_radb.py index 9c04e2b22654de6f1137a9e6d606a6165e0f48e0..156a34b6802b04833e74b1c2c70d86ab83c44d3b 100755 --- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/t_radb.py +++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/t_radb.py @@ -1055,7 +1055,7 @@ class ResourceAssignmentDatabaseTest(unittest.TestCase): 'status': 'claimed', 'claim_size': 10 } t1_faulty_claim_ids = self.radb.insertResourceClaims(task_id1, [t1_claim2], 'foo', 1, 1) - self.assertEqual(0, len(t1_faulty_claim_ids)) + self.assertEqual(1, len(self.radb.getResourceClaims(task_ids=task_id1))) #there should still be one (proper/non-faulty) claim for this task # try to insert a claim with the wrong (already 'conflict') status. Should rollback, and return no ids. t1_claim3 = { 'resource_id': 117, @@ -1064,7 +1064,8 @@ class ResourceAssignmentDatabaseTest(unittest.TestCase): 'status': 'conflict', 'claim_size': 10 } t1_faulty_claim_ids = self.radb.insertResourceClaims(task_id1, [t1_claim3], 'foo', 1, 1) - self.assertEqual(0, len(t1_faulty_claim_ids)) + t1_faulty_claim_ids = self.radb.insertResourceClaims(task_id1, [t1_claim2], 'foo', 1, 1) + self.assertEqual(1, len(self.radb.getResourceClaims(task_ids=task_id1))) #there should still be one (proper/non-faulty) claim for this task # try to update the task status to scheduled, should not succeed, since it's claims are not 'claimed' yet. self.assertFalse(self.radb.updateTask(task_id1, task_status='scheduled')) @@ -1118,6 +1119,13 @@ class ResourceAssignmentDatabaseTest(unittest.TestCase): self.assertEqual(set([task_id1]), set(t['id'] for t in self.radb.get_conflicting_overlapping_tasks(t2_claim_ids[0]))) + #try to connect this claim to task1, should fail + self.assertFalse(self.radb.updateResourceClaims(t2_claim_ids, task_id=task_id1)) + self.assertEqual(task_id2, t2_claims[0]['task_id']) + + #try to connect this claim to other resource, should fail + self.assertFalse(self.radb.updateResourceClaims(t2_claim_ids, resource_id=118)) + self.assertEqual(117, t2_claims[0]['resource_id']) # try to update the task status to scheduled, should not succeed, since it's claims are not 'claimed' yet. self.assertFalse(self.radb.updateTask(task_id2, task_status='scheduled')) @@ -1147,6 +1155,44 @@ class ResourceAssignmentDatabaseTest(unittest.TestCase): self.assertEqual(0, len(self.radb.get_conflicting_overlapping_claims(t2_claim_ids[0]))) self.assertEqual(0, len(self.radb.get_conflicting_overlapping_tasks(t2_claim_ids[0]))) + # updating task/claim start/endtime should work, even for scheduled tasks with claimed claims + # effect might be that a scheduled tasks goes to conflict + # force conflict by moving back to original start/endtimes + self.assertTrue(self.radb.updateTaskAndResourceClaims(task_id2, starttime=task2['starttime'], endtime=task2['endtime'])) + self.assertEqual('conflict', self.radb.getResourceClaim(t2_claim_ids[0])['status']) + self.assertEqual('conflict', self.radb.getTask(task_id2)['status']) + + # again do conflict resolution, shift task and claims + self.assertTrue(self.radb.updateTaskAndResourceClaims(task_id2, starttime=now+timedelta(hours=2), endtime=now+timedelta(hours=3))) + self.assertTrue(self.radb.updateTaskAndResourceClaims(task_id2, claim_status='claimed', task_status='scheduled')) + # now the task and claim status should be scheduled/claimed + self.assertEqual('scheduled', self.radb.getTask(task_id2)['status']) + self.assertEqual('claimed', self.radb.getResourceClaim(t2_claim_ids[0])['status']) + + # updating task/claim start/endtime should work, even for scheduled tasks with claimed claims + # effect might be that a scheduled tasks goes to conflict + # now, make simple endtime adjustment, task should stay scheduled + self.assertTrue(self.radb.updateTaskAndResourceClaims(task_id2, endtime=now+timedelta(hours=2.75))) + # now the task and claim status should still be scheduled/claimed + self.assertEqual('scheduled', self.radb.getTask(task_id2)['status']) + self.assertEqual('claimed', self.radb.getResourceClaim(t2_claim_ids[0])['status']) + + # now some weird corner case... + # when a task is > queued (for example, finished) + # then we don't want conflict statuses anymore if we update start/endtimes + # test here with weird starttime shift back to overlap with task1 + self.assertTrue(self.radb.updateTask(task_id2, task_status='finished')) + self.assertEqual('finished', self.radb.getTask(task_id2)['status']) + self.assertTrue(self.radb.updateTaskAndResourceClaims(task_id2, starttime=task1['starttime'])) + self.assertEqual('finished', self.radb.getTask(task_id2)['status']) + self.assertEqual('claimed', self.radb.getResourceClaim(t2_claim_ids[0])['status']) + + #ok, that works, now set the start/end time back to 'normal' for some later test cases + self.assertTrue(self.radb.updateTaskAndResourceClaims(task_id2, starttime=now+timedelta(hours=2), endtime=now+timedelta(hours=3))) + self.assertEqual('finished', self.radb.getTask(task_id2)['status']) + self.assertEqual('claimed', self.radb.getResourceClaim(t2_claim_ids[0])['status']) + + logger.info('------------------------------ concludes task 2 ------------------------------') logger.info('-- now test with a 3rd task, and test resource availability, conflicts etc. --') @@ -1232,6 +1278,7 @@ class ResourceAssignmentDatabaseTest(unittest.TestCase): self.assertEqual('claimed', self.radb.getResourceClaim(t3_claim_ids[0])['status']) self.assertEqual('scheduled', self.radb.getTask(task_id3)['status']) + # suppose the resource_usages table is broken for some reason, fix it.... # break it first... self._execute_query('TRUNCATE TABLE resource_allocation.resource_usage;') diff --git a/SAS/ResourceAssignment/ResourceAssignmentEditor/bin/raewebservice.ini b/SAS/ResourceAssignment/ResourceAssignmentEditor/bin/raewebservice.ini index a4bb85e8ea9ddd771739132be83ed5157fe51264..0095ddcd2b3a0db93aec6a0763a1778e7722f060 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEditor/bin/raewebservice.ini +++ b/SAS/ResourceAssignment/ResourceAssignmentEditor/bin/raewebservice.ini @@ -1,5 +1,5 @@ [program:raewebservice] -command=/bin/bash -c 'source $LOFARROOT/lofarinit.sh;exec raewebservice -p 7412' +command=/bin/bash -c 'source $LOFARROOT/lofarinit.sh;exec raewebservice' user=lofarsys stopsignal=INT ; KeyboardInterrupt stopasgroup=true ; bash does not propagate signals diff --git a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/static/app/controllers/gridcontroller.js b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/static/app/controllers/gridcontroller.js index 22ef3636b5adbb06827ee1f62cb8f76ee5877410..6a362afa541b5f21d770a0a99d9ba66dc81fb912 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/static/app/controllers/gridcontroller.js +++ b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/static/app/controllers/gridcontroller.js @@ -495,7 +495,7 @@ gridControllerMod.controller('GridController', ['$scope', '$window', 'dataServic fillTypeColumFilterSelectOptions(); fillProjectsColumFilterSelectOptions(); fillGroupsColumFilterSelectOptions(); - fillColumFilterSelectOptions(['CEP2', 'CEP4'], $scope.columns.find(function(c) {return c.field == 'cluster'; })); + fillColumFilterSelectOptions(['CEP2', 'CEP4', 'DRAGNET'], $scope.columns.find(function(c) {return c.field == 'cluster'; })); } }; @@ -851,36 +851,36 @@ gridControllerMod.directive('contextMenu', ['$document', '$window', function($do }); } - var approved_selected_cep4_tasks = selected_cep4_tasks.filter(function(t) { return t.status == 'approved'; }); + var schedulable_cep4_tasks = selected_cep4_tasks.filter(function(t) { return t.status == 'approved' || t.status == 'conflict' || t.status == 'error'; }); - if(approved_selected_cep4_tasks.length > 0) { - var liContent = '<li><a href="#">Schedule approved CEP4 task(s)</a></li>' + if(schedulable_cep4_tasks.length > 0) { + var liContent = '<li><a href="#">Schedule CEP4 task(s)</a></li>' var liElement = angular.element(liContent); ulElement.append(liElement); liElement.on('click', function() { closeContextMenu(); - for(var pl of approved_selected_cep4_tasks) { + for(var pl of schedulable_cep4_tasks) { var newTask = { id: pl.id, status: 'prescheduled' }; dataService.putTask(newTask); } }); } - var scheduled_selected_cep4_tasks = selected_cep4_tasks.filter(function(t) { return (t.status == 'prescheduled' || t.status == 'scheduled' || t.status == 'queued') }); + var unschedulable_selected_cep4_tasks = selected_cep4_tasks.filter(function(t) { return t.status == 'prescheduled' || t.status == 'scheduled' || t.status == 'queued' || t.status == 'error'; }); - if(scheduled_selected_cep4_tasks.length > 0) { - var liContent = '<li><a href="#">Unschedule (pre)scheduled/queued CEP4 tasks</a></li>' + if(unschedulable_selected_cep4_tasks.length > 0) { + var liContent = '<li><a href="#">Unschedule (pre)scheduled/queued/error tasks</a></li>' var liElement = angular.element(liContent); ulElement.append(liElement); liElement.on('click', function() { closeContextMenu(); - for(var t of scheduled_selected_cep4_tasks) { - if(t.status == 'queued') { - var newTask = { id: t.id, status: 'aborted' }; + for(var pl of unschedulable_selected_cep4_tasks) { + if(pl.status == 'queued') { + var newTask = { id: pl.id, status: 'aborted' }; dataService.putTask(newTask); } - var newTask = { id: t.id, status: 'approved' }; + var newTask = { id: pl.id, status: 'approved' }; dataService.putTask(newTask); } }); diff --git a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/static/css/main.css b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/static/css/main.css index 5d3d7dd21d56feeef41530ff4569622a4edff7f0..93140b295decac918922f81e71ff77e969c5f3b9 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/static/css/main.css +++ b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/static/css/main.css @@ -227,6 +227,10 @@ div.gantt-task.task-status-blocked div{ background-color: #ccffcc !important; } +.grid-cluster-DRAGNET { + background-color: #ffffcc !important; +} + diff --git a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/webservice.py b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/webservice.py index 4060cf13096b16d0b1ca5378a395e3267718ab39..62175f115d6a33390632c4811aa2cb7adf52b061 100755 --- a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/webservice.py +++ b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/webservice.py @@ -406,49 +406,60 @@ def putTask(task_id): request.headers['Content-Type'].startswith('application/json'): try: updatedTask = json_loads(request.data) - logger.info('putTask: updatedTask: %s', updatedTask) if task_id != int(updatedTask['id']): abort(404, 'task_id in url is not equal to id in request.data') - org_task = radb().getTask(task_id) - - if not org_task: - abort(404, "unknown task %s" % updatedTask) - - for timeprop in ['starttime', 'endtime']: - if timeprop in updatedTask: - try: - updatedTask[timeprop] = asDatetime(updatedTask[timeprop]) - except ValueError: - abort(400, 'timestamp not in iso format: ' + updatedTask[timeprop]) + #check if task is known + task = radb().getTask(task_id) + if not task: + abort(404, "unknown task %s" % str(updatedTask)) + # first handle start- endtimes... if 'starttime' in updatedTask or 'endtime' in updatedTask: - # block time edits in productin for now until we've replaced the old scheduler. + logger.info('starttime or endtime in updatedTask: %s', updatedTask) if isProductionEnvironment(): - abort(403, 'Editing of startime/endtime of tasks by users is not yet approved') + abort(403, 'Editing of %s of tasks by users is not yet approved' % (time,)) + + #update dict for otdb spec + spec_update = {} + + for timeprop in ['starttime', 'endtime']: + if timeprop in updatedTask: + try: + updatedTask[timeprop] = asDatetime(updatedTask[timeprop]) + except ValueError: + abort(400, 'timestamp not in iso format: ' + updatedTask[timeprop]) + otdb_key = 'LOFAR.ObsSW.Observation.' + ('startTime' if timeprop == 'starttime' else 'stopTime') + spec_update[otdb_key] = updatedTask[timeprop].strftime('%Y-%m-%d %H:%M:%S') + + #update timestamps in both otdb and radb + otdbrpc.taskSetSpecification(task['otdb_id'], spec_update) # update the task's (and its claims) start/endtime # do not update the tasks status directly via the radb. See few lines below. task status is routed via otdb (and then ends up in radb automatically) # it might be that editing the start/end time results in a (rabd)task status update (for example to 'conflict' due to conflicting claims) # that's ok, since we'll update the status to the requested status later via otdb (see few lines below) - radb().updateTaskAndResourceClaims(task_id, starttime=updatedTask.get('starttime'), endtime=updatedTask.get('endtime')) + radb().updateTaskAndResourceClaims(task_id, + starttime=updatedTask.get('starttime'), + endtime=updatedTask.get('endtime')) + # ...then, handle status update which might trigger resource assignment, + # for which the above updated times are needed if 'status' in updatedTask: - if isProductionEnvironment() and org_task['type'] == 'observation': - abort(403, 'Editing of observation status by users is not yet approved') - try: - otdbrpc.taskSetStatus(org_task['otdb_id'], updatedTask['status']) + #update status in otdb only + #the status change will propagate automatically into radb via other services (by design) + otdbrpc.taskSetStatus(task['otdb_id'], updatedTask['status']) #block until radb and mom task status are equal to otdb task status (with timeout) start_wait = datetime.utcnow() while True: - org_task = radb().getTask(task_id) - details = momqueryrpc.getObjectDetails(org_task['mom_id']).get(org_task['mom_id']) + task = radb().getTask(task_id) + details = momqueryrpc.getProjectDetails(task['mom_id']).get(task['mom_id']) - if org_task['status'] == updatedTask['status'] and details['object_status'] == updatedTask['status']: + if task['status'] == updatedTask['status'] and details['object_status'] == updatedTask['status']: break if datetime.utcnow() - start_wait > timedelta(seconds=10): @@ -459,12 +470,16 @@ def putTask(task_id): if 'does not exist' in e.message: # task does not exist (anymore) in otdb #so remove it from radb as well (with cascading deletes on specification) - logger.warn('task with otdb_id %s does not exist anymore in OTDB. removing task radb_id %s from radb', org_task['otdb_id'], org_task['id']) - radb().deleteSpecification(org_task['specification_id']) + logger.warn('task with otdb_id %s does not exist anymore in OTDB. removing task radb_id %s from radb', task['otdb_id'], task['id']) + radb().deleteSpecification(task['specification_id']) if 'data_pinned' in updatedTask: - if not curpc.setTaskDataPinned(org_task['otdb_id'], updatedTask['data_pinned']): - abort(500, 'Could not (un)pin task') + task = radb().getTask(task_id) + + if not task: + abort(404, "unknown task %s" % str(updatedTask)) + + curpc.setTaskDataPinned(task['otdb_id'], updatedTask['data_pinned']) return "", 204 except Exception as e: diff --git a/SAS/ResourceAssignment/ResourceAssignmentService/rpc.py b/SAS/ResourceAssignment/ResourceAssignmentService/rpc.py index 6ee8e7480c32cb44b2389fc27057db41b7d77038..41573604cec7b245d5a4db2b72166bf324d1a2d4 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentService/rpc.py +++ b/SAS/ResourceAssignment/ResourceAssignmentService/rpc.py @@ -68,53 +68,67 @@ class RARPC(RPCWrapper): return resource_claim - def insertResourceClaim(self, resource_id, task_id, starttime, endtime, status, session_id, claim_size, username, + def insertResourceClaim(self, resource_id, task_id, starttime, endtime, status, claim_size, username, user_id, used_rcus=None, properties=None): return self.rpc('InsertResourceClaim', resource_id=resource_id, task_id=task_id, starttime=starttime, endtime=endtime, status=status, - session_id=session_id, claim_size=claim_size, username=username, used_rcus=used_rcus, user_id=user_id, properties=properties) - def insertResourceClaims(self, task_id, claims, session_id, username, user_id): + def insertResourceClaims(self, task_id, claims, username, user_id): return self.rpc('InsertResourceClaims', task_id=task_id, claims=claims, - session_id=session_id, username=username, user_id=user_id) def deleteResourceClaim(self, id): return self.rpc('DeleteResourceClaim', id=id) - def updateResourceClaim(self, id, resource_id=None, task_id=None, starttime=None, endtime=None, status=None, session_id=None, claim_size=None, username=None, used_rcus=None, user_id=None): + def updateResourceClaim(self, id, resource_id=None, task_id=None, starttime=None, endtime=None, status=None, claim_size=None, username=None, used_rcus=None, user_id=None): return self.rpc('UpdateResourceClaim', id=id, resource_id=resource_id, task_id=task_id, starttime=starttime, endtime=endtime, status=status, - session_id=session_id, claim_size=claim_size, username=username, used_rcus=used_rcus, user_id=user_id) - def updateTaskAndResourceClaims(self, task_id, starttime=None, endtime=None, task_status=None, claim_status=None, session_id=None, username=None, used_rcus=None, user_id=None): + def updateResourceClaims(self, where_resource_claim_ids=None, where_task_ids=None, where_resource_types=None, + resource_id=None, task_id=None, starttime=None, endtime=None, + status=None, claim_size=None, username=None, used_rcus=None, user_id=None, + commit=True): + return self.rpc('UpdateResourceClaims', where_resource_claim_ids=where_resource_claim_ids, + where_task_ids=where_task_ids, + where_resource_types=where_resource_types, + resource_id=resource_id, + task_id=task_id, + starttime=starttime, + endtime=endtime, + status=status, + claim_size=claim_size, + username=username, + used_rcus=used_rcus, + user_id=user_id) + + def updateTaskAndResourceClaims(self, task_id, starttime=None, endtime=None, task_status=None, claim_status=None, username=None, used_rcus=None, user_id=None, where_resource_types=None): return self.rpc('UpdateTaskAndResourceClaims', task_id=task_id, - starttime=starttime, - endtime=endtime, - task_status=task_status, - claim_status=claim_status, - session_id=session_id, - username=username, - used_rcus=used_rcus, - user_id=user_id) + starttime=starttime, + endtime=endtime, + task_status=task_status, + claim_status=claim_status, + username=username, + used_rcus=used_rcus, + user_id=user_id, + where_resource_types=where_resource_types) def getResourceUsages(self, claim_ids=None, lower_bound=None, upper_bound=None, resource_ids=None, task_ids=None, status=None, resource_type=None): all_usages = self.rpc('GetResourceUsages', diff --git a/SAS/ResourceAssignment/ResourceAssignmentService/service.py b/SAS/ResourceAssignment/ResourceAssignmentService/service.py index f2cfd92c144584fc93e4db5bfa5cbf2088795ab4..5a26990f69acb4907c1928687a48dff368dab322 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentService/service.py +++ b/SAS/ResourceAssignment/ResourceAssignmentService/service.py @@ -52,6 +52,7 @@ class RADBHandler(MessageHandlerInterface): 'InsertResourceClaim': self._insertResourceClaim, 'DeleteResourceClaim': self._deleteResourceClaim, 'UpdateResourceClaim': self._updateResourceClaim, + 'UpdateResourceClaims': self._updateResourceClaims, 'UpdateTaskAndResourceClaims': self._updateTaskAndResourceClaims, 'GetResourceUsages': self._getResourceUsages, 'GetResourceGroupTypes': self._getResourceGroupTypes, @@ -147,7 +148,6 @@ class RADBHandler(MessageHandlerInterface): ids = self.radb.insertResourceClaims(kwargs['task_id'], claims, - kwargs['session_id'], kwargs['username'], kwargs['user_id']) return {'ids':ids} @@ -159,7 +159,6 @@ class RADBHandler(MessageHandlerInterface): kwargs['starttime'].datetime(), kwargs['endtime'].datetime(), kwargs.get('status_id', kwargs.get('status')), - kwargs['session_id'], kwargs['claim_size'], kwargs['username'], kwargs['user_id'], @@ -182,12 +181,30 @@ class RADBHandler(MessageHandlerInterface): starttime=kwargs.get('starttime').datetime() if kwargs.get('starttime') else None, endtime=kwargs.get('endtime').datetime() if kwargs.get('endtime') else None, status=kwargs.get('status_id', kwargs.get('status')), - session_id=kwargs.get('session_id'), claim_size=kwargs.get('claim_size'), username=kwargs.get('username'), user_id=kwargs.get('user_id')) return {'id': id, 'updated': updated} + def _updateResourceClaims(self, **kwargs): + logger.info('UpdateResourceClaims: %s' % dict({k:v for k,v in kwargs.items() if v != None})) + task_id = kwargs['task_id'] + + updated = self.radb.updateResourceClaims(where_resource_claim_ids=kwargs.get('where_resource_claim_ids'), + where_task_ids=kwargs.get('where_task_ids'), + where_resource_types=kwargs.get('where_resource_types'), + resource_id=kwargs.get('resource_id'), + task_id=kwargs.get('task_id'), + starttime=kwargs.get('starttime').datetime() if kwargs.get('starttime') else None, + endtime=kwargs.get('endtime').datetime() if kwargs.get('endtime') else None, + status=kwargs.get('status_id', kwargs.get('status')), + claim_size=kwargs.get('status'), + username=kwargs.get('username'), + user_id=kwargs.get('user_id'), + used_rcus=None) + + return {'task_id': task_id, 'updated': updated} + def _updateTaskAndResourceClaims(self, **kwargs): logger.info('UpdateTaskAndResourceClaims: %s' % dict({k:v for k,v in kwargs.items() if v != None})) task_id = kwargs['task_id'] @@ -197,9 +214,11 @@ class RADBHandler(MessageHandlerInterface): endtime=kwargs.get('endtime').datetime() if kwargs.get('endtime') else None, task_status=kwargs.get('task_status_id', kwargs.get('task_status')), claim_status=kwargs.get('claim_status_id', kwargs.get('claim_status')), - session_id=kwargs.get('session_id'), username=kwargs.get('username'), - user_id=kwargs.get('user_id')) + user_id=kwargs.get('user_id'), + where_resource_types=kwargs.get('where_resource_types'), + commit=kwargs.get('commit', True)) + return {'task_id': task_id, 'updated': updated} def _getResourceUsages(self, **kwargs): diff --git a/SubSystems/RAServices/CMakeLists.txt b/SubSystems/RAServices/CMakeLists.txt index 2010af21aef64a63becd466ef1a04f0cbbdbc90e..7c3995e9c214865902c9ff97b1d6c1ef51369d4d 100644 --- a/SubSystems/RAServices/CMakeLists.txt +++ b/SubSystems/RAServices/CMakeLists.txt @@ -18,6 +18,7 @@ lofar_package(RAServices TriggerEmailService TaskPrescheduler DataManagement + QPIDInfrastructure RAScripts StaticMetaData)