diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py index 560eaef0259f0c5bfcf4b9204fdf0f0a7e632f0d..d84ee8bddc4041c54a20aa379d5b404a362edb05 100644 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py @@ -99,7 +99,7 @@ class IngestJobManager: else: logger.info('%s jobs are running', len(producing_jads)) - time.sleep(1) + time.sleep(10) except KeyboardInterrupt: break except Exception as e: diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/ingestpipeline.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/ingestpipeline.py index 73998a8d46a76092c14612386450782cb568b122..446b8736daecf45168b48b6d709ffa0be39661fd 100755 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/ingestpipeline.py +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/ingestpipeline.py @@ -76,7 +76,7 @@ class IngestPipeline(): self.ExportID = job['ExportID'] self.Type = job["Type"] self.HostLocation = job['Location'].partition(':')[0] - self.Location = job['Location'].partition(':')[1] + self.Location = job['Location'].partition(':')[2] self.ticket = '' self.FileSize = '0' self.MD5Checksum = '' diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/ingesttransferserver.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/ingesttransferserver.py index f9694ae324d3462d8a662463551d66a7d5323fab..6d0acb8915fc9f92eb8cc90faf8ab4b1eb075dde 100644 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/ingesttransferserver.py +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/ingesttransferserver.py @@ -257,37 +257,35 @@ class IngestTransferServer: try: self.__clearFinishedJobs() - try: - if datetime.utcnow() - self.__running_jobs_log_timestamp > timedelta(seconds = 2): - with self.__lock: - starting_threads = [job_thread_dict['thread'] for job_thread_dict in list(self.__running_jobs.values()) if 'pipeline' not in job_thread_dict] - pipelines = [job_thread_dict['pipeline'] for job_thread_dict in list(self.__running_jobs.values()) if 'pipeline' in job_thread_dict] - initializing_pipelines = [pipeline for pipeline in pipelines if pipeline.status == IngestPipeline.STATUS_INITIALIZING] - transferring_pipelines = [pipeline for pipeline in pipelines if pipeline.status == IngestPipeline.STATUS_TRANSFERRING] - finalizing_pipelines = [pipeline for pipeline in pipelines if pipeline.status == IngestPipeline.STATUS_FINALIZING] - finished_pipelines = [pipeline for pipeline in pipelines if pipeline.status == IngestPipeline.STATUS_FINISHED] - - status_log_line = "status: running %s jobs: #starting=%d, #transferring=%d, #finalizing=%d, #finished=%d, bandwith used on network interface(s) %s %s (%s), load=%.1f" % (len(self.__running_jobs), - len(initializing_pipelines) + len(starting_threads), - len(transferring_pipelines), - len(finalizing_pipelines), - len(finished_pipelines), - NET_IF_TO_MONITOR, - humanreadablesize(self.__prev_used_bandwidth, 'bps'), - humanreadablesize(self.__prev_used_bandwidth / 8, 'Bps'), - os.getloadavg()[0]) - - logger.info(status_log_line) - self.__running_jobs_log_timestamp = datetime.utcnow() - - msg = EventMessage(subject = "%s.%s" % (INGEST_NOTIFICATION_PREFIX, 'TransferServiceStatus'), - content = { 'ingest_server': socket.gethostname(), - 'message' : status_log_line }) - msg.ttl = 3600 # remove message from queue's when not picked up within 1 hours - self.event_bus.send(msg) - except Exception as e: - logger.error(e) - + if datetime.utcnow() - self.__running_jobs_log_timestamp > timedelta(seconds = 2): + with self.__lock: + starting_threads = [job_thread_dict['thread'] for job_thread_dict in list(self.__running_jobs.values()) if 'pipeline' not in job_thread_dict] + pipelines = [job_thread_dict['pipeline'] for job_thread_dict in list(self.__running_jobs.values()) if 'pipeline' in job_thread_dict] + initializing_pipelines = [pipeline for pipeline in pipelines if pipeline.status == IngestPipeline.STATUS_INITIALIZING] + transferring_pipelines = [pipeline for pipeline in pipelines if pipeline.status == IngestPipeline.STATUS_TRANSFERRING] + finalizing_pipelines = [pipeline for pipeline in pipelines if pipeline.status == IngestPipeline.STATUS_FINALIZING] + finished_pipelines = [pipeline for pipeline in pipelines if pipeline.status == IngestPipeline.STATUS_FINISHED] + + status_log_line = "status: running %s jobs: #starting=%d, #transferring=%d, #finalizing=%d, #finished=%d, bandwith used on network interface(s) %s %s (%s), load=%.1f" % (len(self.__running_jobs), + len(initializing_pipelines) + len(starting_threads), + len(transferring_pipelines), + len(finalizing_pipelines), + len(finished_pipelines), + NET_IF_TO_MONITOR, + humanreadablesize(self.__prev_used_bandwidth, 'bps'), + humanreadablesize(self.__prev_used_bandwidth / 8, 'Bps'), + os.getloadavg()[0]) + + logger.info(status_log_line) + self.__running_jobs_log_timestamp = datetime.utcnow() + + msg = EventMessage(subject = "%s.%s" % (INGEST_NOTIFICATION_PREFIX, 'TransferServiceStatus'), + content = { 'ingest_server': socket.gethostname(), + 'message' : status_log_line }) + msg.ttl = 3600 # remove message from queue's when not picked up within 1 hours + self.event_bus.send(msg) + + time.sleep(5) except KeyboardInterrupt: break except Exception as e: