-
Jorrit Schaap authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
path.py 13.05 KiB
#!/usr/bin/env python3
import os
import os.path
import logging
import socket
import subprocess
from lofar.common import isProductionEnvironment, isTestEnvironment
from lofar.common.subprocess_utils import communicate_returning_strings
from lofar.sas.datamanagement.common.config import CEP4_DATA_MOUNTPOINT
from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC as RADBRPC
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as RADB_BUSNAME
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as RADB_SERVICENAME
from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC
from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME
# module-level logger, named after this module
logger = logging.getLogger(name=__name__)
class PathResolver:
    '''Resolve CEP4 data paths for tasks known in the RADB.

    A task is identified by exactly one of its radb_id, mom_id or otdb_id. The task is looked up
    via the RADB RPC service, and its project name via the MoMQuery RPC service. The task data
    path is <projects_path>/<project_name with whitespace replaced by '_'>/L<otdb_id>.
    Filesystem queries use the 'lfs' command line tool. When this host's name does not contain
    'head', they run via ssh on lofarsys@head.cep4.control.lofar.

    Most public methods return a dict with at least a boolean 'found' key.
    Use an instance as a context manager (or call open()/close()) to manage the RPC connections.
    '''

    def __init__(self,
                 mountpoint=CEP4_DATA_MOUNTPOINT,
                 radb_busname=RADB_BUSNAME,
                 radb_servicename=RADB_SERVICENAME,
                 mom_busname=DEFAULT_MOMQUERY_BUSNAME,
                 mom_servicename=DEFAULT_MOMQUERY_SERVICENAME,
                 broker=None):
        '''
        :param mountpoint: root of the CEP4 data mount; the projects, scratch and share paths are derived from it.
        :param radb_busname: bus name of the RADB service.
        :param radb_servicename: service name of the RADB service.
        :param mom_busname: bus name of the MoMQuery service.
        :param mom_servicename: service name of the MoMQuery service.
        :param broker: message broker address passed to both RPC clients (None lets the RPC clients choose).
        '''
        self.mountpoint = mountpoint
        # production data lives under 'projects'; any other environment uses 'test-projects'
        self.projects_path = os.path.join(self.mountpoint, 'projects' if isProductionEnvironment() else 'test-projects')
        self.scratch_path = os.path.join(self.mountpoint, 'scratch', 'pipeline')
        self.share_path = os.path.join(self.mountpoint, 'share', 'pipeline')
        self.radbrpc = RADBRPC(busname=radb_busname, servicename=radb_servicename, broker=broker)
        self.momrpc = MoMQueryRPC(busname=mom_busname, servicename=mom_servicename, broker=broker)

    def open(self):
        '''Open the RADB and MoM RPC connections.

        If opening the MoM connection fails, the already opened RADB connection is closed again
        before the error propagates.
        '''
        self.radbrpc.open()
        try:
            self.momrpc.open()
        except Exception:
            self.radbrpc.close()
            raise

    def close(self):
        '''Close the RADB and MoM RPC connections. The MoM connection is closed even if closing the RADB connection fails.'''
        try:
            self.radbrpc.close()
        finally:
            self.momrpc.close()

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def getPathForRADBId(self, radb_id):
        '''get the path for the task with the given radb_id; see getPathForTask'''
        logger.debug("Get path for radb_id %s", radb_id)
        return self.getPathForTask(radb_id=radb_id)

    def getPathForMoMId(self, mom_id):
        '''get the path for the task with the given mom_id; see getPathForTask'''
        logger.debug("Get path for mom_id %s", mom_id)
        return self.getPathForTask(mom_id=mom_id)

    def getPathForOTDBId(self, otdb_id):
        '''get the path for the task with the given otdb_id; see getPathForTask'''
        logger.debug("Get path for otdb_id %s", otdb_id)
        return self.getPathForTask(otdb_id=otdb_id)

    def getPathForTask(self, radb_id=None, mom_id=None, otdb_id=None, include_scratch_paths=True):
        '''get the path for a task for either the given radb_id, or for the given mom_id, or for the given otdb_id

        :param include_scratch_paths: if True and the task is a pipeline, also report which of its
                                      Observation<otdb_id> directories exist in the scratch and share
                                      paths, under the key 'scratch_paths'.
        :return: dict with keys 'found', 'message', 'path', 'radb_id', 'mom_id' and 'otdb_id'
                 (plus 'scratch_paths' as described above). 'path' is '' when the task is not found.
        :raises KeyError: if not exactly one integer id is given.
        '''
        logger.info("getPathForTask(radb_id=%s, mom_id=%s, otdb_id=%s)", radb_id, mom_id, otdb_id)
        result = self._getProjectPathAndDetails(radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id)

        if not result['found']:
            # the task could not be resolved, so echo the ids as given by the caller
            not_found_result = {'found': False, 'message': result.get('message', ''), 'path': '',
                                'radb_id': radb_id, 'mom_id': mom_id, 'otdb_id': otdb_id}
            logger.warning("result for getPathForTask(radb_id=%s, mom_id=%s, otdb_id=%s): %s",
                           radb_id, mom_id, otdb_id, not_found_result)
            return not_found_result

        task = result['task']
        task_data_path = os.path.join(result['path'], 'L%s' % task['otdb_id'])
        logger.info("constructed path '%s' for otdb_id=%s mom_id=%s radb_id=%s",
                    task_data_path, task['otdb_id'], task['mom_id'], task['id'])

        path_result = {'found': True, 'message': '', 'path': task_data_path,
                       'radb_id': task.get('id'), 'mom_id': task.get('mom_id'), 'otdb_id': task.get('otdb_id')}

        if include_scratch_paths and task['type'] == 'pipeline':
            scratch_path = os.path.join(self.scratch_path, 'Observation%s' % task['otdb_id'])
            share_path = os.path.join(self.share_path, 'Observation%s' % task['otdb_id'])
            logger.info("Checking scratch paths %s %s for otdb_id=%s mom_id=%s radb_id=%s",
                        scratch_path, share_path, task['otdb_id'], task['mom_id'], task['id'])
            # only report the directories which actually exist (scratch first, then share)
            path_result['scratch_paths'] = [p for p in (scratch_path, share_path) if self.pathExists(p)]

        logger.info("result for getPathForTask(radb_id=%s, mom_id=%s, otdb_id=%s): %s",
                    radb_id, mom_id, otdb_id, path_result)
        return path_result

    def _projectPathForName(self, project_name):
        '''return the project directory for project_name, with each run of whitespace replaced by '_'.'''
        return os.path.join(self.projects_path, "_".join(project_name.split()))

    def _getProjectPathAndDetails(self, radb_id=None, mom_id=None, otdb_id=None):
        '''get the project path and details of a task for either the given radb_id, or for the given mom_id, or for the given otdb_id

        :return: {'found': True, 'path': <project path>, 'mom_details': <MoM object details>, 'task': <RADB task>}
                 or {'found': False, 'message': <reason>, 'path': None}
        :raises KeyError: if not exactly one integer id is given.
        '''
        given_ids = [task_id for task_id in (radb_id, mom_id, otdb_id) if isinstance(task_id, int)]
        if len(given_ids) != 1:
            raise KeyError("Provide one and only one id: radb_id=%s, mom_id=%s, otdb_id=%s" % (radb_id, mom_id, otdb_id))

        task = self.radbrpc.getTask(id=radb_id, mom_id=mom_id, otdb_id=otdb_id)
        if not task:
            message = "Could not find task in RADB for radb_id=%s, mom_id=%s, otdb_id=%s" % (radb_id, mom_id, otdb_id)
            logger.error(message)
            return {'found': False, 'message': message, 'path': None}

        logger.info("found radb task with radb_id=%s mom_id=%s and otdb_id=%s", task['id'], task['mom_id'], task['otdb_id'])

        mom_details = self.momrpc.getObjectDetails(task['mom_id'])
        # The details are checked using an integer mom_id key, so use that same key for the lookup.
        # Previously the lookup used the raw task['mom_id'], which fails for non-int ids.
        mom_key = int(task['mom_id']) if mom_details else None
        if mom_key is None or mom_key not in mom_details:
            message = "Could not find mom project details for otdb_id=%s mom_id=%s radb_id=%s" % (task['otdb_id'], task['mom_id'], task['id'])
            logger.error(message)
            return {'found': False, 'message': message, 'path': None}

        project_name = mom_details[mom_key]['project_name']
        logger.info("found project '%s' for otdb_id=%s mom_id=%s radb_id=%s", project_name, task['otdb_id'], task['mom_id'], task['id'])
        project_path = self._projectPathForName(project_name)
        return {'found': True, 'path': project_path, 'mom_details': mom_details, 'task': task}

    def getProjectPath(self, radb_id=None, mom_id=None, otdb_id=None):
        '''get the project path of a task for either the given radb_id, mom_id or otdb_id.

        :return: {'found': True, 'path': <project path>} or {'found': False, 'message': ..., 'path': None}
        '''
        result = self._getProjectPathAndDetails(radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id)
        if result['found']:
            # the internal details are not part of this method's result
            del result['mom_details']
            del result['task']
        return result

    def getProjectDirAndSubDirectories(self, radb_id=None, mom_id=None, otdb_id=None, project_name=None):
        '''get the project directory and its subdirectories of either the project_name, or the task's project for either the given radb_id, or for the given mom_id, or for the given otdb_id'''
        if project_name:
            return self.getSubDirectories(self._projectPathForName(project_name))

        result = self.getProjectPath(radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id)
        if result['found']:
            return self.getSubDirectories(result['path'])
        return result

    def getSubDirectoriesForOTDBId(self, otdb_id):
        return self.getSubDirectoriesForTask(otdb_id=otdb_id)

    def getSubDirectoriesForMoMId(self, mom_id):
        return self.getSubDirectoriesForTask(mom_id=mom_id)

    def getSubDirectoriesForRADBId(self, radb_id):
        return self.getSubDirectoriesForTask(radb_id=radb_id)

    def getSubDirectoriesForTask(self, radb_id=None, mom_id=None, otdb_id=None):
        '''get the subdirectories of the task's data path (see getPathForTask); returns the lookup result if the task is not found'''
        result = self.getPathForTask(radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id)
        if result['found']:
            return self.getSubDirectories(result['path'])
        return result

    def _runOnCep4Head(self, cmd):
        '''Run cmd and return (returncode, stdout, stderr), with the output as strings.

        cmd runs locally when this host's name contains 'head'. Otherwise it runs through
        ssh as lofarsys@head.cep4.control.lofar.
        '''
        if 'head' not in socket.gethostname():
            cmd = ['ssh', 'lofarsys@head.cep4.control.lofar'] + cmd
        logger.debug(' '.join(cmd))
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, err = communicate_returning_strings(proc)
        return proc.returncode, out, err

    def getSubDirectories(self, path):
        '''get the names (not full paths) of the direct subdirectories of path.

        :return: {'found': True, 'path': path, 'sub_directories': [names]}
                 or {'found': False, 'path': path, 'message': <lfs output>} if the lfs command failed.
        '''
        logger.debug('getSubDirectories(%s)', path)
        returncode, out, err = self._runOnCep4Head(['lfs', 'find', '--type', 'd', '--maxdepth', '1', path.rstrip('/')])

        if returncode != 0:
            # lfs puts its error message in stdout
            logger.error(out + err)
            return {'found': False, 'path': path, 'message': out + err}

        # parse out, clean lines and skip first line which is path itself.
        lines = [line.strip() for line in out.split('\n')][1:]
        subdir_names = [line.split('/')[-1].strip().strip('/') for line in lines if line]

        result = {'found': True, 'path': path, 'sub_directories': subdir_names}
        logger.debug('getSubDirectories(%s) result: %s', path, result)
        return result

    def pathExists(self, path):
        '''check whether path exists, using 'lfs ls'.

        Only a failing command whose stderr contains 'No such file or directory' counts as
        non-existent. Any other failure (e.g. an ssh problem) is reported as existing.
        '''
        returncode, _out, err = self._runOnCep4Head(['lfs', 'ls', path])
        return not (returncode != 0 and 'No such file or directory' in err)
