Skip to content
Snippets Groups Projects
Commit 0c0d0910 authored by Jörn Künsemöller's avatar Jörn Künsemöller
Browse files

Task SW-567: Fix some bugs after testing today.

parent 6bedc179
No related branches found
No related tags found
No related merge requests found
......@@ -25,9 +25,13 @@ if __name__ == '__main__':
parser.add_option("-n", "--tbb_notification_busname", dest="tbb_notification_busname", type="string",
default=DEFAULT_TBB_NOTIFICATION_BUSNAME,
help='Name of the notification bus exchange on the qpid broker on which the tbb notifications are published, default: %default')
parser.add_option("-d", "--duration", dest='num_samples_per_subband', required=True,
help="Expected duration in number of samples per subband.")
(options, args) = parser.parse_args()
start_datawriters_and_wait_until_finished(output_path=options.output_path,
num_samples_per_subband=options.num_samples_per_subband,
service_busname=options.tbb_service_busname,
service_name=options.tbb_service_name,
notification_busname=options.tbb_notification_busname,
......
......@@ -26,6 +26,7 @@ class Waiter(TBBBusListener):
self.wait_event.wait(timeout)
def start_datawriters_and_wait_until_finished(output_path,
num_samples_per_subband,
service_busname=DEFAULT_TBB_BUSNAME,
service_name=DEFAULT_TBB_SERVICENAME,
notification_busname=DEFAULT_TBB_NOTIFICATION_BUSNAME,
......@@ -35,7 +36,7 @@ def start_datawriters_and_wait_until_finished(output_path,
'''
with Waiter() as waiter:
with TBBRPC(busname=service_busname, servicename=service_name, broker=broker) as rpc:
rpc.start_datawriters(output_path=output_path)
rpc.start_datawriters(output_path=output_path, num_samples_per_subband=num_samples_per_subband)
waiter.wait()
def stop_datawriters_and_wait_until_finished(service_busname=DEFAULT_TBB_BUSNAME,
......
......@@ -14,9 +14,9 @@ class TBBRPC(RPCWrapper):
servicename=servicename,
broker=broker)
def start_datawriters(self, output_path, voevent_xml=""):
def start_datawriters(self, output_path, num_samples_per_subband, voevent_xml=""):
logger.info("Requesting start of tbb data writers...")
result = self.rpc('start_datawriters', output_path, voevent_xml)
result = self.rpc('start_datawriters', output_path, num_samples_per_subband, voevent_xml)
logger.info("Received start datawriter request result %s" % result)
return result
......
......@@ -235,10 +235,11 @@ class TBBControlService:
return parset_path, parset
def start_datawriters(self, output_path, voevent=""):
def start_datawriters(self, output_path, num_samples_per_subband, voevent=""):
"""
start the tbb datawriters and notify when done.
:param output_path: string defining the full directory path where to save the data, like: "/data/projects/<my_project>/L<obs_id>/
:param num_samples_per_subband: integer value for number of samples to expect (depending on duration, 1 sample = 5.12 microseconds)
:param voevent: voevent xml string (for metadata)
:return: nodes where a datawriter has been started as list of strings
"""
......@@ -281,7 +282,7 @@ class TBBControlService:
'-p', ports,
'-k', '0',
'-t', '60',
'-d', '3840', #TODO: compute instead of hardcoded
'-d', str(num_samples_per_subband),
'-o', output_path]
# wrap the command in a cep4 docker ssh call
......@@ -513,7 +514,10 @@ class TBBControlService:
sanitized_triggerid = triggerid.replace('/', '_').replace(':', '_').replace('#', '_')
output_path = '/data/projects/%s/tbb_spectral/%s' % (project, sanitized_triggerid)
datawriter_nodes = self.start_datawriters(output_path=output_path, voevent=voevent_xml)
# determine number of samples
num_samples = int(float(duration) // 0.00000512)
datawriter_nodes = self.start_datawriters(output_path=output_path, num_samples_per_subband=num_samples, voevent=voevent_xml)
if nodes is None:
logger.info('Nodes to use are not configured, using all with datawriter')
nodes = ['cpu%02d' % (node,) for node in datawriter_nodes]
......
......@@ -44,7 +44,7 @@ def freeze_tbb(stations, dm, timesec, timensec):
station_str = ','.join(stationlists[num_boards])
relay = lcurun_command + [station_str]
slicenr = timensec / 5
slicenr = int(timensec // (5 * 1024)) # -> 5.12 microseconds per slice
# string magic to create single cmdline ';' seperated tbbctl commands to set the dispersion measure and the stoptime
set_dm_cmd = " ; ".join(['%s --dispmeas=%s,%s' % (tbb_command, board, dm) for board in range(num_boards)])
......
......@@ -20,7 +20,7 @@ def upload_tbb_data(stations, dm, start_time, duration, sub_bands, wait_time, bo
:param stations: Only TBBs of these stations will upload their data to CEP.
:param dm: The dispersion measure that was set during data recording.
:param start_time: Designates the start of the recorded data in the TBB memory which will be uploaded to CEP. Earlier data in TBB memory will not be uploaded.
:param duration: The time span for which the data will be uploaded.
:param duration: The time span for which the data will be uploaded in seconds.
:param sub_bands: The list of sub-bands that will be uploaded.
:param wait_time: The time that has to be waited before another sub-band upload can be commanded.
:param boards: Only these boards will be commanded to upload the spectral data to CEP.
......@@ -55,7 +55,7 @@ def upload_tbb_data(stations, dm, start_time, duration, sub_bands, wait_time, bo
cmd_list = []
for channel in range(15):
for board in board_list:
tbb_cmd = "tbbctl --readband=%s,%s,%s,%s,%s,%s" % (board, channel, sub_band, adjusted_start_time, slice_nr, duration)
tbb_cmd = "tbbctl --readband=%s,%s,%s,%s,%s,%s" % (board, channel, sub_band, adjusted_start_time, slice_nr, duration*1000) # milliseconds
cmd_list.append(tbb_cmd)
cmd_list.append("sleep %s" % (wait_time,))
......
......@@ -37,9 +37,11 @@ ALERT_PACKET_TYPE_FILTER = None # list of int or None for all
DEFAULT_TBB_CEP_NODES = None # list of nodes to dump to, e.g. ["cpu%s" % str(num).zfill(2) for num in expand_list("01-50")], or None for all available
DEFAULT_TBB_SUBBANDS = expand_list("10-496") # The subbands to dump. Note: When starting the recording (tbbservice_start_recording), the subband range HAS to cover 487 subbands (typically 10-496)
DEFAULT_TBB_STATIONS = ['rs409'] # ['cs001','cs002','cs003','cs004','cs005','cs006','cs007','cs011','cs013','cs017','cs021','cs024','cs026','cs028','cs030','cs031','cs032','cs101','cs103','cs201','cs301','cs302','cs401','cs501','rs106','rs205','rs208','rs210','rs305','rs306','rs307','rs310','rs406','rs407','rs409','rs503','rs508','rs509'] # List of stations to include in tbb dump (filtered for those who are actually observing at event ToA)
DEFAULT_TBB_STATIONS = ['cs004'] # ['rs409'] # ['cs001','cs002','cs003','cs004','cs005','cs006','cs007','cs011','cs013','cs017','cs021','cs024','cs026','cs028','cs030','cs031','cs032','cs101','cs103','cs201','cs301','cs302','cs401','cs501','rs106','rs205','rs208','rs210','rs305','rs306','rs307','rs310','rs406','rs407','rs409','rs503','rs508','rs509'] # List of stations to include in tbb dump (filtered for those who are actually observing at event ToA)
DEFAULT_TBB_PROJECT = "COM_ALERT"
DEFAULT_TBB_ALERT_MODE = "subband"
DEFAULT_TBB_BOARDS = expand_list("0-5")
DEFAULT_TBB_DUMP_DURATION = 0.1 # todo! -> should be 5.0 in production
\ No newline at end of file
DEFAULT_TBB_DUMP_DURATION = 0.1 # todo! -> should be 5.0 in production
STOPTIME_DELAY = 0.1 # stop this much after the actual stoptime to make sure the data is not stopped too early. This should be zero, especially when we want to dump the full 5 seconds, but for shorter dump durations, we can give some wiggle room.
\ No newline at end of file
......@@ -255,9 +255,9 @@ class ALERTHandler(VOEventListenerInterface):
# convert float starttime to second and nanosecond component
# todo: Do we need higher precision up to here? Investigate!
# ...We agreed to try this out first, but it could be problematic fr use cases with extremely short recordings.
starttime_sec, starttime_nsec = ("%.9f" % starttime).split(".")
starttime_sec = int(starttime_sec)
starttime_nsec = int(starttime_nsec)
stoptime_sec, stoptime_nsec = ("%.9f" % stoptime).split(".")
stoptime_sec = int(stoptime_sec)
stoptime_nsec = int(stoptime_nsec)
if self._tbb_trigger_is_acceptable(stoptime):
logger.info('ALERT event %s passed system pre-flight checks' % triggerid)
......@@ -274,7 +274,7 @@ class ALERTHandler(VOEventListenerInterface):
# do a fast direct freeze call here, so the boards still contain data for this event.
# if we freeze via rpc/service calls, that takes time, so we might loose precious data from the buffers.
freeze_tbb(lcu_str, dm, starttime_sec , starttime_nsec)
freeze_tbb(lcu_str, dm, stoptime_sec + STOPTIME_DELAY, stoptime_nsec)
# initiate the dumping via an rpc call to the tbbservice which takes care of all bookkeeping.
with TBBRPC() as rpc:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment