diff --git a/pyrelacs/dataio/daq_producer.py b/pyrelacs/dataio/daq_producer.py index b24f7f5..67cff41 100644 --- a/pyrelacs/dataio/daq_producer.py +++ b/pyrelacs/dataio/daq_producer.py @@ -14,42 +14,59 @@ faulthandler.enable() class DaqProducer: - def __init__(self, buffer: CircBuffer, device: uldaq.DaqDevice): + def __init__( + self, buffer: CircBuffer, device: uldaq.DaqDevice, channels: list[int] + ): self.buffer = buffer self.device = device self.ai_device = self.device.get_ai_device() + self.channels = channels def read_analog_continously( self, - samplerate: float = 20, *args, **kwargs, ): log.debug("starting acquisition") - channel = self.buffer.channel_count - size = self.buffer.size - data_in = uldaq.create_float_buffer(channel, size) + if self.channels[0] == self.channels[1]: + channel_range = np.arange(1) + else: + channel_range = np.arange(self.channels[0], self.channels[1] + 1) + + assert channel_range.size == self.buffer.channel_count, ValueError( + f"Missmatch in channel count,\n daq_channel: " + f"{channel_range.size}\n buffer_channel: {self.buffer.channel_count}" + ) + + # let the buffer for the daq device hold 5 seconds of data + daq_buffer_size = self.buffer.samplerate * 5 + + data_in = uldaq.create_float_buffer(channel_range.size, daq_buffer_size) + log.debug(f"Buffersize for daq {len(data_in)}") + log.debug(f"Buffersize {self.buffer.size}") - # BUG: Check for multiple channels er = self.ai_device.a_in_scan( - 1, - 1, + self.channels[0], + self.channels[1], uldaq.AiInputMode.SINGLE_ENDED, uldaq.Range.BIP10VOLTS, - self.buffer.size, - samplerate, + daq_buffer_size, + self.buffer.samplerate, uldaq.ScanOption.CONTINUOUS, uldaq.AInScanFlag.DEFAULT, data=data_in, ) + chunk_size = int(daq_buffer_size / 10) + wrote_chunk = False + start_time = time.time() daq_status = uldaq.ScanStatus.IDLE while daq_status == uldaq.ScanStatus.IDLE: daq_status = self.ai_device.get_scan_status()[0] - prev_count = 0 - prev_index = 0 while daq_status != uldaq.ScanStatus.IDLE: + prev_count = 0 + prev_index = 0 while time.time() - start_time < 10: daq_status, transfer_status = self.ai_device.get_scan_status() # The index into the data buffer immediately following the last sample transferred. @@ -59,19 +76,83 @@ class DaqProducer: # The number of samples per channel transferred since the scan started channel_samples = transfer_status.current_scan_count - if current_index > prev_index: - self.buffer.append(data_in[current_index]) - prev_index = current_index + new_data_count = total_samples - prev_count + # check if counts if new data is bigger than the buffer + # if that happends stop the acquisition + if new_data_count > len(data_in): + self.ai_device.scan_stop() + log.error("A Buffer overrun occurred") + break + + if new_data_count > chunk_size: + wrote_chunk = True + # index wraps around the buffer + if prev_index + chunk_size > len(data_in) - 1: + log.debug("Chunk wraps around buffersize") + first_chunk = len(data_in) - prev_index + [ + self.buffer.append(data_in[prev_index + i]) + for i in range(first_chunk) + ] + second_chunk = chunk_size - first_chunk + [ + self.buffer.append(data_in[i]) + for i in range(second_chunk) + ] + else: + log.debug("Writing chunk to buffer") + [ + self.buffer.append(data_in[prev_index + i]) + for i in range(chunk_size) + ] + + self.buffer.append(data_in[current_index]) + + if total_samples - prev_count > len(data_in): + self.ai_device.scan_stop() + log.error("A Buffer overrun occurred") + break + + else: + wrote_chunk = False + if wrote_chunk: + prev_count += chunk_size + prev_index += chunk_size + prev_index %= daq_buffer_size self.ai_device.scan_stop() daq_status, transfer_status = self.ai_device.get_scan_status() + current_index = transfer_status.current_index log.debug(daq_status) + log.debug(transfer_status.current_index) log.debug(transfer_status.current_total_count) log.debug(transfer_status.current_scan_count) + log.debug(self.buffer.totocount()) + log.debug("Appending last chunk") + + if prev_index + chunk_size > len(data_in) - 1: + log.debug("Chunk wraps around buffersize") + first_chunk = len(data_in) - prev_index + [ + self.buffer.append(data_in[prev_index + i]) + for i in range(first_chunk) + ] + second_chunk = chunk_size - first_chunk + [self.buffer.append(data_in[i]) for i in range(second_chunk)] + else: + log.debug("Writing chunk to buffer") + [ + self.buffer.append(data_in[prev_index + i]) + for i in range(chunk_size) + ] + + self.buffer.append(data_in[current_index]) log.info("stopping") + break break + return "Done. " if __name__ == "__main__": @@ -84,6 +165,6 @@ if __name__ == "__main__": raise e daq_device.connect() - buf = CircBuffer(size=300) - producer = DaqProducer(buf, daq_device) + buf = CircBuffer(size=1_000_000, samplerate=100) + producer = DaqProducer(buf, daq_device, [1, 1]) producer.read_analog_continously()