Commit 8d943635 authored by Jorrit Schaap

saved production edits from post-stop-day tweaks and fixes

parent 980c567c
@@ -19,12 +19,12 @@
 try:
     import proton
     import proton.utils
-    import uuid
     MESSAGING_ENABLED = True
 except ImportError:
     from . import noqpidfallback as proton
     MESSAGING_ENABLED = False
+import uuid

 import xml.dom.minidom as xml
 import xml.parsers.expat as expat
 from xml.sax.saxutils import escape
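If the first `import uuid` is the line that was removed, the fix here moves it out of the `try:` block: a failing `import proton` would otherwise skip it, and the fallback path would crash later on a missing `uuid`. A runnable sketch of that pattern, with `ujson`/`json` standing in for the proton/noqpidfallback pair (the stand-ins are assumptions, not the real modules):

```python
# Sketch of the optional-import pattern, assuming the fix moves
# 'import uuid' out of the try block: imports that must always succeed
# should not ride along with an optional dependency.
try:
    import ujson as json      # optional, faster drop-in (stand-in for proton)
    HAVE_FAST_JSON = True
except ImportError:
    import json               # stdlib fallback (stand-in for noqpidfallback)
    HAVE_FAST_JSON = False

import uuid                   # always needed, so imported unconditionally

print(uuid.uuid4(), json.dumps({'fast_json': HAVE_FAST_JSON}))
```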
@@ -223,7 +223,7 @@ class MessageContent(object):
     def _property_list(self):
         """ List of XML elements that are exposed as properties. """
-        return {
+        return {
             "system": "message.header.system",
             "headerVersion": "message.header.version",
             "protocol": "message.header.protocol.name",
@@ -288,4 +288,3 @@ if __name__ == "__main__":
     m = MessageContent("FROM", "FORUSER", "SUMMARY", "PROTOCOL", "1.2.3", "11111", "22222")
     print(str(m))
     print(m.content())
-
......@@ -55,10 +55,11 @@ class ObservationControlHandler(ServiceMessageHandler):
killed = False
pid_line = self.connection.run('pidof ObservationControl').stdout
pid_line = self.connection.run('/usr/sbin/pidof ObservationControl').stdout.strip('\n')
pids = pid_line.split(' ')
for pid in pids:
logger.info("Running: ps -p %s --no-heading -o command | awk -F[{}] '{ printf $2; }'", pid)
pid_sas_id = self.connection.run(
"ps -p %s --no-heading -o command | awk -F[{}] '{ printf $2; }'" % pid).stdout
if str(pid_sas_id) == str(sas_id):
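Distilled, the hunk above does this: `pidof` lists candidate ObservationControl pids, and for each pid the `ps ... | awk -F[{}]` pipeline extracts the SAS id from the curly braces in the process command line. A pure-Python sketch of the same matching; the `{...}` convention comes from the diff, the rest is illustrative:

```python
# Hedged sketch (not the production handler): find the pid of the
# ObservationControl process whose command line carries {sas_id}.
import re
import subprocess
from typing import Optional

def find_observation_control_pid(sas_id) -> Optional[int]:
    pid_line = subprocess.run(['/usr/sbin/pidof', 'ObservationControl'],
                              capture_output=True, text=True).stdout
    for pid in pid_line.split():
        cmdline = subprocess.run(['ps', '-p', pid, '--no-heading', '-o', 'command'],
                                 capture_output=True, text=True).stdout
        match = re.search(r'\{(.*?)\}', cmdline)  # SAS id sits between { and }
        if match and match.group(1) == str(sas_id):
            return int(pid)
    return None
```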
@@ -50,35 +50,46 @@ def autocleanup_all_finished_ingested_pipelines(do_submit_to_autocleanup: bool,
     finished_pipelines = radbrpc.getTasks(task_type='pipeline', task_status='finished', otdb_ids=otdb_ids_with_data_on_disk)
     finished_pipeline_otdb_ids = [pipeline['otdb_id'] for pipeline in finished_pipelines if 'otdb_id' in pipeline]
     finished_pipeline_mom_ids = [pipeline['mom_id'] for pipeline in finished_pipelines if 'mom_id' in pipeline]

-    dps_dict = momrpc.getDataProducts(finished_pipeline_mom_ids)
-    mom_details_dict = momrpc.getObjectDetails(finished_pipeline_mom_ids)

-    for pipeline in sorted(finished_pipelines, key=lambda t: t['endtime']):
+    for pipeline in sorted(finished_pipelines, key=lambda t: t['endtime'], reverse=False):
         otdb_id = pipeline['otdb_id']
         logger.debug("checking if finished pipeline with otdb_id %d has been fully ingested...", otdb_id)

         mom2id = pipeline.get('mom_id')
         if mom2id is None:
             continue

-        dps = dps_dict.get(mom2id)
+        dps = momrpc.getDataProducts(mom2id).get(mom2id)
+        #import pprint
+        #pprint.pprint(dps)

         if dps is None or len(dps) <= 0:
             logger.debug("could not find dataproducts for otdb_id=%d mom2id=%s to check if they are all ingested...", otdb_id, mom2id)
             continue

-        ingested_dps = [dp for dp in dps if dp['status'] == 'ingested']
+        ingestable_dps = [dp for dp in dps if dp['status'] is not None and dp['fileformat'] != 'none']
+        ingested_dps = [dp for dp in ingestable_dps if dp['status'] == 'ingested']
+        not_ingested_dps = [dp for dp in ingestable_dps if dp['status'] != 'ingested']
+        #pprint.pprint(ingestable_dps)
+        #pprint.pprint(dps)

-        is_ingested = len(ingested_dps) == len(dps)
+        is_ingested = len(ingested_dps)>0 and len(ingested_dps) == len(ingestable_dps)
+        is_partially_ingested = len(ingested_dps) > 0 and len(ingested_dps) < len(ingestable_dps)

+        logger.debug("checking diskusage for finished %singested pipeline with otdb_id %d ...", ("fully " if is_ingested else "not-"), otdb_id)

+        #if not is_ingested:
+        #    logger.info("finished pipeline with otdb_id %d was %singested. Not deleting anything for this pipeline and/or its predecessors.",
+        #                otdb_id, "partially " if is_partially_ingested else "not ")
+        #    continue

         try:
-            logger.debug("finished pipeline with otdb_id %d was fully ingested. checking diskusage...", otdb_id)
             du_result = sqrpc.getDiskUsageForOTDBId(otdb_id)
+            if du_result.get('needs_update'):
+                du_result = sqrpc.getDiskUsageForOTDBId(otdb_id, force_update=True)
+            if not du_result.get('found') or (du_result.get('disk_usage', 0) or 0) == 0:
+                logger.debug(du_result)
+                continue
-            mom_details = mom_details_dict.get(mom2id)
+            mom_details = momrpc.getObjectDetails(mom2id).get(mom2id)
         except Exception as e:
             logger.warning(e)
             continue
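The substantive change above is the "ingestable" filter: dataproducts with no status or with fileformat `'none'` no longer count toward the fully-ingested decision, so placeholder entries cannot block cleanup forever. A self-contained sketch of that decision, with made-up sample data:

```python
# Standalone sketch of the completeness test introduced above; the
# sample dataproducts are invented for illustration.
def ingest_status(dataproducts):
    ingestable = [dp for dp in dataproducts
                  if dp['status'] is not None and dp['fileformat'] != 'none']
    ingested = [dp for dp in ingestable if dp['status'] == 'ingested']
    fully = len(ingested) > 0 and len(ingested) == len(ingestable)
    partially = 0 < len(ingested) < len(ingestable)
    return fully, partially

# example: the placeholder entry (fileformat 'none') is ignored
dps = [{'status': 'ingested', 'fileformat': 'MS'},
       {'status': None, 'fileformat': 'none'}]
print(ingest_status(dps))  # (True, False)
```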
@@ -90,11 +101,12 @@ def autocleanup_all_finished_ingested_pipelines(do_submit_to_autocleanup: bool,
                          du_result.get('disk_usage_readable'), du_result.get('disk_usage'))

         if is_ingested and do_submit_to_autocleanup:
-            tobus.send(EventMessage(subject="%s.TaskFinished" % INGEST_NOTIFICATION_PREFIX,
-                                    content={'type': 'MoM',
-                                             'otdb_id': otdb_id,
-                                             'message': 'resubmit of TaskFinished event for autocleanupservice'}))
+            msg = EventMessage(subject="%s.TaskFinished" % INGEST_NOTIFICATION_PREFIX,
+                               content={'type': 'MoM',
+                                        'otdb_id': otdb_id,
+                                        'message': 'resubmit of TaskFinished event for autocleanupservice'})
+            logger.info("sending msg to %s: %s", tobus.exchange, msg)
+            tobus.send(msg)

 def main():
     from optparse import OptionParser
@@ -118,4 +130,3 @@ def main():

 if __name__ == '__main__':
     main()
-
@@ -12,6 +12,7 @@ from datetime import datetime
 from optparse import OptionParser

 from lofar.messaging import RPCService, ServiceMessageHandler
 from lofar.messaging import EventMessage, ToBus, DEFAULT_BROKER, DEFAULT_BUSNAME
+from lofar.messaging.rpc import RPCTimeoutException
 from lofar.common.util import waitForInterrupt, humanreadablesize
 from lofar.common.subprocess_utils import communicate_returning_strings
@@ -241,14 +242,12 @@ class CleanupHandler(ServiceMessageHandler):
                 claims = radbrpc.getResourceClaims(task_ids=task['id'], resource_type='storage')
                 cep4_storage_claim_ids = [c['id'] for c in claims if c['resource_id'] == cep4_storage_resource['id']]
                 for claim_id in cep4_storage_claim_ids:
-                    logger.info("ending the storage claim %s on resource %s %s for task radb_id=%s otdb_id=%s",
-                                claim_id, cep4_storage_resource['id'], cep4_storage_resource['name'],
-                                task['id'], task['otdb_id'])
-                    radbrpc.deleteResourceClaim(claim_id)
+                    logger.info("setting endtime for claim %s on resource %s %s to now", claim_id, cep4_storage_resource['id'], cep4_storage_resource['name'])
+                    radbrpc.updateResourceClaim(claim_id, endtime=datetime.utcnow())
         except Exception as e:
             logger.error(str(e))

-    def _removePath(self, path, do_recurse=True):
+    def _removePath(self, path, do_recurse=False):
         logger.info("Remove path: %s" % (path,))

         # do various sanity checking to prevent accidental deletes
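Note the behavioural change in the claim handling: the storage claim is no longer deleted but closed by setting its endtime, so the claim history stays queryable in RADB. A toy distillation (radbrpc and the claim ids are assumed from the surrounding code):

```python
# Sketch of the close-instead-of-delete pattern above; radbrpc is the
# same RPC client the handler uses.
from datetime import datetime

def end_storage_claims(radbrpc, claim_ids):
    """Close storage claims by ending them now instead of deleting them."""
    for claim_id in claim_ids:
        # the claim record survives; it is merely marked as ended
        radbrpc.updateResourceClaim(claim_id, endtime=datetime.utcnow())
```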
@@ -289,7 +288,10 @@ class CleanupHandler(ServiceMessageHandler):
             logger.warn(message)
             return {'deleted': True, 'message': message, 'path': path}

-        du_result = self._sqrpc.getDiskUsageForPath(path) if do_recurse else {}
+        try:
+            du_result = self._sqrpc.getDiskUsageForPath(path) if do_recurse else {}
+        except RPCTimeoutException:
+            du_result = {}

         if du_result.get('found'):
             logger.info("Attempting to delete %s in %s", du_result.get('disk_usage_readable', '?B'), path)
@@ -423,6 +423,10 @@ class CacheManager:
     def onTaskDeleted(self, otdb_id, deleted, paths, message=''):
         self._onDiskActivityForOTDBId(otdb_id)

+        with self._cacheLock:
+            if deleted and otdb_id != None and otdb_id in self._cache['otdb_id2path']:
+                del self._cache['otdb_id2path'][otdb_id]

     def _onDiskActivityForOTDBId(self, otdb_id):
         result = self.disk_usage.getDiskUsageForOTDBId(otdb_id)
         self._updateCache(result)
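The added block is a textbook cache-invalidation step: once a task's data is really deleted, its cached otdb_id-to-path mapping is stale and must be dropped under the same lock that guards all cache access. A minimal self-contained sketch of the pattern (class and names are illustrative, mirroring the diff):

```python
# Lock-guarded eviction, as in onTaskDeleted above.
import threading

class PathCache:
    """Illustrative stand-in for the otdb_id -> path cache in CacheManager."""
    def __init__(self):
        self._cacheLock = threading.RLock()
        self._cache = {'otdb_id2path': {}}

    def onTaskDeleted(self, otdb_id, deleted):
        # mutate the shared dict only while holding the lock
        with self._cacheLock:
            if deleted and otdb_id is not None:
                self._cache['otdb_id2path'].pop(otdb_id, None)  # drop stale mapping
```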
@@ -493,6 +497,15 @@ class CacheManager:
             return task_du_result

+        # still no path(s) found for otdb_id, now try from cache and ignore possible scratch paths
+        if otdb_id != None:
+            with self._cacheLock:
+                path = self._cache['otdb_id2path'].get(otdb_id)
+                if path:
+                    logger.info('Using path from cache for otdb_id %s %s (ignoring possible scratch/share paths)', otdb_id, path)
+                    return self.getDiskUsageForPath(path, force_update=force_update)

         return {'found': False, 'path': path_result['path']}

     def getDiskUsageForTasks(self, radb_ids=None, mom_ids=None, otdb_ids=None, include_scratch_paths=True, force_update=False):
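This hunk is the read-side counterpart of the eviction above: when neither RADB nor MoM resolves a path for the otdb_id, the cached mapping is tried last, deliberately ignoring scratch/share paths. A toy stand-in showing that lookup order (the primary lookup and the size function are injected assumptions, not the real CacheManager API):

```python
# Fallback lookup order: primary source first, cached path last.
import threading

class DiskUsageLookup:
    """Toy stand-in for CacheManager's otdb_id -> path fallback."""
    def __init__(self, primary_lookup, du_for_path):
        self._cacheLock = threading.RLock()
        self._cache = {'otdb_id2path': {}}
        self._primary_lookup = primary_lookup  # e.g. a RADB/MoM query (assumption)
        self._du_for_path = du_for_path        # e.g. getDiskUsageForPath (assumption)

    def getDiskUsageForOTDBId(self, otdb_id, force_update=False):
        result = self._primary_lookup(otdb_id)
        if result.get('found'):
            return result
        # last resort: the cached path, ignoring possible scratch/share paths
        with self._cacheLock:
            path = self._cache['otdb_id2path'].get(otdb_id)
        if path:
            return self._du_for_path(path, force_update)
        return {'found': False}
```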
@@ -40,7 +40,7 @@ def createService(exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, cache_manager
                       handler_kwargs={'cache_manager':cache_manager},
                       exchange=exchange,
                       broker=broker,
-                      num_threads=6)
+                      num_threads=12)

 def main():
     # make sure we run in UTC timezone