diff --git a/CMake/LofarPackageList.cmake b/CMake/LofarPackageList.cmake index f151e9395a89417b77f596cc5105a4f0ef8ef335..77ba86e16df07e3d8b991d60e12f5ab4a15ca61a 100644 --- a/CMake/LofarPackageList.cmake +++ b/CMake/LofarPackageList.cmake @@ -1,7 +1,7 @@ # - Create for each LOFAR package a variable containing the absolute path to # its source directory. # -# Generated by gen_LofarPackageList_cmake.sh at di 9 feb 2021 14:22:29 CET +# Generated by gen_LofarPackageList_cmake.sh at do 18 feb 2021 9:48:57 CET # # ---- DO NOT EDIT ---- # @@ -216,6 +216,7 @@ if(NOT DEFINED LOFAR_PACKAGE_LIST_INCLUDED) set(TMSSPostgresListenerService_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TMSS/backend/services/tmss_postgres_listener) set(TMSSWebSocketService_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TMSS/backend/services/websocket) set(TMSSWorkflowService_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TMSS/backend/services/workflow_service) + set(TMSSLTAAdapter_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TMSS/backend/services/tmss_lta_adapter) set(TriggerEmailServiceCommon_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TriggerEmailService/Common) set(TriggerEmailServiceServer_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TriggerEmailService/Server) set(CCU_MAC_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SubSystems/CCU_MAC) diff --git a/Docker/lofar-ci/Dockerfile_ci_scu b/Docker/lofar-ci/Dockerfile_ci_scu index 5ccbe5435d266f98c6ff6734bad13a5b09eaa701..ef581a80eaf0eb70ca3d2b58334baab6ef53de27 100644 --- a/Docker/lofar-ci/Dockerfile_ci_scu +++ b/Docker/lofar-ci/Dockerfile_ci_scu @@ -7,7 +7,7 @@ ARG BASE_VERSION=latest FROM ci_base:$BASE_VERSION RUN echo "Installing packages for SAS..." && \ - yum install -y log4cplus log4cplus-devel python3 python3-libs python3-devel boost readline-devel boost-devel binutils-devel boost-python36 boost-python36-devel gettext which openldap-devel git java-11-openjdk python-twisted-core graphviz + yum install -y log4cplus log4cplus-devel python3 python3-libs python3-devel boost readline-devel boost-devel binutils-devel boost-python36 boost-python36-devel gettext which openldap-devel git java-11-openjdk python-twisted-core graphviz libaio # see https://www.postgresql.org/download/linux/redhat/ on how to install postgresql-server > 9.2 on centos7 RUN yum erase -y postgresql postgresql-server postgresql-devel && \ @@ -16,7 +16,15 @@ RUN yum erase -y postgresql postgresql-server postgresql-devel && \ cd /bin && ln -s /usr/pgsql-9.6/bin/initdb && ln -s /usr/pgsql-9.6/bin/postgres ENV PATH /usr/pgsql-9.6/bin:$PATH -RUN pip3 install cython kombu lxml requests pygcn xmljson mysql-connector-python python-dateutil Django==3.0.9 djangorestframework==3.11.1 djangorestframework-xml ldap==1.0.2 flask fabric coverage python-qpid-proton PyGreSQL numpy h5py psycopg2 testing.postgresql Flask-Testing scipy Markdown django-filter python-ldap python-ldap-test ldap3 django-jsonforms django-json-widget django-jsoneditor drf-yasg flex swagger-spec-validator django-auth-ldap mozilla-django-oidc jsonschema comet pyxb==1.2.5 graphviz isodate astropy packaging django-debug-toolbar pymysql astroplan SimpleWebSocketServer websocket_client drf-flex-fields django-property-filter +# Oracle decided in all its wisdom to not make use of rpm/deb +# So, we're forced to download the Oracle client packages, and configure the paths +RUN mkdir -p /opt/oracle && \ + cd /opt/oracle && \ + wget https://download.oracle.com/otn_software/linux/instantclient/211000/instantclient-basic-linux.x64-21.1.0.0.0.zip && \ + unzip instantclient-basic-linux.x64-21.1.0.0.0.zip +ENV LD_LIBRARY_PATH /opt/oracle/instantclient_21_1:$LD_LIBRARY_PATH + +RUN pip3 install cython kombu lxml requests pygcn xmljson mysql-connector-python python-dateutil Django==3.0.9 djangorestframework==3.11.1 djangorestframework-xml ldap==1.0.2 flask fabric coverage python-qpid-proton PyGreSQL numpy h5py psycopg2 testing.postgresql Flask-Testing scipy Markdown django-filter python-ldap python-ldap-test ldap3 django-jsonforms django-json-widget django-jsoneditor drf-yasg flex swagger-spec-validator django-auth-ldap mozilla-django-oidc jsonschema comet pyxb==1.2.5 graphviz isodate astropy packaging django-debug-toolbar pymysql astroplan SimpleWebSocketServer websocket_client drf-flex-fields django-property-filter cx_Oracle #Viewflow package RUN pip3 install django-material django-viewflow diff --git a/LCS/PyCommon/cep4_utils.py b/LCS/PyCommon/cep4_utils.py index 5326fd90ce8c351f8858cfd4e71a9843aa33e996..f2dfd20cd7a30534c9cc179fb8f718acf32e14b5 100755 --- a/LCS/PyCommon/cep4_utils.py +++ b/LCS/PyCommon/cep4_utils.py @@ -43,7 +43,21 @@ def wrap_command_in_cep4_head_node_ssh_call(cmd): :param list cmd: a subprocess cmd list :return: the same subprocess cmd list, but then wrapped with cep4 ssh calls ''' - ssh_cmd = ssh_cmd_list(user='lofarsys', host='head.cep4.control.lofar') + return wrap_command_in_ssh_call(cmd, user='lofarsys', host='head.cep4.control.lofar') + +def wrap_command_in_ssh_call(cmd, host, user='lofarsys'): + '''wrap the command in an ssh call for the given user on the given host + :param list cmd: a subprocess cmd list + :param host: the node name or ip address where to run the cmd via ssh + :param user: optional username for ssh login, defaults to 'lofarsys' + :return: the same subprocess cmd list, but then wrapped with cep4 ssh calls + ''' + ssh_cmd = ssh_cmd_list(user=user, host=host) + + # "forward" the current LOFARENV if present + if 'LOFARENV' in os.environ: + ssh_cmd += ['LOFARENV=%s' % (os.environ['LOFARENV'],)] + return ssh_cmd + ([cmd] if isinstance(cmd, str) else cmd) def wrap_command_in_cep4_random_node_ssh_call(cmd, partition: str=SLURM_CPU_PARTITION, via_head=True): @@ -76,8 +90,7 @@ def wrap_command_in_cep4_node_ssh_call(cmd, node_nr, partition=SLURM_CPU_PARTITI :param bool via_head: when True, route the cmd first via the cep4 head node :return: the same subprocess cmd list, but then wrapped with cep4 ssh calls ''' - ssh_cmd = ssh_cmd_list(host='%s%02d.cep4' % (partition, node_nr), user='lofarsys') - remote_cmd = ssh_cmd + ([cmd] if isinstance(cmd, str) else cmd) + remote_cmd = wrap_command_in_ssh_call(cmd, host='%s%02d.cep4' % (partition, node_nr), user='lofarsys') if via_head: return wrap_command_in_cep4_head_node_ssh_call(remote_cmd) else: @@ -101,6 +114,10 @@ def wrap_command_for_docker(cmd, image_name, image_label='', mount_dirs=['/data' for d in mount_dirs: dockerized_cmd += ['-v', '%s:%s' % (d,d)] + # "forward" the current LOFARENV if present + if 'LOFARENV' in os.environ: + dockerized_cmd += ['-e', 'LOFARENV=%s' % (os.environ['LOFARENV'],)] + dockerized_cmd += ['-u', id_string, '-v', '/etc/passwd:/etc/passwd:ro', '-v', '/etc/group:/etc/group:ro', diff --git a/LCS/pyparameterset/src/__init__.py b/LCS/pyparameterset/src/__init__.py index 3f55a4550ae93e414a20015ddadb343173d5d4e6..353081407293b57681ff01e0ee0bfde85ef10335 100755 --- a/LCS/pyparameterset/src/__init__.py +++ b/LCS/pyparameterset/src/__init__.py @@ -161,7 +161,23 @@ class parameterset(PyParameterSet): Splits the string in lines, and parses each '=' seperated key/value pair. ''' lines = [l.strip() for l in parset_string.split('\n')] - kv_pairs = [tuple(l.split('=')) for l in lines if '=' in l] + if len(lines) == 1 and parset_string.count('=') > 1: + # the given parset_string lacks proper line endings. + # try to split the single-line-parset_string into proper lines, and reparse. + # a parset line is made of three parts: <key> = <value> + # the <key> contains no whitespace, the '=' can be surrounded by whitespace, and the value can contain whitespace as well. + # so, split the string at each '=', strip the ends of the parts, and extract the key-value pairs + parts = [part.strip() for part in parset_string.split('=')] + kv_pairs = [] + key = parts[0] + for part in parts[1:-1]: + part_parts = part.split() + value = ' '.join(part_parts[:-1]) + kv_pairs.append((key.strip(),value.strip())) + key = part_parts[-1] + kv_pairs.append((key.strip(),parts[-1].strip())) + else: + kv_pairs = [tuple(l.split('=')) for l in lines if '=' in l] parset_dict = dict(kv_pairs) return parameterset(parset_dict) @@ -262,4 +278,5 @@ class parameterset(PyParameterSet): def __str__(self): """:returns the parset in a human readable string (lines of key=value, sorted by key)""" - return '\n'.join("%s=%s" % (key, self[key]) for key in sorted(self.keys())) \ No newline at end of file + return '\n'.join("%s=%s" % (key, self[key]) for key in sorted(self.keys())) + diff --git a/LTA/LTACatalogue/CMakeLists.txt b/LTA/LTACatalogue/CMakeLists.txt index 6f06bdabd4ef21ff2509710d566fb7d51c588563..55ca7247a732e8f4df6c15138e081213d12a0bdb 100644 --- a/LTA/LTACatalogue/CMakeLists.txt +++ b/LTA/LTACatalogue/CMakeLists.txt @@ -1,4 +1,9 @@ lofar_package(LTACatalogue 1.0 DEPENDS PyCommon) -python_install(lta_catalogue_db.py - DESTINATION lofar/lta) +IF(NOT SKIP_TMSS_BUILD) + include(FindPythonModule) + find_python_module(cx_Oracle REQUIRED) # pip3 install cx_Oracle + + python_install(lta_catalogue_db.py + DESTINATION lofar/lta) +ENDIF(NOT SKIP_TMSS_BUILD) diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py index 7a3a343006619239c6e4485a12d143cefc72dee4..78d2cd998044a685f9b5eba177d30f43648f72b8 100644 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py @@ -1185,9 +1185,11 @@ Total Files: %(total)i extra_mail_addresses.append(project_details['pi_email']) if project_details and 'author_email' in project_details: extra_mail_addresses.append(project_details['author_email']) + if project_details and 'friend_email' in project_details: + extra_mail_addresses.append(project_details['friend_email']) if not extra_mail_addresses: - report += '\n\nCould not find any PI\'s/Contact-author\'s email address in MoM to sent this email to.' + report += '\n\nCould not find any PI\'s/Contact-author\'s/Friends email address in MoM to sent this email to.' except Exception as e: msg = 'error while trying to get PI\'s/Contact-author\'s email address for %s: %s' % (job_group_id, e) diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingesttmssadapter.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingesttmssadapter.py index 8b2222ac49cb062a0468328a3d24b72befd04b7e..c8fb0dfcd88882dca08aacf084a82d82659a4199 100644 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingesttmssadapter.py +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingesttmssadapter.py @@ -130,6 +130,10 @@ class TMSSEventMessageHandlerForIngestTMSSAdapter(UsingToBusMixin, TMSSEventMess UsingToBusMixin.stop_handling(self) self.tmss_client.close() + def init_tobus(self, exchange, broker): + logger.warning("FOR COMMISSIONING WE LET THE INGESTTMSSADAPTER SEND ITS INGEST JOBS TO THE PRODUCTION BROKER!") + self._tobus = ToBus(exchange='lofar', broker='scu001.control.lofar') + def onSubTaskStatusChanged(self, id: int, status: str): super().onSubTaskStatusChanged(id, status) @@ -178,11 +182,11 @@ class IngestTMSSAdapter: def __init__(self, tmss_creds: DBCredentials, exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): self.ingest2tmss_adapter = IngestEventMesssageBusListener(handler_type=IngestEventMessageHandlerForIngestTMSSAdapter, handler_kwargs={'tmss_creds': tmss_creds}, - exchange=exchange, broker=broker) + exchange='lofar', broker='scu001.control.lofar') # TODO: replace hardcoded commissioning brokers by parameters self.tmss2ingest_adapter = TMSSBusListener(handler_type=TMSSEventMessageHandlerForIngestTMSSAdapter, handler_kwargs={'tmss_creds': tmss_creds}, routing_key=TMSS_SUBTASK_STATUS_EVENT_PREFIX+'.#', - exchange=exchange, broker=broker) + exchange='test.lofar', broker='scu199.control.lofar') # TODO: replace hardcoded commissioning brokers by parameters def open(self): self.ingest2tmss_adapter.start_listening() diff --git a/MAC/APL/MainCU/src/MACScheduler/MACScheduler.cc b/MAC/APL/MainCU/src/MACScheduler/MACScheduler.cc index 956ec1abab2fba1f16b638da5de8b15d784c731e..070672448124bd3697786a99704e1dd4ca1c3ec9 100644 --- a/MAC/APL/MainCU/src/MACScheduler/MACScheduler.cc +++ b/MAC/APL/MainCU/src/MACScheduler/MACScheduler.cc @@ -922,6 +922,8 @@ void MACScheduler::_updatePlannedList() OLiter prepIter = itsPreparedObs.find(subtask_id); if ((prepIter == itsPreparedObs.end()) || (prepIter->second.prepReady == false) || (prepIter->second.modTime != modTime)) { + itsTMSSconnection->setSubtaskState(subtask_id, "queueing"); + // create a ParameterFile for this Observation string parsetText = itsTMSSconnection->getParsetAsText(subtask_id); if(prepIter == itsPreparedObs.end()) { diff --git a/MAC/Services/src/PipelineControl.py b/MAC/Services/src/PipelineControl.py index 678101c0faf8744472b0cc2ccfa5ffb394a57463..0a92b224556962528d5bb9efd0bafd91c77083da 100755 --- a/MAC/Services/src/PipelineControl.py +++ b/MAC/Services/src/PipelineControl.py @@ -551,16 +551,16 @@ class PipelineControlTMSSHandler(TMSSEventMessageHandler): runPipeline.sh -o {obsid} -c /opt/lofar/share/pipeline/pipeline.cfg.{cluster} -P {parset_dir} -p {parset_file} RESULT=$? - # notify that we're tearing down - runcmd {setStatus_finishing} - + if [ $RESULT -eq 0 ]; then # if we reached this point, the pipeline ran succesfully, and TMSS will set it to finished once it processed the feedback # notify ganglia # !!! TODO Is TMSS supposed to inform Ganglia in future? wget -O - -q "http://ganglia.control.lofar/ganglia/api/events.php?action=add&start_time=now&summary=Pipeline {obsid} FINISHED&host_regex=" else - # !!! TODO: How to set an "unsuccesfull" finished state in TMSS? + # notify TMSS that we finished and some error occured + runcmd {setStatus_finished} + runcmd {setStatus_error} fi # report status back to SLURM @@ -578,8 +578,8 @@ class PipelineControlTMSSHandler(TMSSEventMessageHandler): getParset=getParset_cmdline(), setStatus_starting=setStatus_cmdline("starting"), setStatus_started=setStatus_cmdline("started"), - setStatus_finishing=setStatus_cmdline("finishing"), setStatus_finished=setStatus_cmdline("finished"), + setStatus_error=setStatus_cmdline("error") ), sbatch_params=sbatch_params @@ -590,14 +590,10 @@ class PipelineControlTMSSHandler(TMSSEventMessageHandler): logger.info("Scheduling SLURM job for pipelineAborted.sh") slurm_cancel_job_id = self.slurm.submit("%s-abort-trigger" % self._jobName(subtask_id), """ - # notify TMSS - {setStatus_finished} - # notify ganglia wget -O - -q "http://ganglia.control.lofar/ganglia/api/events.php?action=add&start_time=now&summary=Pipeline {obsid} ABORTED&host_regex=" """ .format( - setStatus_finished=setStatus_cmdline("finished"), obsid=subtask_id, ), diff --git a/SAS/MoM/MoMSimpleAPIs/momdbclient.py b/SAS/MoM/MoMSimpleAPIs/momdbclient.py index ee5fbc83ebbe9a6cc77c908dcaf5979418375ef9..71f36c29b36660843b21abb9731fdcfd8c83f083 100755 --- a/SAS/MoM/MoMSimpleAPIs/momdbclient.py +++ b/SAS/MoM/MoMSimpleAPIs/momdbclient.py @@ -540,7 +540,7 @@ join """ + self.useradministration_db + """.useraccount as useraccount on regist join """ + self.useradministration_db + """.user as user on user.id=useraccount.id join memberprojectrole as member_project_role on member_project_role.memberid=member.id join projectrole as project_role on project_role.id=member_project_role.projectroleid -where project.mom2id = %s and (project_role.name = "Pi" or project_role.name = "Contact author");""" +where project.mom2id = %s and (project_role.name = "Pi" or project_role.name = "Contact author" or project_role.name = "Friend");""" parameters = (mom_id, ) rows = self._executeSelectQuery(query, parameters) @@ -552,6 +552,8 @@ where project.mom2id = %s and (project_role.name = "Pi" or project_role.name = " result["pi_email"] = row["email"] if row["name"] == "Contact author": result["author_email"] = row["email"] + if row["name"] == "Friend": + result["friend_email"] = row["email"] logger.info("get_project_details for mom_id (%s): %s", mom_id, result) diff --git a/SAS/TMSS/backend/CMakeLists.txt b/SAS/TMSS/backend/CMakeLists.txt index bf7aa263dfa84fc18f8b02c17049db8ce9bd8de8..103fea06e42810122f2937c612ca1af0dad96ee9 100644 --- a/SAS/TMSS/backend/CMakeLists.txt +++ b/SAS/TMSS/backend/CMakeLists.txt @@ -1,10 +1,13 @@ lofar_package(TMSSBackend 0.1 DEPENDS TMSSClient PyCommon pyparameterset PyMessaging ResourceAssigner TaskPrescheduler sip) -add_subdirectory(src) -add_subdirectory(bin) -add_subdirectory(test) +IF(NOT SKIP_TMSS_BUILD) + add_subdirectory(src) + add_subdirectory(test) +ENDIF(NOT SKIP_TMSS_BUILD) +add_subdirectory(bin) lofar_add_package(TMSSServices services) + diff --git a/SAS/TMSS/backend/services/feedback_handling/CMakeLists.txt b/SAS/TMSS/backend/services/feedback_handling/CMakeLists.txt index af48000ed59e13275a9709b0a9ce4bc2c423fd2c..114a8acc4b6855b842ce9af4d63738a692be2efc 100644 --- a/SAS/TMSS/backend/services/feedback_handling/CMakeLists.txt +++ b/SAS/TMSS/backend/services/feedback_handling/CMakeLists.txt @@ -2,7 +2,9 @@ lofar_package(TMSSFeedbackHandlingService 0.1 DEPENDS TMSSClient PyCommon pypara lofar_find_package(PythonInterp 3.4 REQUIRED) -add_subdirectory(lib) -add_subdirectory(bin) -add_subdirectory(test) +IF(NOT SKIP_TMSS_BUILD) + add_subdirectory(lib) + add_subdirectory(test) +ENDIF(NOT SKIP_TMSS_BUILD) +add_subdirectory(bin) diff --git a/SAS/TMSS/backend/services/feedback_handling/lib/feedback_handling.py b/SAS/TMSS/backend/services/feedback_handling/lib/feedback_handling.py index 6ad64b4bb7509dfc0d522bfcf4a9b660f21dd7b1..bbff5b48f2c88b5e31e3ab167e953a7e07b948d3 100644 --- a/SAS/TMSS/backend/services/feedback_handling/lib/feedback_handling.py +++ b/SAS/TMSS/backend/services/feedback_handling/lib/feedback_handling.py @@ -110,11 +110,6 @@ class HybridFeedbackMessageHandler(TMSSEventMessageHandler): if status == 'finishing': subtask = self._tmss_client.get_subtask(id) self._init_wait_timeout_for_finishing_observation_or_pipeline_subtask(subtask) - - # sometimes the feedback arrived earlier than the 'finishing' state change -> a classic race condition - # we can handle that by reprocessing the raw_feedback which may already be there. - subtask = self._tmss_client.reprocess_raw_feedback_for_subtask_and_set_to_finished_if_complete(id) - elif status in ('finished', 'cancelling', 'cancelled', 'error'): if id in self._finishing_subtasks: wait_timeout_timestamp = self._finishing_subtasks[id] diff --git a/SAS/TMSS/backend/services/scheduling/CMakeLists.txt b/SAS/TMSS/backend/services/scheduling/CMakeLists.txt index 34de269349de481543af911fa1ad28162fb07b2f..9f17c276c03e888be70b4e59ac55a2dabd967a98 100644 --- a/SAS/TMSS/backend/services/scheduling/CMakeLists.txt +++ b/SAS/TMSS/backend/services/scheduling/CMakeLists.txt @@ -2,10 +2,13 @@ lofar_package(TMSSSchedulingService 0.1 DEPENDS TMSSClient PyCommon pyparameters lofar_find_package(PythonInterp 3.4 REQUIRED) -include(FindPythonModule) -find_python_module(astroplan REQUIRED) # pip3 install astroplan +IF(NOT SKIP_TMSS_BUILD) + include(FindPythonModule) + find_python_module(astroplan REQUIRED) # pip3 install astroplan + + add_subdirectory(lib) + add_subdirectory(test) +ENDIF(NOT SKIP_TMSS_BUILD) -add_subdirectory(lib) add_subdirectory(bin) -add_subdirectory(test) diff --git a/SAS/TMSS/backend/services/scheduling/lib/constraints/__init__.py b/SAS/TMSS/backend/services/scheduling/lib/constraints/__init__.py index f114d276d99e8991d30c23e6e7fff5f28033ea60..28ea03e36f9644edc67545648ab526fde80bc4bb 100644 --- a/SAS/TMSS/backend/services/scheduling/lib/constraints/__init__.py +++ b/SAS/TMSS/backend/services/scheduling/lib/constraints/__init__.py @@ -233,7 +233,7 @@ def get_earliest_possible_start_time(scheduling_unit: models.SchedulingUnitBluep def get_min_earliest_possible_start_time(scheduling_units: [models.SchedulingUnitBlueprint], lower_bound: datetime) -> datetime: '''deterimine the earliest possible starttime over all given scheduling units, taking into account all their constraints''' try: - return min(get_earliest_possible_start_time(scheduling_unit, lower_bound) for scheduling_unit in scheduling_units) + return min(get_earliest_possible_start_time(scheduling_unit, lower_bound) for scheduling_unit in scheduling_units if scheduling_unit.draft.scheduling_constraints_template is not None) except ValueError: return lower_bound diff --git a/SAS/TMSS/backend/services/scheduling/lib/constraints/template_constraints_v1.py b/SAS/TMSS/backend/services/scheduling/lib/constraints/template_constraints_v1.py index f9e2aab204e1b49dfddbb9c2019342a9150cbf9d..9de1309082ee8e2d8d9150460480fc8ae1e05fe7 100644 --- a/SAS/TMSS/backend/services/scheduling/lib/constraints/template_constraints_v1.py +++ b/SAS/TMSS/backend/services/scheduling/lib/constraints/template_constraints_v1.py @@ -259,10 +259,11 @@ def get_earliest_possible_start_time(scheduling_unit: models.SchedulingUnitBluep try: if has_manual_scheduler_constraint(scheduling_unit) and 'at' in constraints['time']: at = parser.parse(constraints['time']['at'], ignoretz=True) - return at + return max(lower_bound, at) if 'after' in constraints['time']: - return parser.parse(constraints['time']['after'], ignoretz=True) + after = parser.parse(constraints['time']['after'], ignoretz=True) + return max(lower_bound, after) if constraints['daily']['require_day'] or constraints['daily']['require_night'] or constraints['daily']['avoid_twilight']: station_groups = scheduling_unit.requirements_doc['tasks'][main_observation_task_name]['specifications_doc']["station_groups"] diff --git a/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py b/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py index d921996e78fd30cfa6c0329393b2d02aa9d05cd9..8dba705f7322eb1176c6925ef981874bec52f054 100644 --- a/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py +++ b/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py @@ -82,7 +82,11 @@ def schedule_next_scheduling_unit() -> models.SchedulingUnitBlueprint: :return: the scheduled scheduling unit.''' # --- setup of needed variables --- - schedulable_units = get_schedulable_scheduling_units() + schedulable_units = get_dynamically_schedulable_scheduling_units() + + if len(schedulable_units) == 0: + logger.info("No scheduling units found...") + return # estimate the lower_bound_start_time lower_bound_start_time = get_min_earliest_possible_start_time(schedulable_units, datetime.utcnow()+DEFAULT_NEXT_STARTTIME_GAP) @@ -127,7 +131,10 @@ def schedule_next_scheduling_unit() -> models.SchedulingUnitBlueprint: # nothing was found, or an error occurred. # seach again... (loop) with the remaining schedulable_units and new lower_bound_start_time - schedulable_units = get_schedulable_scheduling_units() + schedulable_units = get_dynamically_schedulable_scheduling_units() + if len(schedulable_units) == 0: + logger.info("No scheduling units found...") + return lower_bound_start_time = get_min_earliest_possible_start_time(schedulable_units, lower_bound_start_time + timedelta(hours=1)) @@ -135,7 +142,11 @@ def assign_start_stop_times_to_schedulable_scheduling_units(lower_bound_start_ti '''''' logger.info("Estimating mid-term schedule...") - scheduling_units = get_schedulable_scheduling_units() + scheduling_units = get_dynamically_schedulable_scheduling_units() + + if len(scheduling_units) == 0: + logger.info("No scheduling units found...") + return upper_bound_stop_time = lower_bound_start_time + timedelta(days=365) @@ -279,11 +290,13 @@ def create_dynamic_scheduling_service(exchange: str=DEFAULT_BUSNAME, broker: str ################## helper methods ################################################################# -def get_schedulable_scheduling_units() -> [models.SchedulingUnitBlueprint]: - '''get a list of all schedulable scheduling_units''' +def get_dynamically_schedulable_scheduling_units() -> [models.SchedulingUnitBlueprint]: + '''get a list of all dynamically schedulable scheduling_units''' defined_independend_subtasks = models.Subtask.independent_subtasks().filter(state__value='defined') defined_independend_subtask_ids = defined_independend_subtasks.values('task_blueprint__scheduling_unit_blueprint_id').distinct().all() - scheduling_units = models.SchedulingUnitBlueprint.objects.filter(id__in=defined_independend_subtask_ids).select_related('draft', 'draft__scheduling_constraints_template').all() + scheduling_units = models.SchedulingUnitBlueprint.objects.filter(id__in=defined_independend_subtask_ids) \ + .filter(draft__scheduling_constraints_template__isnull=False) \ + .select_related('draft', 'draft__scheduling_constraints_template').all() return [su for su in scheduling_units if su.status == 'schedulable'] diff --git a/SAS/TMSS/backend/services/tmss_lta_adapter/CMakeLists.txt b/SAS/TMSS/backend/services/tmss_lta_adapter/CMakeLists.txt index c71d7b72950b64ea247b49f146e809cad35b3c72..c9d027f0ce5839dc51490f90429466b69a87cf31 100644 --- a/SAS/TMSS/backend/services/tmss_lta_adapter/CMakeLists.txt +++ b/SAS/TMSS/backend/services/tmss_lta_adapter/CMakeLists.txt @@ -1,5 +1,7 @@ lofar_package(TMSSLTAAdapter 0.1 DEPENDS TMSSClient LTACatalogue) -add_subdirectory(lib) +IF(NOT SKIP_TMSS_BUILD) + add_subdirectory(lib) +ENDIF(NOT SKIP_TMSS_BUILD) add_subdirectory(bin) diff --git a/SAS/TMSS/backend/services/websocket/CMakeLists.txt b/SAS/TMSS/backend/services/websocket/CMakeLists.txt index ba899270ef576cc4bff54cdfe1c3ffd4dc69b525..de02703279e29aa6db07e43737780a3a9a5525e3 100644 --- a/SAS/TMSS/backend/services/websocket/CMakeLists.txt +++ b/SAS/TMSS/backend/services/websocket/CMakeLists.txt @@ -2,10 +2,13 @@ lofar_package(TMSSWebSocketService 0.1 DEPENDS TMSSClient PyCommon pyparameterse lofar_find_package(PythonInterp 3.6 REQUIRED) -include(FindPythonModule) -find_python_module(SimpleWebSocketServer REQUIRED) # sudo pip3 install SimpleWebSocketServer +IF(NOT SKIP_TMSS_BUILD) + include(FindPythonModule) + find_python_module(SimpleWebSocketServer REQUIRED) # sudo pip3 install SimpleWebSocketServer + + add_subdirectory(lib) + add_subdirectory(test) +ENDIF(NOT SKIP_TMSS_BUILD) -add_subdirectory(lib) add_subdirectory(bin) -add_subdirectory(test) diff --git a/SAS/TMSS/backend/src/tmss/settings.py b/SAS/TMSS/backend/src/tmss/settings.py index 9c90529f05ecbd309b51f5f01d9fc6b6dd4e02b7..26e34cb4b8771950ea51aca0f9416ae6eecf38c8 100644 --- a/SAS/TMSS/backend/src/tmss/settings.py +++ b/SAS/TMSS/backend/src/tmss/settings.py @@ -48,7 +48,7 @@ LOGGING = { }, 'django.request': { 'handlers': ['console'], - 'level': 'DEBUG', # change debug level as appropiate + 'level': 'INFO', # change debug level as appropiate 'propagate': False, }, # 'django.db.backends': { # uncomment to enable logging of each db query. Very spammy and slow, but also usefull for performance improvement. Gives more even detail/insight than django debug toolbar. diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/feedback.py b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/feedback.py index 5617923eff2b71cfd3d84115732b2c426fad2926..9afe50a2c4b9cb3f2c021ec33bcf5d19053a0465 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/feedback.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/feedback.py @@ -33,7 +33,7 @@ def process_feedback_into_subtask_dataproducts(subtask:Subtask, feedback: parame (subtask.id, subtask.specifications_template.type.value, [SubtaskType.Choices.OBSERVATION.value, SubtaskType.Choices.PIPELINE.value])) - if subtask.state.value not in (SubtaskState.objects.get(value='started').value, SubtaskState.objects.get(value='finishing').value): + if subtask.state.value != SubtaskState.objects.get(value='finishing').value: raise SubtaskInvalidStateException("Cannot process feedback for subtask id=%s because the state is '%s' and not '%s'" % (subtask.id, subtask.state.value, SubtaskState.Choices.FINISHING.value)) logger.info('processing feedback into the dataproducts of subtask id=%s type=%s feedback: %s', subtask.id, subtask.specifications_template.type.value, single_line_with_single_spaces(str(feedback))) @@ -53,9 +53,9 @@ def process_feedback_into_subtask_dataproducts(subtask:Subtask, feedback: parame # determine corresponding TMSS dataproduct try: - dataproduct = subtask.output_dataproducts.get(filename=dp_feedback['filename']) - except Dataproduct.DoesNotExist: - logger.error("cannot process feedback for %s. No such output dataproduct known for subtask id=%s", dp_feedback['filename'], subtask.id) + dataproduct = subtask.output_dataproducts.get(filename=dp_feedback.get('filename')) + except Exception as e: + logger.error("cannot process feedback: %s. No output dataproduct known for subtask id=%s feedback: %s", e, subtask.id, dp_feedback) continue try: @@ -128,6 +128,7 @@ def process_feedback_into_subtask_dataproducts(subtask:Subtask, feedback: parame except Exception as e: logger.error('error while processing feedback for dataproduct id=%s filename=%s feedback=%s error: %s', dataproduct.id, dataproduct.filename, dp_feedback, e) + subtask.refresh_from_db() return subtask @@ -150,6 +151,12 @@ def process_feedback_for_subtask_and_set_to_finished_if_complete(subtask: Subtas Translates raw feedback from a subtask (which has been provided by Cobalt or pipelines) and translate it to json documents for the individual dataproducts. """ + if subtask.state.value == SubtaskState.objects.get(value='started').value: + logger.info("received feedback for subtask id=%s while it is still running (state=%s). Setting state to 'finishing' and stop_time to now.", subtask.id, subtask.state.value) + subtask.state = SubtaskState.objects.get(value='finishing') + subtask.stop_time = datetime.utcnow() + subtask.save() + # the submitted feedback_doc is (should be) a plain text document in parset format # so, treat it as a parset new_feedback_parset = parameterset.fromString(feedback_doc) @@ -174,11 +181,21 @@ def reprocess_raw_feedback_for_subtask_and_set_to_finished_if_complete(subtask: Reprocesses the stored raw feedback from a subtask (which has been provided by Cobalt or pipelines) and translate it to json documents for the individual dataproducts. """ - if subtask.state.value == SubtaskState.objects.get(value='finished').value: - # allow finished subtask to go to finishing, reprocess, and then go to finished again if complete - subtask.state.value = SubtaskState.objects.get(value='finishing').value + if not subtask.raw_feedback: + raise SubtaskException("Cannot reprocess raw_feedback for subtask id=%s because it is empty" % (subtask.id)) + + try: + raw_feedback_parset = parameterset.fromString(subtask.raw_feedback) + except Exception as e: + raise SubtaskException("Cannot reprocess raw_feedback for subtask id=%s because it cannot be interpreted as parset. Error=%s\n%s" % (subtask.id, str(e), subtask.raw_feedback)) + + if subtask.state.value in (SubtaskState.objects.get(value='started').value, SubtaskState.objects.get(value='finished').value): + # allow started/running subtask to go to finishing, (re)process, and then go to finished if complete + # allow finished subtask to go back to finishing, reprocess, and then go to finished again if complete + subtask.state = SubtaskState.objects.get(value='finishing') + subtask.save() - subtask = process_feedback_into_subtask_dataproducts(subtask, parameterset.fromString(subtask.raw_feedback)) + subtask = process_feedback_into_subtask_dataproducts(subtask, raw_feedback_parset) # if complete, set subtask state to finished if subtask.is_feedback_complete: diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/migrations/0001_initial.py b/SAS/TMSS/backend/src/tmss/tmssapp/migrations/0001_initial.py index 92a733c311d257a8a786d9a700284cf905114d2c..2df67c9b37f610d3abe48ac11aeaa440843794f5 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/migrations/0001_initial.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/migrations/0001_initial.py @@ -1,4 +1,4 @@ -# Generated by Django 3.0.9 on 2021-02-18 15:51 +# Generated by Django 3.0.9 on 2021-02-22 09:32 from django.conf import settings import django.contrib.postgres.fields @@ -581,7 +581,7 @@ class Migration(migrations.Migration): ('ingest_permission_required', models.BooleanField(default=False, help_text='Explicit permission is needed before the task.')), ('ingest_permission_granted_since', models.DateTimeField(help_text='The moment when ingest permission was granted.', null=True)), ('output_data_allowed_to_be_ingested', models.BooleanField(default=False, help_text='boolean (default FALSE), which blocks Ingest Tasks from starting if OFF. When toggled ON, backend must scan for startable Ingest Tasks.')), - ('output_data_allowed_to_be_deleted', models.BooleanField(default=False, help_text='boolean (default FALSE), which blocks deleting unpinned dataproducts. When toggled ON, backend must pick SUB up for deletion. It also must when dataproducts are unpinned.')), + ('output_pinned', models.BooleanField(default=False, help_text='boolean (default FALSE), which blocks deleting unpinned dataproducts. When toggled ON, backend must pick SUB up for deletion. It also must when dataproducts are unpinned.')), ('results_accepted', models.BooleanField(default=False, help_text='boolean (default NULL), which records whether the results were accepted, allowing the higher-level accounting to be adjusted.')), ], options={ @@ -1151,7 +1151,7 @@ class Migration(migrations.Migration): migrations.AddField( model_name='schedulingunitblueprint', name='draft', - field=models.ForeignKey(help_text='Scheduling Unit Draft which this run instantiates.', on_delete=django.db.models.deletion.CASCADE, related_name='scheduling_unit_blueprints', to='tmssapp.SchedulingUnitDraft'), + field=models.ForeignKey(help_text='Scheduling Unit Draft which this run instantiates.', on_delete=django.db.models.deletion.PROTECT, related_name='scheduling_unit_blueprints', to='tmssapp.SchedulingUnitDraft'), ), migrations.AddField( model_name='schedulingunitblueprint', diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/models/scheduling.py b/SAS/TMSS/backend/src/tmss/tmssapp/models/scheduling.py index 39165fae8e5bc0893f773be2a2116f221c1b461c..9535b3c3d9732a2ae062b3186717697a5f853cd4 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/models/scheduling.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/models/scheduling.py @@ -236,8 +236,9 @@ class Subtask(BasicCommon): @property def is_feedback_complete(self) -> bool: - '''returns True if the feedback for all output dataproducts is filled in for a non-"empty"-template''' - return self.output_dataproducts.filter(feedback_template__isnull=False).exclude(feedback_template__name="empty").count() == self.output_dataproducts.count() + '''returns True if the feedback for all (>0) output dataproducts is filled in for a non-"empty"-template''' + nr_of_output_dataproducts = self.output_dataproducts.count() + return nr_of_output_dataproducts > 0 and self.output_dataproducts.filter(feedback_template__isnull=False).exclude(feedback_template__name="empty").count() == nr_of_output_dataproducts @property def progress(self) -> float: @@ -291,9 +292,6 @@ class Subtask(BasicCommon): if self.start_time is None: raise SubtaskSchedulingException("Cannot schedule subtask id=%s when start time is 'None'." % (self.pk, )) - if self.raw_feedback and '\\n' in self.raw_feedback: - self.raw_feedback = self.raw_feedback.replace('\\n', '\n') - super().save(force_insert, force_update, using, update_fields) # log if either state update or new entry: diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/models/specification.py b/SAS/TMSS/backend/src/tmss/tmssapp/models/specification.py index 7198dfcf39f3baac714dfe2c6620546324272990..86d6d04c5853b0cd78367a5fa8699fdcb75ae529 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/models/specification.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/models/specification.py @@ -404,9 +404,9 @@ class SchedulingUnitBlueprint(NamedCommon): ingest_permission_required = BooleanField(default=False, help_text='Explicit permission is needed before the task.') ingest_permission_granted_since = DateTimeField(auto_now_add=False, null=True, help_text='The moment when ingest permission was granted.') requirements_template = ForeignKey('SchedulingUnitTemplate', on_delete=CASCADE, help_text='Schema used for requirements_doc (IMMUTABLE).') - draft = ForeignKey('SchedulingUnitDraft', related_name='scheduling_unit_blueprints', on_delete=CASCADE, help_text='Scheduling Unit Draft which this run instantiates.') + draft = ForeignKey('SchedulingUnitDraft', related_name='scheduling_unit_blueprints', on_delete=PROTECT, help_text='Scheduling Unit Draft which this run instantiates.') output_data_allowed_to_be_ingested = BooleanField(default=False, help_text='boolean (default FALSE), which blocks Ingest Tasks from starting if OFF. When toggled ON, backend must scan for startable Ingest Tasks.') - output_data_allowed_to_be_deleted = BooleanField(default=False, help_text='boolean (default FALSE), which blocks deleting unpinned dataproducts. When toggled ON, backend must pick SUB up for deletion. It also must when dataproducts are unpinned.') + output_pinned = BooleanField(default=False, help_text='boolean (default FALSE), which blocks deleting unpinned dataproducts. When toggled ON, backend must pick SUB up for deletion. It also must when dataproducts are unpinned.') results_accepted = BooleanField(default=False, help_text='boolean (default NULL), which records whether the results were accepted, allowing the higher-level accounting to be adjusted.') def save(self, force_insert=False, force_update=False, using=None, update_fields=None): @@ -529,11 +529,6 @@ class SchedulingUnitBlueprint(NamedCommon): '''Can this scheduling unit proceed with running its tasks?''' return self.status not in [SchedulingUnitBlueprint.Status.ERROR.value, SchedulingUnitBlueprint.Status.FINISHED.value, SchedulingUnitBlueprint.Status.CANCELLED.value] - @property - def output_pinned(self) -> bool: - '''Is the output_pinned flag True for any of its task_blueprints?''' - return any(task.output_pinned for task in self.task_blueprints.all()) - def _task_graph_instantiated(self): return self._get_total_nbr_tasks() > 0 @@ -746,7 +741,7 @@ class TaskBlueprint(NamedCommon): specifications_doc = JSONField(help_text='Schedulings for this task (IMMUTABLE).') do_cancel = BooleanField(help_text='Cancel this task.') specifications_template = ForeignKey('TaskTemplate', on_delete=CASCADE, help_text='Schema used for specifications_doc (IMMUTABLE).') - draft = ForeignKey('TaskDraft', related_name='task_blueprints', on_delete=CASCADE, help_text='Task Draft which this task instantiates.') + draft = ForeignKey('TaskDraft', related_name='task_blueprints', on_delete=PROTECT, help_text='Task Draft which this task instantiates.') scheduling_unit_blueprint = ForeignKey('SchedulingUnitBlueprint', related_name='task_blueprints', on_delete=CASCADE, help_text='Scheduling Unit Blueprint to which this task belongs.') output_pinned = BooleanField(default=False, help_text='True if the output of this task is pinned to disk, that is, forbidden to be removed.') diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py index 73bbeaee8b8312b9b2b05bd6bb0bad652ca155d1..2e62394fc92e667ddb385b580c6a2ba879a0d941 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py @@ -552,8 +552,8 @@ def unschedule_subtask(subtask: Subtask) -> Subtask: subtask.save() for output in subtask.outputs.all(): - # delete all transforms (the producers of the output dataproducts), and the the dataproducts themselves - output.dataproducts.all().select_related('producers').delete() + # delete all output transforms, and the the dataproducts themselves + DataproductTransform.objects.filter(output__in=output.dataproducts.all()).all().delete() output.dataproducts.all().delete() assign_or_unassign_resources(subtask) @@ -671,6 +671,8 @@ def assign_or_unassign_resources(subtask: Subtask): ra_spec['predecessors'].append(_create_ra_specification(pred)) except: pass + + #TODO: rewrite the code below. Goal is to take out stations which cannot be used. Accept if sufficient stations available, else raise. Only do this for observation subtasks. assigned = False cnt_do_assignments = 1 with RARPC.create() as rarpc: @@ -682,9 +684,11 @@ def assign_or_unassign_resources(subtask: Subtask): logger.info("Conflicts in assignment detected, lets check the stations in conflict and re-assign if possible") # Try to re-assign if not assigned yet if not assigned: - lst_stations_in_conflict = get_stations_in_conflict(subtask.id) - lst_stations = determine_stations_which_can_be_assigned(subtask, lst_stations_in_conflict) - ra_spec = update_specification(ra_spec, lst_stations) + # only reason about stations when this is an observation with a station_list + if "stations" in subtask.specifications_doc and "station_list" in subtask.specifications_doc["stations"]: + lst_stations_in_conflict = get_stations_in_conflict(subtask.id) + lst_stations = determine_stations_which_can_be_assigned(subtask, lst_stations_in_conflict) + ra_spec = update_specification(ra_spec, lst_stations) # At the end still not possible to assign, give Exception. if not assigned: diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/viewsets/scheduling.py b/SAS/TMSS/backend/src/tmss/tmssapp/viewsets/scheduling.py index 42fc0210cbabfc8e86b05c51f2b0657aaca6ea53..767c7a835c1ebb74b5ae3a8cfff1a3411a8d1321 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/viewsets/scheduling.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/viewsets/scheduling.py @@ -190,8 +190,8 @@ class SubtaskViewSet(LOFARViewSet): @action(methods=['get'], detail=True, url_name="schedule") def schedule(self, request, pk=None): subtask = get_object_or_404(models.Subtask, pk=pk) - from lofar.sas.tmss.tmss.tmssapp.subtasks import schedule_subtask - scheduled_subtask = schedule_subtask(subtask) + from lofar.sas.tmss.tmss.tmssapp.subtasks import schedule_subtask_and_update_successor_start_times + scheduled_subtask = schedule_subtask_and_update_successor_start_times(subtask) serializer = self.get_serializer(scheduled_subtask) return RestResponse(serializer.data) diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/viewsets/specification.py b/SAS/TMSS/backend/src/tmss/tmssapp/viewsets/specification.py index 506b44b8b18dc9b3969305d0c3ac1c326da23f54..12fa2504a1131c98730c5519ec06a4d997ff5ae4 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/viewsets/specification.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/viewsets/specification.py @@ -88,6 +88,15 @@ class SchedulingUnitObservingStrategyTemplateViewSet(LOFARViewSet): spec = add_defaults_to_json_object_for_schema(strategy_template.template, strategy_template.scheduling_unit_template.schema) + # get the default_scheduling_constraints_template and fill a doc if available + default_scheduling_constraints_template = models.DefaultSchedulingConstraintsTemplate.objects.all().order_by('created_at').last() + if default_scheduling_constraints_template: + scheduling_constraints_template = default_scheduling_constraints_template.template + scheduling_constraints_doc = get_default_json_object_for_schema(scheduling_constraints_template.schema) + else: + scheduling_constraints_template = None + scheduling_constraints_doc = None + scheduling_set = get_object_or_404(models.SchedulingSet, pk=request.query_params['scheduling_set_id']) scheduling_unit_draft = models.SchedulingUnitDraft.objects.create(name=request.query_params.get('name', "scheduling unit"), @@ -95,7 +104,9 @@ class SchedulingUnitObservingStrategyTemplateViewSet(LOFARViewSet): requirements_doc=spec, scheduling_set=scheduling_set, requirements_template=strategy_template.scheduling_unit_template, - observation_strategy_template=strategy_template) + observation_strategy_template=strategy_template, + scheduling_constraints_doc=scheduling_constraints_doc, + scheduling_constraints_template=scheduling_constraints_template) scheduling_unit_observation_strategy_template_path = request._request.path base_path = scheduling_unit_observation_strategy_template_path[:scheduling_unit_observation_strategy_template_path.find('/scheduling_unit_observing_strategy_template')] diff --git a/SAS/TMSS/backend/src/tmss/workflowapp/flows/schedulingunitflow.py b/SAS/TMSS/backend/src/tmss/workflowapp/flows/schedulingunitflow.py index d98d0e652a756ebfcbb1611704233b3f234f5c5e..5ab934891e8e4358fbf19719cb1d82972de9587b 100644 --- a/SAS/TMSS/backend/src/tmss/workflowapp/flows/schedulingunitflow.py +++ b/SAS/TMSS/backend/src/tmss/workflowapp/flows/schedulingunitflow.py @@ -242,7 +242,7 @@ class SchedulingUnitFlow(Flow): def do_mark_sub(self, activation): - activation.process.su.output_data_allowed_to_be_deleted = True + activation.process.su.output_pinned = True activation.process.su.results_accepted = ((activation.process.qa_reporting_to is not None and activation.process.qa_reporting_to.operator_accept) and (activation.process.qa_reporting_sos is not None and activation.process.qa_reporting_sos.sos_accept_show_pi) and (activation.process.decide_acceptance is not None and activation.process.decide_acceptance.sos_accept_after_pi)) diff --git a/SAS/TMSS/backend/src/tmss/workflowapp/tests/t_workflow_qaworkflow.py b/SAS/TMSS/backend/src/tmss/workflowapp/tests/t_workflow_qaworkflow.py index 7185040d6f089a13f6e67397d260648dae5e9847..4616f51c172043d48486bf833ef01fe1ddd2695c 100755 --- a/SAS/TMSS/backend/src/tmss/workflowapp/tests/t_workflow_qaworkflow.py +++ b/SAS/TMSS/backend/src/tmss/workflowapp/tests/t_workflow_qaworkflow.py @@ -346,8 +346,8 @@ class SchedulingUnitFlowTest(unittest.TestCase): self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[12].flow_task.name, 'mark_sub') self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[12].status, 'DONE') - output_data_allowed_to_be_deleted = models.SchedulingUnitBlueprint.objects.get(pk=scheduling_unit_process_id).output_data_allowed_to_be_deleted - self.assertEqual(True,output_data_allowed_to_be_deleted) + output_pinned = models.SchedulingUnitBlueprint.objects.get(pk=scheduling_unit_process_id).output_pinned + self.assertEqual(True,output_pinned) self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[13].flow_task.name, 'check_data_pinned') self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[13].status, 'DONE') @@ -797,8 +797,8 @@ class SchedulingUnitFlowTest(unittest.TestCase): self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[7].flow_task.name, 'mark_sub') self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[7].status, 'DONE') - output_data_allowed_to_be_deleted = models.SchedulingUnitBlueprint.objects.get(pk=scheduling_unit_process_id).output_data_allowed_to_be_deleted - self.assertEqual(True,output_data_allowed_to_be_deleted) + output_pinned = models.SchedulingUnitBlueprint.objects.get(pk=scheduling_unit_process_id).output_pinned + self.assertEqual(True,output_pinned) self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[8].flow_task.name, 'check_data_pinned') self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[8].status, 'DONE') @@ -1094,8 +1094,8 @@ class SchedulingUnitFlowTest(unittest.TestCase): self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[10].flow_task.name, 'mark_sub') self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[10].status, 'DONE') - output_data_allowed_to_be_deleted = models.SchedulingUnitBlueprint.objects.get(pk=scheduling_unit_process_id).output_data_allowed_to_be_deleted - self.assertEqual(True,output_data_allowed_to_be_deleted) + output_pinned = models.SchedulingUnitBlueprint.objects.get(pk=scheduling_unit_process_id).output_pinned + self.assertEqual(True,output_pinned) self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[11].flow_task.name, 'check_data_pinned') self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[11].status, 'DONE') @@ -1405,17 +1405,17 @@ class SchedulingUnitFlowTest(unittest.TestCase): self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[12].flow_task.name, 'mark_sub') self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[12].status, 'DONE') - output_data_allowed_to_be_deleted = models.SchedulingUnitBlueprint.objects.get(pk=scheduling_unit_process_id).output_data_allowed_to_be_deleted - self.assertEqual(True,output_data_allowed_to_be_deleted) + output_pinned = models.SchedulingUnitBlueprint.objects.get(pk=scheduling_unit_process_id).output_pinned + self.assertEqual(True,output_pinned) self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[13].flow_task.name, 'check_data_pinned') self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[13].status, 'DONE') - self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[14].flow_task.name, 'delete_data') - self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[14].status, 'DONE') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[14].flow_task.name, 'unpin_data') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[14].status, 'NEW') - self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[15].flow_task.name, 'end') - self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[15].status, 'DONE') + # self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[15].flow_task.name, 'end') + # self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[15].status, 'DONE') if __name__ == '__main__': diff --git a/SAS/TMSS/backend/test/t_tmssapp_specification_REST_API.py b/SAS/TMSS/backend/test/t_tmssapp_specification_REST_API.py index a8b45b6631b58cef0594c095e01d21b68d458770..c43fc1ae28505d5528cd4a05b754cd3d282f2f0c 100755 --- a/SAS/TMSS/backend/test/t_tmssapp_specification_REST_API.py +++ b/SAS/TMSS/backend/test/t_tmssapp_specification_REST_API.py @@ -1941,7 +1941,7 @@ class SchedulingUnitBlueprintTestCase(unittest.TestCase): # assert GET_and_assert_equal_expected_code(self, url, 404) - def test_scheduling_unit_blueprint_CASCADE_behavior_on_scheduling_unit_draft_deleted(self): + def test_scheduling_unit_blueprint_PROTECT_behavior_on_scheduling_unit_draft_deleted(self): scheduling_unit_draft_url = test_data_creator.post_data_and_get_url(test_data_creator.SchedulingUnitDraft(), '/scheduling_unit_draft/') sub_test_data = test_data_creator.SchedulingUnitBlueprint(scheduling_unit_draft_url=scheduling_unit_draft_url, template_url=self.template_url) @@ -1951,11 +1951,11 @@ class SchedulingUnitBlueprintTestCase(unittest.TestCase): # verify GET_OK_and_assert_equal_expected_response(self, url, sub_test_data) - # DELETE dependency - DELETE_and_assert_gone(self, scheduling_unit_draft_url) - - # assert - GET_and_assert_equal_expected_code(self, url, 404) + # Try to DELETE dependency, verify that was not successful + # Unfortunately we don't get a nice error in json, but a Django debug page on error 500... + response = requests.delete(scheduling_unit_draft_url, auth=AUTH) + self.assertEqual(500, response.status_code) + self.assertTrue("ProtectedError" in str(response.content)) def test_GET_SchedulingUnitBlueprint_view_returns_correct_entry(self): @@ -2121,8 +2121,9 @@ class TaskBlueprintTestCase(unittest.TestCase): # assert GET_and_assert_equal_expected_code(self, url, 404) - def test_task_blueprint_CASCADE_behavior_on_task_draft_deleted(self): - draft_url = test_data_creator.post_data_and_get_url(test_data_creator.TaskDraft(), '/task_draft/') + def test_task_blueprint_PROTECT_behavior_on_task_draft_deleted(self): + draft_data = test_data_creator.TaskDraft() + draft_url = test_data_creator.post_data_and_get_url(draft_data, '/task_draft/') tb_test_data = test_data_creator.TaskBlueprint(draft_url=draft_url, template_url=self.template_url, scheduling_unit_blueprint_url=self.scheduling_unit_blueprint_url) # POST new item @@ -2131,11 +2132,15 @@ class TaskBlueprintTestCase(unittest.TestCase): # verify GET_OK_and_assert_equal_expected_response(self, url, tb_test_data) - # DELETE dependency - DELETE_and_assert_gone(self, draft_url) + # refresh draft_data, because it now has a reference to the blueprint + draft_data = test_data_creator.get_response_as_json_object(draft_url) - # assert - GET_and_assert_equal_expected_code(self, url, 404) + # Try to DELETE dependency, verify that was not successful + # Unfortunately we don't get a nice error in json, but a Django debug page on error 500... + response = requests.delete(draft_url, auth=AUTH) + self.assertEqual(500, response.status_code) + self.assertTrue("ProtectedError" in str(response.content)) + GET_OK_and_assert_equal_expected_response(self, draft_url, draft_data) def test_task_blueprint_CASCADE_behavior_on_scheduling_unit_blueprint_deleted(self): scheduling_unit_blueprint_url = test_data_creator.post_data_and_get_url(test_data_creator.SchedulingUnitBlueprint(), '/scheduling_unit_blueprint/') diff --git a/SAS/TMSS/backend/test/t_tmssapp_specification_django_API.py b/SAS/TMSS/backend/test/t_tmssapp_specification_django_API.py index fb612a0416d7b17b3e18e257a5775c6c501be218..7966ebf804157257cddc5f6b63d1d774d20694ad 100755 --- a/SAS/TMSS/backend/test/t_tmssapp_specification_django_API.py +++ b/SAS/TMSS/backend/test/t_tmssapp_specification_django_API.py @@ -43,6 +43,7 @@ from lofar.sas.tmss.test.tmss_test_data_django_models import * from django.db.utils import IntegrityError from django.core.exceptions import ValidationError +from django.db.models.deletion import ProtectedError from lofar.sas.tmss.tmss.exceptions import SchemaValidationException class GeneratorTemplateTest(unittest.TestCase): @@ -713,8 +714,15 @@ class SchedulingUnitBlueprintTest(unittest.TestCase): # assert with self.assertRaises(IntegrityError): models.SchedulingUnitBlueprint.objects.create(**test_data) - - + + def test_SchedulingUnitBlueprint_prevents_draft_deletion(self): + # setup + test_data = dict(SchedulingUnitBlueprint_test_data()) + blueprint = models.SchedulingUnitBlueprint.objects.create(**test_data) + draft = blueprint.draft + with self.assertRaises(ProtectedError): + draft.delete() + def test_SchedulingUnitBlueprint_gets_created_with_correct_default_ingest_permission_required(self): # setup @@ -775,6 +783,14 @@ class TaskBlueprintTest(unittest.TestCase): with self.assertRaises(IntegrityError): models.TaskBlueprint.objects.create(**test_data) + def test_TaskBlueprint_prevents_draft_deletion(self): + # setup + test_data = dict(TaskBlueprint_test_data()) + blueprint = models.TaskBlueprint.objects.create(**test_data) + draft = blueprint.draft + with self.assertRaises(ProtectedError): + draft.delete() + def test_TaskBlueprint_prevents_missing_scheduling_unit_blueprint(self): # setup diff --git a/SAS/TMSS/client/lib/populate.py b/SAS/TMSS/client/lib/populate.py index dc990681d39355e8a7b58b324bd5197c577a7674..6d3420403a6490f9b74c7117f4fb845bce66e9e5 100644 --- a/SAS/TMSS/client/lib/populate.py +++ b/SAS/TMSS/client/lib/populate.py @@ -103,6 +103,14 @@ def populate_schemas(schema_dir: str=None, templates_filename: str=None): template = templates_dict.pop(id) upload_template(template) + # helper functions for uploading observing_strategy_templates + def upload_observing_strategy_templates(template: dict): + scheduling_unit_templates = client.get_path_as_json_object('scheduling_unit_template?name=' + template.get('scheduling_unit_template_name') + '&version=' + template.get('scheduling_unit_template_version')) + scheduling_unit_template = scheduling_unit_templates[0] + template['scheduling_unit_template'] = scheduling_unit_template['url'] + logger.info("Uploading observation strategy with name='%s' version='%s'", template['name'], template['version']) + client.post_template(template_path='scheduling_unit_observing_strategy_template', **template) + # first, upload all dependent templates for ref in all_references: upload_template_if_needed_with_dependents_first(ref) @@ -113,9 +121,13 @@ def populate_schemas(schema_dir: str=None, templates_filename: str=None): executor.map(upload_template, rest_templates) # and finally, the observing_strategy_templates - for template in observing_strategy_templates: - scheduling_unit_templates = client.get_path_as_json_object('scheduling_unit_template?name=' + template.get('scheduling_unit_template_name') + '&version=' + template.get('scheduling_unit_template_version')) - scheduling_unit_template = scheduling_unit_templates[0] - template['scheduling_unit_template'] = scheduling_unit_template['url'] - logger.info("Uploading observation strategy with name='%s' version='%s'", template['name'], template['version']) - client.post_template(template_path='scheduling_unit_observing_strategy_template', **template) + with ThreadPoolExecutor() as executor: + executor.map(upload_observing_strategy_templates, observing_strategy_templates) + + scheduling_constraints_templates = client.get_path_as_json_object('scheduling_constraints_template') + if scheduling_constraints_templates: + default_scheduling_constraints_template = scheduling_constraints_templates[0] + logger.info("Making scheduling_constraints_templates name='%s' version='%s' the default", default_scheduling_constraints_template['name'], default_scheduling_constraints_template['version']) + client.session.post(client.get_full_url_for_path('default_scheduling_constraints_template'), json={'name': default_scheduling_constraints_template['name'], + 'template': default_scheduling_constraints_template['url']}) + diff --git a/SubSystems/SCU/CMakeLists.txt b/SubSystems/SCU/CMakeLists.txt index a40e3fca760202efa32c43603965210751182640..9e4627f8a943ebd1d808ce95e3f6f3c7dff9ae89 100644 --- a/SubSystems/SCU/CMakeLists.txt +++ b/SubSystems/SCU/CMakeLists.txt @@ -1,40 +1,3 @@ -IF(SKIP_TMSS_BUILD) # This is needed for building on Jenkins. When fully on GitLab it can be removed again. -set(BUILD_TESTING OFF) -lofar_package(SCU - DEPENDS MAC_Services - MoMQueryService - MoMutils - OTDBtoRATaskStatusPropagator - RATaskSpecifiedService - RAtoOTDBTaskSpecificationPropagator - ResourceAssigner - ResourceAssignmentDatabase - ResourceAssignmentEditor - ResourceAssignmentEstimator - ResourceAssignmentService - SpecificationServices - TBB - TBBService - TriggerServices - TriggerEmailService - TaskPrescheduler - CleanupService - AutoCleanupService - StorageQueryService - QPIDInfrastructure - RAScripts - StaticMetaData - RACommon - ltastorageoverview - QA_Service - MessageLogger - TMSSClient - TMSSSchedulingService - TMSSFeedbackHandlingService - TMSSPostgresListenerService - TMSSWebSocketService - TMSSWorkflowService) -ELSE() lofar_package(SCU DEPENDS MAC_Services MoMQueryService @@ -69,9 +32,9 @@ lofar_package(SCU TMSSFeedbackHandlingService TMSSPostgresListenerService TMSSWebSocketService + TMSSLTAAdapter TMSSWorkflowService) -ENDIF(SKIP_TMSS_BUILD) - + # supervisord config files lofar_add_sysconf_files(SCU.ini DESTINATION supervisord.d)