[dataio] adding chunking for reading daq input
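
Instead of appending one sample per poll, the producer now drains the uldaq
transfer buffer in chunks of one tenth of the device buffer, which is sized to
hold five seconds of data. Chunks that wrap around the end of the transfer
buffer are split into two copies, a detected buffer overrun stops the scan,
and the scanned channel range comes from a new channels constructor argument.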

wendtalexander 2024-10-04 11:49:33 +02:00
parent 8910305262
commit 85fd70f8ca


@@ -14,42 +14,59 @@ faulthandler.enable()
 class DaqProducer:
-    def __init__(self, buffer: CircBuffer, device: uldaq.DaqDevice):
+    def __init__(
+        self, buffer: CircBuffer, device: uldaq.DaqDevice, channels: list[int]
+    ):
         self.buffer = buffer
         self.device = device
         self.ai_device = self.device.get_ai_device()
+        self.channels = channels
 
     def read_analog_continously(
         self,
-        samplerate: float = 20,
         *args,
         **kwargs,
     ):
         log.debug("starting acquisition")
-        channel = self.buffer.channel_count
-        size = self.buffer.size
-        data_in = uldaq.create_float_buffer(channel, size)
+        if self.channels[0] == self.channels[1]:
+            channel_range = np.arange(1)
+        else:
+            channel_range = np.arange(self.channels[0], self.channels[1] + 1)
+        assert channel_range.size == self.buffer.channel_count, ValueError(
+            f"Missmatch in channel count,\n daq_channel: "
+            f"{channel_range.size}\n buffer_channel: {self.buffer.channel_count}"
+        )
+        # let the buffer for the daq device hold 5 seconds of data
+        daq_buffer_size = self.buffer.samplerate * 5
+        data_in = uldaq.create_float_buffer(channel_range.size, daq_buffer_size)
+        log.debug(f"Buffersize for daq {len(data_in)}")
+        log.debug(f"Buffersize {self.buffer.size}")
+        # BUG: Check for multiple channels
         er = self.ai_device.a_in_scan(
-            1,
-            1,
+            self.channels[0],
+            self.channels[1],
             uldaq.AiInputMode.SINGLE_ENDED,
             uldaq.Range.BIP10VOLTS,
-            self.buffer.size,
-            samplerate,
+            daq_buffer_size,
+            self.buffer.samplerate,
             uldaq.ScanOption.CONTINUOUS,
             uldaq.AInScanFlag.DEFAULT,
             data=data_in,
         )
+        chunk_size = int(daq_buffer_size / 10)
+        wrote_chunk = False
         start_time = time.time()
         daq_status = uldaq.ScanStatus.IDLE
         while daq_status == uldaq.ScanStatus.IDLE:
             daq_status = self.ai_device.get_scan_status()[0]
-        prev_count = 0
-        prev_index = 0
         while daq_status != uldaq.ScanStatus.IDLE:
+            prev_count = 0
+            prev_index = 0
             while time.time() - start_time < 10:
                 daq_status, transfer_status = self.ai_device.get_scan_status()
                 # The index into the data buffer immediately following the last sample transferred.
@@ -59,19 +76,83 @@ class DaqProducer:
                 # The number of samples per channel transferred since the scan started
                 channel_samples = transfer_status.current_scan_count
-                if current_index > prev_index:
-                    self.buffer.append(data_in[current_index])
-                    prev_index = current_index
-                if total_samples - prev_count > len(data_in):
-                    self.ai_device.scan_stop()
-                    log.error("A Buffer overrun occurred")
-                    break
+                new_data_count = total_samples - prev_count
+                # check if counts if new data is bigger than the buffer
+                # if that happends stop the acquisition
+                if new_data_count > len(data_in):
+                    self.ai_device.scan_stop()
+                    log.error("A Buffer overrun occurred")
+                    break
+                if new_data_count > chunk_size:
+                    wrote_chunk = True
+                    # index wraps around the buffer
+                    if prev_index + chunk_size > len(data_in) - 1:
+                        log.debug("Chunk wraps around buffersize")
+                        first_chunk = len(data_in) - prev_index
+                        [
+                            self.buffer.append(data_in[prev_index + i])
+                            for i in range(first_chunk)
+                        ]
+                        second_chunk = chunk_size - first_chunk
+                        [
+                            self.buffer.append(data_in[i])
+                            for i in range(second_chunk)
+                        ]
+                    else:
+                        log.debug("Writing chunk to buffer")
+                        [
+                            self.buffer.append(data_in[prev_index + i])
+                            for i in range(chunk_size)
+                        ]
+                    self.buffer.append(data_in[current_index])
+                else:
+                    wrote_chunk = False
+                if wrote_chunk:
+                    prev_count += chunk_size
+                    prev_index += chunk_size
+                    prev_index %= daq_buffer_size
             self.ai_device.scan_stop()
             daq_status, transfer_status = self.ai_device.get_scan_status()
+            current_index = transfer_status.current_index
             log.debug(daq_status)
             log.debug(transfer_status.current_index)
             log.debug(transfer_status.current_total_count)
             log.debug(transfer_status.current_scan_count)
-            self.buffer.append(data_in[current_index])
+            log.debug(self.buffer.totocount())
+            log.debug("Appending last chunk")
+            if prev_index + chunk_size > len(data_in) - 1:
+                log.debug("Chunk wraps around buffersize")
+                first_chunk = len(data_in) - prev_index
+                [
+                    self.buffer.append(data_in[prev_index + i])
+                    for i in range(first_chunk)
+                ]
+                second_chunk = chunk_size - first_chunk
+                [self.buffer.append(data_in[i]) for i in range(second_chunk)]
+            else:
+                log.debug("Writing chunk to buffer")
+                [
+                    self.buffer.append(data_in[prev_index + i])
+                    for i in range(chunk_size)
+                ]
             log.info("stopping")
             break
         break
+        return "Done. "
@@ -84,6 +165,6 @@ if __name__ == "__main__":
         raise e
     daq_device.connect()
-    buf = CircBuffer(size=300)
-    producer = DaqProducer(buf, daq_device)
+    buf = CircBuffer(size=1_000_000, samplerate=100)
+    producer = DaqProducer(buf, daq_device, [1, 1])
     producer.read_analog_continously()
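
For reference, a minimal sketch of the wrap-around chunk copy this commit
implements. copy_chunk, the plain-list data_in, and the append callback are
hypothetical stand-ins for the uldaq transfer buffer and CircBuffer.append,
not part of the commit:

    # Hypothetical sketch: data_in stands in for the uldaq transfer buffer,
    # append for the consumer (e.g. CircBuffer.append).
    def copy_chunk(data_in, prev_index, chunk_size, append):
        """Copy chunk_size samples starting at prev_index, wrapping if needed."""
        if prev_index + chunk_size > len(data_in) - 1:
            # chunk crosses the end of the ring: copy the tail, then the head
            first_chunk = len(data_in) - prev_index
            for i in range(first_chunk):
                append(data_in[prev_index + i])
            second_chunk = chunk_size - first_chunk
            for i in range(second_chunk):
                append(data_in[i])
        else:
            for i in range(chunk_size):
                append(data_in[prev_index + i])
        # next read position, kept inside the ring as the commit does
        return (prev_index + chunk_size) % len(data_in)

    if __name__ == "__main__":
        # 10-sample ring, a 4-sample chunk starting at index 8 wraps to 0..1
        ring = list(range(10))
        out = []
        next_index = copy_chunk(ring, 8, 4, out.append)
        assert out == [8, 9, 0, 1] and next_index == 2

The commit's list comprehensions perform the same appends; plain for loops are
used here because the comprehension values are discarded. The overrun guard in
the commit is the analogous count check: if more new samples accumulated than
the transfer buffer holds, data was already overwritten and the scan stops.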