Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
util.py 6.55 KiB
# util.py: utils for lofar software
#
# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id: util.py 1584 2015-10-02 12:10:14Z loose $
#
"""
This package contains different utilities that are common for LOFAR software
"""

import sys
import os, os.path
import time


def check_bit(value, bit):
    """
        function to check if a given bit is set
        :param value: value to check
        :param bit: bit to be checked
        :return: true or false
    """
    return bool(value & (1 << bit))


def set_bit(value, bit):
    """
        function to set a bit in a given value
        :param value: value to set bit in
        :param bit: bit to set
        :return: value after the bit is set
    """
    return value | (1 << bit)


def clear_bit(value, bit):
    """
        function to clear a given bit
        :param value: value to clear the bit in
        :param bit: bit to clear
        :return: value after the bit is cleared
    """
    return value & ~(1 << bit)


def feq(val_a, val_b, rtol=1e-05, atol=1e-08):
    """

    :param val_a: float a to compare with
    :param val_b: float b
    :param rtol: The relative tolerance parameter
    :param atol: The absolute tolerance parameter
    :return:
    """
    return abs(val_a - val_b) <= rtol * (abs(val_a) + abs(val_b)) + atol


def chunker(seq, size):
    """
    function to divide a list into equal chunks
    :param seq: initial list
    :param size: size of the chunks
    :return:
    """
    return (seq[pos:pos + size] for pos in range(0, len(seq), size))


def raise_exception(cls, msg):
    """
    Raise an exception of type `cls`. Augment the exception message `msg` with
    the type and value of the last active exception if any.
    :param cls: type of exception that will be raised
    :param msg: exception message
    """
    exc_type, exc_val = sys.exc_info()[:2]
    if exc_type is not None:
        msg = "%s [%s: %s]" % (msg, exc_type.__name__, exc_val)
    raise cls(msg)


def isIntList(lst):
    """
    function to see if a value is a list of ints
    :param lst: the list that needs to be examined
    :return:  True if it is a list of ints otherwise False
    """
    if not isinstance(lst, list):
        return False
    return all(isinstance(x, int) for x in lst)


def isFloatList(lst):
    """
    function to see if a value is a list of floats
    :param lst: the list that needs to be examined
    :return:  True if it is a list of ints otherwise False
    """
    if not isinstance(lst, list):
        return False
    return all(isinstance(x, float) for x in lst)


def waitForInterrupt():
    """
    Useful (low cpu load) loop that waits for keyboard interrupt.
    """
    while True:
        try:
            time.sleep(10)
        except KeyboardInterrupt:
            break


def humanreadablesize(num, suffix='B', base=1000):
    """ converts the given size (number) to a human readable string in powers of 'base'"""
    try:
        for unit in ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z']:
            if abs(num) < float(base):
                return "%3.1f%s%s" % (num, unit, suffix)
            num /= float(base)
        return "%.2f%s%s" % (num, 'Y', suffix)
    except TypeError:
        return str(num)

def convertIntKeysToString(dct):
    '''recursively convert all int keys in a dict to string'''
    return {str(k): convertIntKeysToString(v) if isinstance(v, dict) else v for k,v in dct.items()}

def convertStringDigitKeysToInt(dct):
    '''recursively convert all string keys which are a digit in a dict to int'''
    return {int(k) if isinstance(k, str) and k.isdigit() else k : convertStringDigitKeysToInt(v) if isinstance(v, dict) else v for k,v in dct.items()}

def to_csv_string(values):
    return ','.join(str(x) for x in values)

def is_iterable(thing):
    try:
        iter(thing)
        return True
    except TypeError:
        return False

def program_name(include_extension=True):
    """gets the name of the current running program
    :param include_extension bool: include the program name's extension (if present), otherwise cut it (if present)
    :return:  the name of the current running program
    """
    name = os.path.basename(os.path.realpath(sys.argv[0]))
    if not include_extension:
        name, extension = os.path.splitext(name)
    return name


def is_empty_function(func):
    """returns True when the given function/method 'func' has an empty 'pass' body (ignoring optional docstrings)"""
    def __empty_func():
        pass

    def __empty_func_with_doc():
        """Empty function with docstring."""
        pass

    return (func.__code__.co_code == __empty_func.__code__.co_code or \
            func.__code__.co_code == __empty_func_with_doc.__code__.co_code) and \
            func.__code__.co_consts[-1] == None

def single_line_with_single_spaces(lines: str) -> str:
    '''return the given lines as a single line with no extra leftover indentation spaces/tabs'''
    if not isinstance(lines, str):
        lines = str(lines)

    line = lines.replace('\n', ' ').replace('\t', ' ')
    length = len(line)
    while True:
        line = line.replace('  ', ' ')
        new_length = len(line)
        if new_length == length:
            return line
        length = new_length

def find_free_port(preferred_port: int=0):
    '''find and return a random free network port, preferably the given <preferred_port>'''
    import socket
    from contextlib import closing

    with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
        try:
            s.bind(('', preferred_port))
        except OSError as e:
            if e.errno==98: # OSError: [Errno 98] Address already in use
                # OS will find a available port with bind option 0
                s.bind(('', 0))
            else:
                raise

        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        return s.getsockname()[1]