def main():
    '''Command line entry point.

    Prints the task path (-p), the project path and its subdirectories (-P), and/or the
    subdirectories of the task path (-s). The task is selected with -o/--otdb_id, -m/--mom_id
    or -r/--radb_id. Exits with status 1 when no id is given or a lookup fails.
    '''
    import sys
    from optparse import OptionParser

    # Check the invocation arguments
    parser = OptionParser('%prog [options]',
                          description='get path for otdb_id/mom_id/radb_id')
    parser.add_option('-p', '--path', dest='path', action='store_true', help='get the path for the given otdb_id/mom_id/radb_id')
    parser.add_option('-P', '--project', dest='project', action='store_true', help='get the project path and all its sub directories for the given otdb_id/mom_id/radb_id')
    parser.add_option('-s', '--subdirs', dest='subdirs', action='store_true', help='get the sub directories of the path for the given otdb_id/mom_id/radb_id')
    parser.add_option('-o', '--otdb_id', dest='otdb_id', type='int', default=None, help='otdb_id of task to get the path for')
    parser.add_option('-m', '--mom_id', dest='mom_id', type='int', default=None, help='mom_id of task to get the path for')
    parser.add_option('-r', '--radb_id', dest='radb_id', type='int', default=None, help='radb_id of task to get the path for')
    parser.add_option('-q', '--broker', dest='broker', type='string', default=None, help='Address of the qpid broker, default: localhost')
    parser.add_option("--mountpoint", dest="mountpoint", type="string", default=CEP4_DATA_MOUNTPOINT, help="path of local cep4 mount point, default: %default")
    parser.add_option("--radb_busname", dest="radb_busname", type="string", default=RADB_BUSNAME, help="Name of the bus on which the RADB service listens, default: %default")
    parser.add_option("--radb_servicename", dest="radb_servicename", type="string", default=RADB_SERVICENAME, help="Name of the RADB service, default: %default")
    parser.add_option("--mom_busname", dest="mom_busname", type="string", default=DEFAULT_MOMQUERY_BUSNAME, help="Name of the bus on which the MoM service listens, default: %default")
    parser.add_option("--mom_servicename", dest="mom_servicename", type="string", default=DEFAULT_MOMQUERY_SERVICENAME, help="Name of the MoM service, default: %default")
    parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging')
    options, _args = parser.parse_args()

    # an id of 0 is still a given id, so test against None instead of truthiness
    if all(task_id is None for task_id in (options.otdb_id, options.mom_id, options.radb_id)):
        parser.print_help()
        sys.exit(1)

    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                        level=logging.INFO if options.verbose else logging.WARN)

    # --mountpoint was parsed but previously never passed on; hand it to the resolver
    with PathResolver(mountpoint=options.mountpoint,
                      radb_busname=options.radb_busname, radb_servicename=options.radb_servicename,
                      mom_busname=options.mom_busname, mom_servicename=options.mom_servicename,
                      broker=options.broker) as path_resolver:
        if options.path:
            result = path_resolver.getPathForTask(otdb_id=options.otdb_id, mom_id=options.mom_id, radb_id=options.radb_id)
            if result['found']:
                print("path: %s" % (result['path']))
            else:
                print(result['message'])
                sys.exit(1)

        if options.project:
            result = path_resolver.getProjectDirAndSubDirectories(otdb_id=options.otdb_id, mom_id=options.mom_id, radb_id=options.radb_id)
            if result['found']:
                print("projectpath: %s" % (result['path']))
                print("subdirectories: %s" % (' '.join(result['sub_directories'])))
            else:
                print(result['message'])
                sys.exit(1)

        if options.subdirs:
            result = path_resolver.getSubDirectoriesForTask(otdb_id=options.otdb_id, mom_id=options.mom_id, radb_id=options.radb_id)
            if result['found']:
                print("path: %s" % (result['path']))
                print("subdirectories: %s" % (' '.join(result['sub_directories'])))
            else:
                print(result['message'])
                sys.exit(1)
if "__main__" == __name__:
    # run the command line interface only when executed as a script, not on import
    main()