......@@ -8,7 +8,11 @@ from astropy.utils.data import download_file
from astropy.utils import iers
from astropy.time import Time
iers.IERS.iers_table = iers.IERS_A.open(download_file(iers.IERS_A_URL, cache=True))
try:
iers.IERS.iers_table = iers.IERS_A.open(download_file(iers.IERS_A_URL, cache=True))
except Exception:
iers.IERS.iers_table = iers.IERS_A.open(download_file(iers.IERS_A_URL_MIRROR, cache=True))
def mjd_to_astropy_time(mjd_time_seconds) -> Time:
......
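The hunk above adds a mirror fallback for the IERS-A table download; the body of mjd_to_astropy_time itself is elided in this diff. A minimal sketch of what such a helper presumably does, assuming mjd_time_seconds is an MJD epoch expressed in seconds rather than days (an assumption, not taken from the diff):

from astropy.time import Time

def mjd_to_astropy_time(mjd_time_seconds) -> Time:
    # assumption: the input is MJD in seconds, so convert to days first
    return Time(mjd_time_seconds / (24.0 * 3600.0), format='mjd', scale='utc')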
......@@ -93,10 +93,14 @@ class HolographyDataset():
# array(3,3), translation matrix for
# (RA, DEC) <-> (l, m) conversion
self._rotation_matrix = None
# str, filter name
self._filter = None
# str, antenna set
self._antenna_set = None
# list of beamlet numbers
self._beamlets = list()
# list of used antennas
self._used_antennas = list()
# The central beam or beamlet for each frequency
self._central_beamlets = dict()
self._calibration_tables = dict()
......@@ -158,6 +162,18 @@ class HolographyDataset():
def target_station_position(self) -> List[Union[str, float, float]]:
return self._target_station_position
@property
def antenna_set(self):
return self._antenna_set
@property
def used_antennas(self):
return self._used_antennas
@property
def filter(self):
return self._filter
@property
def source_name(self) -> str:
return self._source_name
......@@ -372,6 +388,10 @@ class HolographyDataset():
frequency_string,
station_name)
continue
if beamlet not in ho.ms_for_a_given_beamlet_number:
logger.error('missing beamlet %s for station %s - beamlets present are: %s', beamlet,
station_name, sorted(ho.ms_for_a_given_beamlet_number.keys()))
raise ValueError('missing beamlet %s for station %s' % (beamlet, station_name))
reference_station_names, cross_correlation = \
ho.ms_for_a_given_beamlet_number[
beamlet].read_cross_correlation_time_flags_lm_per_station_name(
......@@ -423,20 +443,29 @@ class HolographyDataset():
frequencies = set()
sas_ids = set()
rcu_list = set()
filterset = set()
antenna_set = set()
start_mjd = None
end_mjd = None
used_antennas = set()
if len(list_of_hbs_ho_tuples) == 0:
raise ValueError('list_of_hbs_ho_tuples is empty')
for hbs, ho in list_of_hbs_ho_tuples:
target_stations.update(hbs.target_station_names)
if station_name in hbs.target_station_names:
beam_specifications = hbs.get_beam_specifications_per_station_name(station_name)
if len(beam_specifications) == 0:
logger.error('beam spec not found for %s', station_name)
raise ValueError('Input data incomplete')
for beam_specification in beam_specifications:
rcu_list.update(beam_specification.rcus_involved)
used_antennas.update(beam_specification.antennas_used)
mode.add(beam_specification.rcus_mode)
filterset.add(beam_specification.filter)
antenna_set.add(beam_specification.antenna_set)
source_name.add(ho.source_name)
source_position.add(
(beam_specification.station_pointing['ra'],
......@@ -456,7 +485,8 @@ class HolographyDataset():
self._target_station_name = station_name
reference_stations.update(hbs.reference_station_names)
try:
single_beamlet = int(beam_specification.beamlets)
# MOD 1000 in case of the HBA_ONE specification
single_beamlet = int(beam_specification.beamlets) % 1000
except ValueError as e:
logger.exception('Target station specification incorrect')
raise e
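For clarity, the MOD 1000 normalisation folds an HBA_ONE-style beamlet number back onto its logical beamlet, assuming the thousands digit is only an offset used by the HBA_ONE specification (an interpretation of the comment above, with hypothetical values):

assert int('23') % 1000 == 23      # plain specification: unchanged
assert int('1023') % 1000 == 23    # HBA_ONE-style offset: folded back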
......@@ -486,6 +516,16 @@ class HolographyDataset():
else:
raise ValueError('Multiple source names are not supported')
if len(antenna_set) == 1:
self._antenna_set = antenna_set.pop()
else:
raise ValueError('Multiple antenna sets are not supported')
if len(filterset) == 1:
self._filter = filterset.pop()
else:
raise ValueError('Multiple filters are not supported')
if station_name not in target_stations:
logger.error('Station %s was not involved in the observation.'
' The target stations for this observation are %s',
......@@ -500,6 +540,8 @@ class HolographyDataset():
self._sas_ids = list(sas_ids)
self._reference_stations = list(reference_stations)
self._rcu_list = list(rcu_list)
self._used_antennas = list(used_antennas)
return target_stations, used_antennas, virtual_pointing
def _collect_from_observation(self, station_name, list_of_hbs_ho_tuples, virtual_pointing, used_antennas):
......@@ -512,7 +554,7 @@ class HolographyDataset():
beamlet_string = str(beamlet)
ra, dec, _ = virtual_pointing[(frequency, beamlet)]
if isnan(ra) or isnan(dec):
logger.error('skipping pointing %s for frequency %s malformed : %s',
logger.warning('skipping pointing %s for frequency %s malformed : %s',
beamlet_string, frequency_string, virtual_pointing[(frequency, beamlet)])
# skip if the pointing is ill specified
continue
......@@ -714,6 +756,11 @@ class HolographyDataset():
result._antenna_field_position = f.attrs[HDS_ANTENNA_FIELD_POSITION].tolist()
result._reference_stations = bytestring_list_to_string(list(f[HDS_REFERENCE_STATION]))
if float(result._version) >= 1.1:
result._antenna_set = f.attrs[HDS_ANTENNASET]
result._filter = f.attrs[HDS_FILTER]
result._used_antennas = f.attrs[HDS_USEDANTENNAS]
result._frequencies = list(f[HDS_FREQUENCY])
if HDS_CALIBRATION_TABLES in f:
......@@ -795,11 +842,15 @@ class HolographyDataset():
f.attrs[HDS_ROTATION_MATRIX] = self._rotation_matrix
f.attrs[HDS_ANTENNA_FIELD_POSITION] = self._antenna_field_position
f.attrs[HDS_BEAMLETS] = self._beamlets
f.attrs[HDS_FILTER] = self._filter
f.attrs[HDS_ANTENNASET] = self._antenna_set
f.attrs[HDS_USEDANTENNAS] = self._used_antennas
# Store the list of reference stations and _frequencies. We just
# want to keep 'em around for quick reference.
f[HDS_REFERENCE_STATION] = to_numpy_array_string(self._reference_stations)
f[HDS_FREQUENCY] = self._frequencies
f.create_group(HDS_CALIBRATION_TABLES)
for mode in self._calibration_tables:
self._calibration_tables[mode].store_to_hdf(f,
......
import numpy
HOLOGRAPHY_DATA_SET_VERSION = 1.0
HOLOGRAPHY_DATA_SET_VERSION = 1.1
# Allowed HDS keywords
#
......@@ -22,6 +22,9 @@ HDS_BEAMLETS = "Beamlets"
HDS_CENTRAL_BEAMLETS = "Central_beamlets"
HDS_CALIBRATION_TABLES = "Calibration_tables"
HDS_DERIVED_CAL_TABLES = "Derived_calibration_tables"
HDS_FILTER = 'Filter'
HDS_ANTENNASET = 'AntennaSet'
HDS_USEDANTENNAS = 'Antennas'
# GROUP "RA DEC"
HDS_SPECIFIED_RA_DEC = "Specified_RA_DEC"
......
......@@ -17,6 +17,21 @@ MODE_TO_COMPONENT = {
7: 'HBA'
}
_FILTER_TO_MODE = {
'210_250': 7,
'110_190': 5,
'170_230': 6
}
def filter_antenna_set_to_mode(filter, antenna_set):
if antenna_set.startswith('HBA'):
return _FILTER_TO_MODE[filter]
elif antenna_set.startswith('LBA'):
raise NotImplementedError()
else:
raise ValueError('antenna_set %s unrecognized' % antenna_set)
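A quick usage sketch of the new helper, using the _FILTER_TO_MODE mapping defined above (HBA only; LBA deliberately raises NotImplementedError):

assert filter_antenna_set_to_mode('110_190', 'HBA_DUAL') == 5
assert filter_antenna_set_to_mode('210_250', 'HBA_ONE') == 7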
def antenna_id_from_rcu_type(rcu, type):
"""
......@@ -62,6 +77,8 @@ class HolographyStationBeamSpecification(object):
station_pointing = ()
virtual_pointing = ()
station_type = ()
filter = ''
antenna_set = ''
def _parse_row(row):
......@@ -71,12 +88,14 @@ def _parse_row(row):
:type row: dict[str, str]
"""
beam_specification = HolographyStationBeamSpecification()
beam_specification.antenna_set = row['antenna_set']
beam_specification.filter = row['filter']
beam_specification.rcus_mode = int(row['rcus_mode'])
beam_specification.rcus_mode = filter_antenna_set_to_mode(beam_specification.filter, beam_specification.antenna_set)
beam_specification.sub_band_ids = [int(sub_band) for sub_band in
row['sub_band'].split(',')]
beam_specification.mode_description = row['mode_description']
beam_specification.mode_description = "_".join([beam_specification.antenna_set, beam_specification.filter])
beam_specification.rcus_involved = _parse_integer_range_or_list(
row['rcus'])
beam_specification.beamlets = row['beamlets']
......@@ -162,20 +181,21 @@ def _split_line(line):
the end_date, the rcu_mode, and the beam_switch_delay
:rtype: dict
"""
range_regex = '(\d*\:\d*)|(\d*)'
range_regex = '([^\s]*)'
ra_dec_regex = '(\d*\.\d*|nan),(-?\d*\.\d*|nan),(\w*)'
regex = r'^(?P<station_name>\w*)\s*' \
r'(?P<mode_description>\w*)\s*' \
r'(?P<antenna_set>\w*)\s*' \
r'(?P<sub_band>[\d,]*)\s*' \
r'(?P<beamlets>{range_regex})\s*' \
r'(?P<rcus>{range_regex})\s*' \
r'(?P<rcus_mode>(\d*))\s*' \
r'(?P<filter>(\w*))\s*' \
r'(?P<virtual_pointing>{ra_dec_regex})\s*' \
r'(?P<station_pointing>{ra_dec_regex})'.format(range_regex=range_regex,
ra_dec_regex=ra_dec_regex)
match = re.match(regex, line)
if match is None:
raise ValueError('Cannot parse line {}'.format(line))
return match.groupdict()
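The looser range_regex now accepts any non-whitespace token, so malformed ranges are rejected later by _parse_integer_range_or_list instead of silently failing to match the whole line. The RA/DEC sub-pattern can be exercised in isolation (a standalone check, not part of the change):

import re

ra_dec_regex = r'(\d*\.\d*|nan),(-?\d*\.\d*|nan),(\w*)'
assert re.match(ra_dec_regex, '1.5,-0.5,J2000').groups() == ('1.5', '-0.5', 'J2000')
assert re.match(ra_dec_regex, 'nan,nan,J2000').groups() == ('nan', 'nan', 'J2000')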
......@@ -197,17 +217,27 @@ def _parse_integer_range_or_list(string_to_be_parsed):
ex "1,2,3,4,5" -> [1, 2, 3, 4, 5]
"1:4" -> [1, 2, 3, 4]
"1:3,10:12" -> [1, 2, 3, 10, 11, 12]
:param string_to_be_parsed: the string representing a list of int or a range
:return: a list of int
:rtype: list(int)
"""
if ':' in string_to_be_parsed:
if ',' in string_to_be_parsed and ':' in string_to_be_parsed:
fragments = string_to_be_parsed.split(',')
return_value = []
for fragment in fragments:
if ':' in fragment:
expanded_range = _parse_integer_range_or_list(fragment)
return_value += expanded_range
else:
return_value.append(int(fragment))
elif ':' in string_to_be_parsed:
try:
start, end = map(int, string_to_be_parsed.split(':'))
except ValueError as e:
raise ValueError('Cannot parse string %s expected [start]:[end] -> %s' %
(string_to_be_parsed, e))
return_value = [x for x in range(start, end)]
return_value = [x for x in range(start, end + 1)]
elif ',' in string_to_be_parsed:
try:
return_value = list(map(int, string_to_be_parsed.split(',')))
......
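With the new comma-plus-colon branch, mixed specifications expand exactly as documented in the docstring, and ranges are now inclusive of their end point:

assert _parse_integer_range_or_list('1,2,3,4,5') == [1, 2, 3, 4, 5]
assert _parse_integer_range_or_list('1:4') == [1, 2, 3, 4]
assert _parse_integer_range_or_list('1:3,10:12') == [1, 2, 3, 10, 11, 12]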
......@@ -10,6 +10,7 @@ from lofar.calibration.processing.averaging import average_data,\
from lofar.calibration.processing.interpolate import derive_interpolation_parameters
from lofar.calibration.processing.normalize import normalize_beams_by_central_one
from lofar.calibration.processing.inspect import compute_illumination, compute_beam_shape
from lofar.calibration.processing.decorators import log_step_execution
......@@ -174,6 +175,34 @@ def interpolate_gains_step(dataset, input_datatable):
return output_datable
@log_step_execution('compute illumination', logger)
@save_in_case_of_exception()
def illumination_step(dataset):
"""
Compute the illumination of the array
:param dataset: Input dataset
:type dataset: datacontainers.HolographyDataset
"""
compute_illumination(dataset)
@log_step_execution('compute beam shape', logger)
@save_in_case_of_exception()
def beam_shape_step(dataset):
"""
Compute the reconstructed beam shape
:param dataset: Input dataset
:type dataset: datacontainers.HolographyDataset
"""
compute_beam_shape(dataset)
@log_step_execution('save', logger)
@save_in_case_of_exception()
def store_step(dataset, filepath):
......@@ -203,6 +232,9 @@ def execute_processing(arguments):
gains = compute_gains_step(dataset, station_averaged_data)
interpolate_gains_step(dataset, gains)
illumination_step(dataset)
beam_shape_step(dataset)
store_step(dataset, arguments.output_path)
......
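The step functions above all follow the same decorator pattern. A minimal sketch of what a decorator like log_step_execution presumably looks like (an assumption; the real implementation lives in lofar.calibration.processing.decorators and is not part of this diff):

import functools

def log_step_execution(step_name, logger):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            logger.info('executing step: %s', step_name)
            result = func(*args, **kwargs)
            logger.info('finished step: %s', step_name)
            return result
        return wrapper
    return decorator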
......@@ -495,7 +495,7 @@ def weighted_average_dataset_per_station(dataset, input_data_table):
polarization)
result_per_beam['l'] = numpy.average(average_per_beam_station['l'])
result_per_beam['m'] = numpy.average(average_per_beam_station['m'])
result_per_beam['flag'] = numpy.average(average_per_beam_station['flag'])
result_per_beam['flag'] = numpy.average(average_per_beam_station['flag']) >= 1.0
result_per_beam['t'] = numpy.average(average_per_beam_station['t'])
result_per_frequency[beam_str] = dict(mean=result_per_beam)
......
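The >= 1.0 comparison makes the combined flag strict: the averaged value only reaches 1.0 when every contributing sample is flagged, so a single unflagged sample keeps the averaged point usable. For example:

import numpy
flags = numpy.array([True, True, False])
print(numpy.average(flags))         # 0.666..., so the combined flag is False
print(numpy.average(flags) >= 1.0)  # False; True only if all samples are flagged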
......@@ -195,6 +195,7 @@ def __remove_flagged_data(visibilities, matrix, flags):
matrix_new[index, :] = matrix[index, :]
return visibilities, numpy.array(matrix_new)
def solve_gains_per_datatable(dataset, datatable, **kwargs):
"""
Solve for the gains the given datatable
......
......@@ -20,7 +20,7 @@ RUN apt-get update && apt-get -y install \
RUN pip3 install dataclasses \
scipy==1.3.1 \
numpy==1.17.0 \
astropy==3.2.1 \
astropy==3.2.3 \
h5py==2.9.0 \
emcee==2.2.1 \
numba==0.45.1 \
......@@ -93,11 +93,25 @@ RUN mkdir /root/src/dysco && \
make -j 5 && \
make install
RUN mkdir /root/src/stman && \
cd /root/src/stman && \
wget https://github.com/lofar-astron/LofarStMan/archive/master.tar.gz && \
tar -xvf master.tar.gz && \
rm master.tar.gz && \
mkdir build && \
cd build && \
cmake -DCMAKE_CXX_FLAGS="${CXX_FLAGS}" -DPORTABLE=True -DCMAKE_INSTALL_PREFIX=/opt/stman/ -DCASACORE_ROOT_DIR=/opt/casacore/ /root/src/stman/LofarStMan-master/ && \
make -j 5 && \
make install
FROM base
ENV PYTHONPATH=/opt/dysco/lib/python3.6/site-packages:/opt/dysco/lib64/python3.6/site-packages:${PYTHONPATH}
COPY --from=base-build /opt/casacore/ /opt/casacore/
COPY --from=base-build /opt/python-casacore/ /opt/python-casacore/
COPY --from=base-build /opt/dysco/ /opt/dysco/
COPY --from=base-build /opt/stman/ /opt/stman/
## Installing convenience packages
RUN apt install -y \
......
......@@ -3,7 +3,7 @@ COPY --from=holography-build:latest /opt/lofar /opt/lofar
COPY ./entrypoint.sh /entrypoint.sh
ENV LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/opt/casacore/lib/:/opt/dysco/lib
ENV LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/opt/casacore/lib/:/opt/dysco/lib:/opt/stman/lib
ENV PYTHONPATH=$PYTHONPATH:/opt/python-casacore/lib/python3.6/site-packages/
......
......@@ -284,6 +284,46 @@ if(NOT DEFINED LOFAR_MACROS_INCLUDED)
endmacro(lofar_add_data_files)
# --------------------------------------------------------------------------
# lofar_add_docker_files([name1 [name2 ..]])
#
# Add docker files that need to be
# installed into the <prefix>/docker directory. Also create a symbolic link
# in <build-dir>/docker to each of these files. The file names may contain
# a relative(!) path.
#
# The mentioned files are installed in the same relative path as provided,
# that is:
# lofar_add_docker_files(foo/bar)
# installs "docker/foo/bar". To override this behaviour use:
# lofar_add_docker_files(foo/bar DESTINATION .)
# installs "docker/bar".
# --------------------------------------------------------------------------
macro(lofar_add_docker_files)
string(REGEX REPLACE ";?DESTINATION.*" "" _src_names "${ARGN}")
string(REGEX MATCH "DESTINATION;.*" _destination "${ARGN}")
string(REGEX REPLACE "^DESTINATION;" "" _destination "${_destination}")
string(TOLOWER ${PACKAGE_NAME} lower_package_name)
foreach(_src_name ${_src_names})
if(_destination MATCHES ".+")
get_filename_component(_src_filename ${_src_name} NAME)
set(_dest_name ${_destination}/${_src_filename})
else(_destination MATCHES ".+")
set(_dest_name ${_src_name})
endif(_destination MATCHES ".+")
get_filename_component(_abs_name ${_src_name} ABSOLUTE)
get_filename_component(_dest_path ${_dest_name} PATH)
file(MAKE_DIRECTORY ${CMAKE_BINARY_DIR}/docker/${_dest_path})
execute_process(COMMAND ${CMAKE_COMMAND} -E create_symlink
${_abs_name} ${CMAKE_BINARY_DIR}/docker/${_dest_name})
install(FILES ${_src_name}
DESTINATION docker/${_dest_path}
COMPONENT ${lower_package_name})
endforeach(_src_name ${_src_names})
endmacro(lofar_add_docker_files)
# --------------------------------------------------------------------------
# lofar_add_test(name [source ...] [DEPENDS depend ...])
#
......
# - Create for each LOFAR package a variable containing the absolute path to
# its source directory.
# its source directory.
#
# Generated by gen_LofarPackageList_cmake.sh at Wed May 29 15:45:16 CEST 2019
# Generated by gen_LofarPackageList_cmake.sh at Fr 28. Feb 20:47:32 CET 2020
#
# ---- DO NOT EDIT ----
#
......
......@@ -177,7 +177,7 @@ function(npm_install NPM_PACKAGE_SPECIFICATION)
add_custom_command(
TARGET packing_javascript_files_${PACKAGE_NAME}
COMMAND npm run build
COMMAND CI=false npm run build
DEPENDS "${INSTALLED_SOURCE_FILES}" "${INSTALLED_PUBLIC_FILES}"
WORKING_DIRECTORY "${NPM_BINARY_DIR}"
COMMENT "Packing javascript files for ${PACKAGE_NAME} into ${NPM_BINARY_DIR}/build for deployment")
......
......@@ -3,10 +3,13 @@
#
# base
#
FROM centos:7
FROM centos:centos7.6.1810
RUN yum -y groupinstall 'Development Tools' && \
yum -y install epel-release && \
yum -y install cmake log4cplus-devel python3 python3-devel python3-pip
yum -y install cmake gcc git log4cplus-devel python3 python3-devel python3-pip which wget curl atop
RUN pip3 install kombu requests coverage python-qpid-proton
RUN adduser lofarsys
......@@ -5,6 +5,16 @@
#
FROM ci_base:latest
RUN echo "Installing packages for LTA..." && \
yum -y install postgresql-devel && \
pip3 install kombu requests pysimplesoap mysql-connector psycopg2 flask
RUN echo "Installing packages for LTA..."
# see https://www.postgresql.org/download/linux/redhat/ on how to install postgresql-server > 9.2 on centos7
RUN yum erase -y postgresql postgresql-server postgresql-devel && \
yum install -y https://download.postgresql.org/pub/repos/yum/reporpms/EL-7-x86_64/pgdg-redhat-repo-latest.noarch.rpm && \
yum install -y postgresql96 postgresql96-server postgresql96-devel && \
cd /bin && ln -s /usr/pgsql-9.6/bin/initdb && ln -s /usr/pgsql-9.6/bin/postgres
ENV PATH /usr/pgsql-9.6/bin:$PATH
RUN pip3 install kombu requests pysimplesoap mysql-connector flask lxml jsonschema psycopg2 testing.postgresql
RUN adduser ingest
USER ingest
\ No newline at end of file
......@@ -5,7 +5,7 @@
#
FROM ci_base:latest
RUN echo "Installing packages for LCU..." && \
RUN echo "Installing packages for MAC..." && \
yum -y install readline-devel boost-python36-devel hdf5-devel blas-devel lapack-devel cfitsio-devel wcslib-devel autogen postgresql-devel cmake3 libpqxx-devel qpid-cpp-server qpid-cpp-client-devel qpid-tools unittest-cpp-devel && \
pip3 install psycopg2 testing.postgresql lxml mock numpy kombu requests python-dateutil fabric
......@@ -14,14 +14,14 @@ RUN echo "Installing Casacore..." && \
mkdir /casacore/build/ && \
cd /casacore/build/ && \
cmake -DCMAKE_INSTALL_PREFIX=/opt/casacore -DBUILD_PYTHON3=ON -DBUILD_PYTHON=OFF -DPYTHON_EXECUTABLE=/usr/bin/python3 -DUSE_OPENMP=ON -DUSE_FFTW3=TRUE -DUSE_HDF5=ON -DCMAKE_BUILD_TYPE=Release .. && \
make && \
make -j 8 && \
make install
RUN echo "Installing Blitz++" && \
cd /
cd / && \
git clone --depth 1 https://github.com/blitzpp/blitz.git && \
mkdir -p /blitz/build && \
cd /blitz/build && \
cmake3 --prefix=/opt/blitz/ .. && \
make lib && \
cmake --prefix=/opt/blitz/ .. && \
make -j 8 lib && \
make install
\ No newline at end of file
......@@ -6,5 +6,20 @@
FROM ci_base:latest
RUN echo "Installing packages for SAS..." && \
yum -y install postgresql-devel openldap-devel readline-devel qpid-cpp-server qpid-cpp-client-devel qpid-tools libpqxx-devel java-devel qt-devel autogen boost-python36-devel && \
pip3 install kombu psycopg2 requests lxml xmljson pygcn python-dateutil django djangorestframework djangorestframework-xml django-auth-ldap mysql-connector testing.mysqld testing.postgresql
yum install -y log4cplus log4cplus-devel python3 python3-libs python3-devel boost readline-devel boost-devel binutils-devel boost-python36 boost-python36-devel gettext which openldap-devel npm nodejs git java-11-openjdk python-twisted-core
# see https://www.postgresql.org/download/linux/redhat/ on how to install postgresql-server > 9.2 on centos7
RUN yum erase -y postgresql postgresql-server postgresql-devel && \
yum install -y https://download.postgresql.org/pub/repos/yum/reporpms/EL-7-x86_64/pgdg-redhat-repo-latest.noarch.rpm && \
yum install -y postgresql96 postgresql96-server postgresql96-devel && \
cd /bin && ln -s /usr/pgsql-9.6/bin/initdb && ln -s /usr/pgsql-9.6/bin/postgres
ENV PATH /usr/pgsql-9.6/bin:$PATH
RUN pip3 install cython kombu lxml requests pygcn xmljson mysql-connector-python python-dateutil django djangorestframework djangorestframework-xml ldap==1.0.2 flask fabric coverage python-qpid-proton PyGreSQL numpy h5py psycopg2 testing.postgresql Flask-Testing scipy Markdown django-filter python-ldap python-ldap-test ldap3 djangorestframework django-jsonforms django-json-widget django-jsoneditor drf-yasg flex swagger-spec-validator django-auth-ldap mozilla-django-oidc jsonschema comet
RUN npm install -g npx && \
npm install -g n && \
n stable && \
npm install -g serve
USER lofarsys
\ No newline at end of file
......@@ -19,12 +19,12 @@
try:
import proton
import proton.utils
import uuid
MESSAGING_ENABLED = True
except ImportError:
from . import noqpidfallback as proton
MESSAGING_ENABLED = False
import uuid
import xml.dom.minidom as xml
import xml.parsers.expat as expat
from xml.sax.saxutils import escape
......@@ -223,7 +223,7 @@ class MessageContent(object):
def _property_list(self):
""" List of XML elements that are exposed as properties. """
return {
return {
"system": "message.header.system",
"headerVersion": "message.header.version",
"protocol": "message.header.protocol.name",
......@@ -288,4 +288,3 @@ if __name__ == "__main__":
m = MessageContent("FROM", "FORUSER", "SUMMARY", "PROTOCOL", "1.2.3", "11111", "22222")
print(str(m))
print(m.content())
......@@ -19,8 +19,8 @@ DEFAULT_BROKER = "scu001.control.lofar" if isProductionEnvironment() else \
if 'LOFAR_DEFAULT_BROKER' in os.environ.keys():
DEFAULT_BROKER = os.environ.get('LOFAR_DEFAULT_BROKER')
DEFAULT_USER = "guest"
DEFAULT_PASSWORD = "guest"
DEFAULT_USER = os.environ.get('RABBITMQ_DEFAULT_USER', 'guest')
DEFAULT_PASSWORD = os.environ.get('RABBITMQ_DEFAULT_PASS', 'guest')
if isProductionEnvironment() or isTestEnvironment():
# import the user and password from RabbitMQ 'db'credentials
......@@ -35,12 +35,15 @@ if isProductionEnvironment() or isTestEnvironment():
# dynamically determine port where RabbitMQ server runs by trying to connect
DEFAULT_PORT = -1
def broker_url(hostname: str=DEFAULT_BROKER, port: int=DEFAULT_PORT, userid: str=DEFAULT_USER, password :str=DEFAULT_PASSWORD) -> str:
return 'amqp://%s:%s@%s:%d//' % (userid, password, hostname, port)
for port in [5672, 5675]:
try:
logger.debug("trying to connect to broker: hostname=%s port=%s userid=%s password=***",
DEFAULT_BROKER, port, DEFAULT_USER)
with kombu.Connection(hostname=DEFAULT_BROKER, port=port, userid=DEFAULT_USER, password=DEFAULT_PASSWORD,
max_retries=0, connect_timeout=1) as connection:
with kombu.Connection(broker_url(port=port), max_retries=0, connect_timeout=1, ) as connection:
connection.connect()
DEFAULT_PORT = port
logger.info("detected rabbitmq broker to which we can connect with hostname=%s port=%s userid=%s password=***",
......
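The new broker_url helper centralises the AMQP URL construction that was previously repeated in every kombu.Connection call. With the default guest credentials it produces, for example:

assert broker_url(hostname='localhost', port=5672, userid='guest', password='guest') \
       == 'amqp://guest:guest@localhost:5672//'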
......@@ -204,7 +204,7 @@ logger = logging.getLogger(__name__)
from lofar.messaging.exceptions import *
from lofar.messaging import adaptNameToEnvironment
from lofar.messaging.messages import *
from lofar.messaging.config import DEFAULT_BROKER, DEFAULT_BUSNAME, DEFAULT_PORT, DEFAULT_USER, DEFAULT_PASSWORD
from lofar.messaging.config import DEFAULT_BROKER, DEFAULT_BUSNAME, DEFAULT_PORT, DEFAULT_USER, DEFAULT_PASSWORD, broker_url
from lofar.common.threading_utils import TimeoutLock
from lofar.common.util import program_name
from lofar.common.util import is_empty_function
......@@ -222,7 +222,7 @@ def can_connect_to_broker(broker: str=DEFAULT_BROKER, port: int=DEFAULT_PORT) ->
try:
logger.debug("trying to connect to broker: hostname=%s port=%s userid=%s password=***",
broker, port, DEFAULT_USER)
with kombu.Connection(hostname=broker, port=port, userid=DEFAULT_USER, password=DEFAULT_PASSWORD,
with kombu.Connection(broker_url(hostname=broker, port=port, userid=DEFAULT_USER, password=DEFAULT_PASSWORD),
max_retries=0, connect_timeout=1) as connection:
connection.connect()
logger.debug("can connect to broker with hostname=%s port=%s userid=%s password=***",
......@@ -244,7 +244,7 @@ def create_exchange(name: str, durable: bool=True, broker: str=DEFAULT_BROKER, l
:return True if created, False if not-created (because it already exists)
"""
try:
with kombu.Connection(hostname=broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD) as connection:
with kombu.Connection(broker_url(hostname=broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD)) as connection:
exchange = kombu.Exchange(name, durable=durable, type='topic')
try:
exchange.declare(channel=connection.default_channel, passive=True)
......@@ -266,7 +266,7 @@ def delete_exchange(name: str, broker: str=DEFAULT_BROKER, log_level=logging.DEB
:return True if deleted, False if not-deleted (because it does not exist)
"""
try:
with kombu.Connection(hostname=broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD) as connection:
with kombu.Connection(broker_url(hostname=broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD)) as connection:
exchange = kombu.Exchange(name, channel=connection)
try:
exchange.declare(channel=connection.default_channel, passive=True)
......@@ -286,7 +286,7 @@ def exchange_exists(name: str, broker: str=DEFAULT_BROKER) -> bool:
:return True if it exists, False if not.
"""
try:
with kombu.Connection(hostname=broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD) as connection:
with kombu.Connection(broker_url(hostname=broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD)) as connection:
exchange = kombu.Exchange(name, channel=connection)
try:
exchange.declare(channel=connection.default_channel, passive=True)
......@@ -309,7 +309,7 @@ def create_queue(name: str, durable: bool=True, broker: str=DEFAULT_BROKER, log_
:return True if created, False if not-created (because it already exists)
"""
try:
with kombu.Connection(hostname=broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD) as connection:
with kombu.Connection(broker_url(hostname=broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD)) as connection:
queue = kombu.Queue(name,
durable=durable,
auto_delete=auto_delete,
......@@ -335,7 +335,7 @@ def delete_queue(name: str, broker: str=DEFAULT_BROKER, log_level=logging.DEBUG)
:return True if deleted, False if not-deleted (because it does not exist)
"""
try:
with kombu.Connection(hostname=broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD) as connection:
with kombu.Connection(broker_url(hostname=broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD)) as connection:
queue = kombu.Queue(name, no_declare=True, channel=connection)
try:
queue.queue_declare(channel=connection.default_channel, passive=True)
......@@ -355,7 +355,7 @@ def queue_exists(name: str, broker: str=DEFAULT_BROKER) -> bool:
:return True if it exists, False if not.
"""
try:
with kombu.Connection(hostname=broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD) as connection:
with kombu.Connection(broker_url(hostname=broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD)) as connection:
queue = kombu.Queue(name, no_declare=True, channel=connection)
try:
queue.queue_declare(channel=connection.default_channel, passive=True)
......@@ -389,7 +389,7 @@ def create_binding(exchange: str, queue: str, routing_key: str='#', durable: boo
:param log_level: optional logging level (to add/reduce spamming)
"""
try:
with kombu.Connection(hostname=broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD) as connection:
with kombu.Connection(broker_url(hostname=broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD)) as connection:
kombu_exchange = kombu.Exchange(exchange, durable=durable, type='topic', no_declare=True)
kombu_queue = kombu.Queue(queue, exchange=kombu_exchange, routing_key=routing_key, durable=durable, no_declare=True)
if not kombu_queue.is_bound:
......@@ -483,7 +483,7 @@ class _AbstractBus:
return
logger.debug("[%s] Connecting to broker: %s", self.__class__.__name__, self.broker)
self._connection = kombu.Connection(hostname=self.broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD)
self._connection = kombu.Connection(broker_url(hostname=self.broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD))
self._connection.connect()
logger.debug("[%s] Connected to broker: %s (%s)", self.__class__.__name__, self.broker, self.connection_name)
......
......@@ -374,7 +374,7 @@ class FromBusInitFailed(unittest.TestCase):
Connecting to broker on wrong port must raise MessageBusError
"""
with self.assertRaisesRegex(MessageBusError, ".*failed to resolve broker hostname"):
with FromBus("fake" + self.test_queue.address, broker="localhost:4"):
with FromBus("fake" + self.test_queue.address, broker="fdjsafhdjlahflaieoruieow"):
pass
......@@ -445,7 +445,7 @@ class ToBusInitFailed(unittest.TestCase):
Connecting to broker on wrong port must raise MessageBusError
"""
with self.assertRaisesRegex(MessageBusError, ".*failed to resolve broker hostname"):
with ToBus(self.test_exchange.address, broker="localhost:4"):
with ToBus(self.test_exchange.address, broker="fhjlahfowuefohwaueif"):
pass
......@@ -886,8 +886,17 @@ class ReconnectOnConnectionLossTests(unittest.TestCase):
self.tmp_exchange.close()
self.assertFalse(exchange_exists(tmp_exchange_address))
def _can_connect_to_rabbitmq_admin_site(self, hostname: str):
try:
url = 'http://%s:15672/api' % (hostname,)
return requests.get(url, auth=(DEFAULT_USER, DEFAULT_PASSWORD)).status_code in [200, 202]
except requests.ConnectionError:
return False
def _close_connection_of_bus_on_broker(self, bus: _AbstractBus):
if not self._can_connect_to_rabbitmq_admin_site(bus.broker):
raise unittest.SkipTest("Cannot connect to RabbitMQ admin server to close connection %s" % (bus.connection_name,))
# use the http REST API using request to forcefully close the connection on the broker-side
url = "http://%s:15672/api/connections/%s" % (bus.broker, bus.connection_name)
......
......@@ -6,6 +6,7 @@ lofar_find_package(Python 3.4 REQUIRED)
include(PythonInstall)
include(FindPythonModule)
find_python_module(jsonschema)
find_python_module(psycopg2)
set(_py_files
......@@ -24,6 +25,7 @@ set(_py_files
postgres.py
datetimeutils.py
flask_utils.py
h5_utils.py
subprocess_utils.py
xmlparse.py
json_utils.py
......
# Copyright (C) 2012-2015 ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
import os.path
from datetime import datetime, timedelta
from time import sleep
import errno
import os
# prevent annoying h5py future/deprecation warnings
os.environ["TF_CPP_MIN_LOG_LEVEL"]="3"
import h5py
import logging
logger = logging.getLogger(__name__)
class SharedH5File():
"""
Wrapper class around h5py.File to open an HDF5 file in read, write, or read/write mode safely,
even when the file might be used simultaneously by other processes.
It waits for <timeout> seconds until the file becomes available.
Example usage:
with SharedH5File("foo.h5", 'r+') as file:
file["foo"] = "bar"
"""
def __init__(self, path, mode='r', timeout=900):
self._path = path
self._mode = mode
self._timeout = timeout
self._file = None
def open(self):
start_timestamp = datetime.utcnow()
while self._file is None:
try:
self._file = h5py.File(self._path, self._mode)
return self._file
except IOError:
if self._path.startswith('~'):
# try again with tilde replaced
self._path = os.path.expanduser(self._path)
continue
if not os.path.exists(self._path):
raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), self._path)
logger.warning("Cannot open file '%s' with mode '%s'. Trying again in 1 sec...",
self._path, self._mode)
sleep(max(0, min(1, self._timeout)))
if datetime.utcnow() - start_timestamp > timedelta(seconds=self._timeout):
logger.error("Cannot open file '%s' with mode '%s', even after trying for %s seconds",
self._path, self._mode, self._timeout)
raise
def close(self):
self._file.close()
self._file = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
return self.close()
class WorkingDirContext():
"""
A WorkingDirContext can be used to make sure that the raw-sibling-file of an h5 file can be read,
because the working dir needs to be the dir of the h5 file (h5py demands that).
When leaving this context, the working dir is restored to the situation before the context was entered.
Example usage:
with h5py.File("foo.h5", "r") as my_h5_file:
with WorkingDirContext(my_h5_file):
my_h5_file["foo"] = "bar"
"""
def __init__(self, h5_file: h5py.File):
self._h5_file = h5_file
self._original_cwd = os.getcwd()
def change_dir_to_h5_file(self):
self._original_cwd = os.getcwd()
os.chdir(os.path.dirname(self._h5_file.filename))
def change_dir_to_original(self):
os.chdir(self._original_cwd)
def __enter__(self):
self.change_dir_to_h5_file()
return self._h5_file
def __exit__(self, exc_type, exc_val, exc_tb):
self.change_dir_to_original()
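The two helpers compose naturally when an HDF5 file with a raw sibling file must be opened while other processes may still hold it. A usage sketch, assuming the hypothetical file /data/qa/foo.h5 exists:

with SharedH5File("/data/qa/foo.h5", 'r') as h5_file:
    with WorkingDirContext(h5_file):
        print(h5_file.keys())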
......@@ -27,7 +27,14 @@ IF(BUILD_TESTING)
lofar_add_test(t_methodtrigger)
lofar_add_test(t_util)
lofar_add_test(t_test_utils)
lofar_add_test(t_json_utils)
lofar_add_test(t_cep4_utils)
lofar_add_test(t_postgres)
IF(PYTHON_JSONSCHEMA)
lofar_add_test(t_json_utils)
ENDIF()
IF(PYTHON_PSYCOPG2 AND PYTHON_TESTING.POSTGRESQL)
lofar_add_test(t_postgres)
ENDIF()
ENDIF()
\ No newline at end of file
......@@ -48,4 +48,4 @@ def assertEqualXML(test, expected):
integration_test = unittest.skipIf(os.environ.get('SKIP_INTEGRATION_TESTS', default='False').lower() in ['1', 'true'],
'Integration tests are disabled via env SKIP_INTEGRATION_TESTS')
unit_test = unittest.skipIf(os.environ.get('SKIP_UNIT_TESTS', default='False').lower() in ['1', 'true'],
'Unit tests are disabled via env SKIP_UNIT_TESTS')
\ No newline at end of file
'Unit tests are disabled via env SKIP_UNIT_TESTS')
......@@ -55,10 +55,11 @@ class ObservationControlHandler(ServiceMessageHandler):
killed = False
pid_line = self.connection.run('pidof ObservationControl').stdout
pid_line = self.connection.run('/usr/sbin/pidof ObservationControl').stdout.strip('\n')
pids = pid_line.split(' ')
for pid in pids:
logger.info("Running: ps -p %s --no-heading -o command | awk -F[{}] '{ printf $2; }'", pid)
pid_sas_id = self.connection.run(
"ps -p %s --no-heading -o command | awk -F[{}] '{ printf $2; }'" % pid).stdout
if str(pid_sas_id) == str(sas_id):
......
......@@ -19,4 +19,5 @@
lofar_add_bin_scripts(show_hdf5_info
find_hdf5
add_parset_to_hdf5
create_test_hypercube)
#!/usr/bin/env python3
# Copyright (C) 2012-2015 ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
if __name__ == '__main__':
import logging
logger = logging.getLogger(__name__)
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
level=logging.INFO)
import os
import os.path
from optparse import OptionParser
from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME
# make sure we run in UTC timezone
os.environ['TZ'] = 'UTC'
# Check the invocation arguments
parser = OptionParser(usage='add_parset_to_hdf5 <hdf5_file>',
description='fetch the latest parset from OTDB and store it in the given hdf5 file.')
parser.add_option('-b', '--broker', dest='broker', type='string', default=DEFAULT_BROKER, help='Address of the message broker, default: %default')
parser.add_option('-e', '--exchange', dest = 'exchange', type = 'string', default = DEFAULT_BUSNAME, help = 'Name of the bus exchange on the broker, default: %default')
(options, args) = parser.parse_args()
if len(args) != 1:
parser.print_help()
exit(-1)
hdf_path = os.path.expanduser(args[0])
from lofar.qa.hdf5_io import add_parset_to_hypercube
from lofar.sas.otdb.otdbrpc import OTDBRPC
with OTDBRPC.create(exchange=options.exchange, broker=options.broker) as otdbrpc:
add_parset_to_hypercube(hdf_path, otdbrpc)
......@@ -45,7 +45,6 @@ def main():
if len(args) != 1:
print('Please provide a file name for the h5 file which you want to create...\n')
parser.print_help()
exit(1)
......
......@@ -25,7 +25,7 @@ if __name__ == '__main__':
import os.path
import sys
import fnmatch
import glob
from pathlib import Path
from optparse import OptionParser, OptionGroup
from datetime import datetime, timedelta
......@@ -99,10 +99,10 @@ if __name__ == '__main__':
path = os.path.dirname(os.path.expanduser(args[0]) if len(args) == 1 else os.getcwd())
files = glob.glob(os.path.join(path, '*.h*5'))
files = [str(p) for p in Path(path).glob('*.h*5')]
if path == os.getcwd():
files = [os.path.basename(file) for file in files]
if not files:
print("Could not find any h5/hdf5 files in", path)
files = sorted(files)
......@@ -174,9 +174,9 @@ if __name__ == '__main__':
if options.info:
for file in files:
print read_info_from_hdf5(file, read_data_info=False)
print(read_info_from_hdf5(file, read_data_info=False))
else:
print '\n'.join(files)
print('\n'.join(files))
......@@ -26,7 +26,6 @@ if __name__ == '__main__':
from optparse import OptionParser
from lofar.qa.hdf5_io import *
from lofar.parameterset import *
# make sure we run in UTC timezone
os.environ['TZ'] = 'UTC'
......@@ -40,7 +39,7 @@ if __name__ == '__main__':
(options, args) = parser.parse_args()
if len(args) != 1:
print parser.print_help()
parser.print_help()
exit(-1)
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
......@@ -48,4 +47,10 @@ if __name__ == '__main__':
hdf_path = os.path.expanduser(args[0])
print read_info_from_hdf5(hdf_path, read_data_info=options.data, read_parset_info=True)
info_string = read_info_from_hdf5(hdf_path, read_data_info=options.data, read_parset_info=True)
if info_string:
print(info_string)
else:
print("Could not find any info in", hdf_path)
exit(1)
......@@ -116,7 +116,7 @@ def create_hypercube(num_saps=3, num_stations=5, num_timestamps=11, num_subbands
# generate 'ticks' along the central_frequencies-axes
# fill the HBA frequency range of 120-240MHz
central_frequencies = [120e6+i*120e6/max(1,num_subbands-1) for i in range(num_subbands)]
sb_offset = sum([len(sap['subbands']) for sap in list(data.values())])
sb_offset = sum([len(sap['subbands']) for sap in data.values()])
subbands = [i for i in range(sb_offset, sb_offset+num_subbands)]
# create some synthetic antenna locations
......
......@@ -19,5 +19,6 @@
include(LofarCTest)
lofar_add_test(t_hdf5_io)
set_tests_properties(t_hdf5_io PROPERTIES TIMEOUT 300)
......@@ -20,36 +20,57 @@
import logging
logger = logging.getLogger(__name__)
from lofar.common.cep4_utils import *
from subprocess import call
from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RADBRPC
from lofar.messaging import ToBus, EventMessage, DEFAULT_BROKER, adaptNameToEnvironment
from optparse import OptionParser, OptionGroup
from datetime import datetime
from datetime import datetime, timedelta
if __name__ == '__main__':
# Check the invocation arguments
parser = OptionParser("%prog <otdb_id>",description='enqueue a conversion from MS to hdf5, cluster the baselines, and create the default plots in the qa_service.')
parser = OptionParser("%prog -o <otdb_id>",description='enqueue a conversion from MS to hdf5, cluster the baselines, and create the default plots in the qa_service.')
group = OptionGroup(parser, 'Messaging options')
group.add_option('-b', '--broker', dest='broker', type='string', default=DEFAULT_BROKER,
help='Address of the messaging broker, default: %default')
group.add_option('-q', "--queue", dest="queue", type="string",
default=adaptNameToEnvironment('lofar.queue.for.qa_service.QAService.on.OTDB.notification.TaskStatus'),
default=adaptNameToEnvironment('lofar.queue.for.qa_service.QAFilteredOTDBBusListener.on.filtered.OTDB.notification.TaskStatus'),
help="queue where the QAService listens for the notification messages. [default: %default]")
group.add_option('-o', '--otdb_id', dest="otdb_id", type="int",
help="the otdb_id of the task which needs to be enqueued for qa plots")
group.add_option('-a', '--all', dest="all", action='store_true',
help="enqueue all tasks which were not converted yet in the past two weeks. (pipelines are prioritized 2 below observations)")
group.add_option('-p', '--priority', dest="priority", type="int", default=4,
help="priority of the enqueued task. (low=0, normal=4, high=9) [default: %default]")
parser.add_option_group(group)
(options, args) = parser.parse_args()
if len(args) != 1:
if (not options.otdb_id and not options.all) or (options.otdb_id and options.all):
parser.print_help()
exit(-1)
otdb_id = int(args[0])
options.priority = min(9, max(0, options.priority))
if options.all:
with RADBRPC.create(broker=options.broker) as rpc:
tasks = rpc.getTasks(lower_bound=datetime.utcnow()-timedelta(days=14), task_type=['observation','pipeline'], task_status=['finished'])
otdb_id_priorities = [(t['otdb_id'], options.priority if t['type']=='observation' else max(1, options.priority-2))
for t in tasks]
else:
otdb_id_priorities = [(options.otdb_id, options.priority)]
#config logging
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
with ToBus(exchange='', broker=options.broker) as tobus:
for status in ['completing', 'finished']:
content = {"treeID": otdb_id, "state": status, "time_of_change": datetime.utcnow() }
msg = EventMessage(subject=options.queue, content=content, priority=options.priority)
logging.info("sending: %s", msg)
tobus.send(msg)
for otdb_id, priority in otdb_id_priorities:
plots_path = '/qa/plots/L%s' % otdb_id
cmd = ['ls', plots_path, '>&', '/dev/null']
cmd = wrap_command_in_cep4_head_node_ssh_call(cmd)
if call(cmd, stdout=None, stderr=None) != 0:
for status in ['completing', 'finished']:
content = {"treeID": otdb_id, "state": status, "time_of_change": datetime.utcnow() }
msg = EventMessage(subject=options.queue, content=content, priority=priority)
logging.info("sending: %s", msg)
tobus.send(msg)
......@@ -34,7 +34,8 @@ from lofar.sas.otdb.otdbrpc import OTDBRPC
logger = logging.getLogger(__name__)
QA_BASE_DIR = '/data/qa'
QA_LUSTRE_BASE_DIR = '/data/qa'
QA_NFS_BASE_DIR = '/qa'
DEFAULT_FILTERED_OTDB_NOTIFICATION_SUBJECT = "filtered.%s" % (DEFAULT_OTDB_NOTIFICATION_SUBJECT,)
#TODO: idea: convert periodically while observing?
......@@ -83,9 +84,8 @@ class QAFilteredOTDBBusListener(BusListener):
upon observation/pipeline completion. The qa processes convert MS (measurement sets) to hdf5 qa files,
and then starts generating plots from the hdf5 file.
'''
def __init__(self, qa_base_dir: str=QA_BASE_DIR):
def __init__(self):
super().__init__()
self.qa_base_dir = qa_base_dir
self._unfinished_otdb_id_map = {}
def onObservationCompleting(self, otdb_id, modificationTime):
......@@ -117,13 +117,13 @@ class QAFilteredOTDBBusListener(BusListener):
del self._unfinished_otdb_id_map[otdb_id]
try:
# import here and not at top of file
# because on some systems h5py is not available
# and then only this method fails, which is less bad than the whole service failing.
from lofar.qa.hdf5_io import add_parset_to_hypercube
cmd = ['add_parset_to_hdf5', hdf5_file_path]
cmd = wrap_command_for_docker(cmd, 'adder', 'latest')
cmd = wrap_command_in_cep4_random_node_ssh_call(cmd, partition=SLURM_CPU_PARTITION, via_head=True)
with OTDBRPC.create(exchange=self.exchange, broker=self.broker) as otdbrpc:
add_parset_to_hypercube(hdf5_file_path, otdbrpc)
logger.info(' '.join(cmd))
if call(cmd) == 0:
self._copy_hdf5_to_nfs_dir(hdf5_file_path)
except Exception as e:
logger.warning("Cannot add parset with feedback for otdb=%s. error: %s", otdb_id, e)
else:
......@@ -161,7 +161,10 @@ class QAFilteredOTDBBusListener(BusListener):
# cluster it
self._cluster_h5_file(hdf5_file_path, otdb_id)
self._copy_hdf5_to_nfs_dir(hdf5_file_path)
plot_dir_path = self._create_plots_for_h5_file(hdf5_file_path, otdb_id)
plot_dir_path = self._move_plots_to_nfs_dir(plot_dir_path)
# and notify that we're finished
self._send_event_message('Finished', {'otdb_id': otdb_id,
......@@ -189,7 +192,7 @@ class QAFilteredOTDBBusListener(BusListener):
try:
# define default h5 filename use default cep4 qa output dir
h5_filename = 'L%s.MS_extract.h5' % otdb_id
h5_dir_path = os.path.join(self.qa_base_dir, 'ms_extract')
h5_dir_path = os.path.join(QA_LUSTRE_BASE_DIR, 'ms_extract')
hdf5_path = os.path.join(h5_dir_path, h5_filename)
cmd = ['ls', hdf5_path]
......@@ -234,7 +237,7 @@ class QAFilteredOTDBBusListener(BusListener):
'''
try:
#use default cep4 qa output dir.
plot_dir_path = os.path.join(self.qa_base_dir, 'inspectionplots')
plot_dir_path = os.path.join(QA_LUSTRE_BASE_DIR, 'plots')
task_plot_dir_path = ''
all_plots_succeeded = True
......@@ -261,11 +264,12 @@ class QAFilteredOTDBBusListener(BusListener):
logger.error(msg)
self._send_event_message('Error', {'otdb_id': otdb_id,
'message': msg})
if all_plots_succeeded:
self._send_event_message('CreatedInspectionPlots', {'otdb_id': otdb_id,
'hdf5_file_path': hdf5_path,
'plot_dir_path': task_plot_dir_path})
return task_plot_dir_path
self._send_event_message('CreatedInspectionPlots', {'otdb_id': otdb_id,
'hdf5_file_path': hdf5_path,
'plot_dir_path': task_plot_dir_path})
return task_plot_dir_path
except Exception as e:
logging.exception('error in _create_plots_for_h5_file: %s', e)
self._send_event_message('Error', {'otdb_id': otdb_id, 'message': str(e)})
......@@ -282,7 +286,7 @@ class QAFilteredOTDBBusListener(BusListener):
try:
# define default h5 filename use default cep4 qa output dir
h5_filename = 'L%s.MS_extract.h5' % otdb_id
h5_dir_path = os.path.join(self.qa_base_dir, 'ms_extract')
h5_dir_path = os.path.join(QA_LUSTRE_BASE_DIR, 'ms_extract')
hdf5_path = os.path.join(h5_dir_path, h5_filename)
cmd = ['ls', hdf5_path]
......@@ -319,6 +323,40 @@ class QAFilteredOTDBBusListener(BusListener):
self._send_event_message('Error', {'otdb_id': otdb_id, 'message': str(e)})
return None
def _copy_hdf5_to_nfs_dir(self, hdf5_path):
try:
hdf5_filename = os.path.basename(hdf5_path)
hdf5_nfs_path = os.path.join(QA_NFS_BASE_DIR, 'h5', hdf5_filename)
cmd = ['cp', hdf5_path, hdf5_nfs_path]
cmd = wrap_command_in_cep4_head_node_ssh_call(cmd)
logger.debug('copying h5 file to nfs dir: %s', ' '.join(cmd))
if call(cmd) == 0:
logger.info('copied h5 file to nfs dir: %s -> %s', hdf5_path, hdf5_nfs_path)
return hdf5_nfs_path
except Exception as e: