import time
import faulthandler
from collections import deque

from pyqtgraph import transformToArray
import uldaq
import numpy as np
from IPython import embed
import matplotlib.pyplot as plt

from pyrelacs.util.logging import config_logging

log = config_logging()
faulthandler.enable()

# Pause between two scan-status polls so the polling loops do not spin a
# CPU core at 100 %.
_POLL_INTERVAL_S = 0.001


class DataBuffer:
    """Bounded sample buffer driven by a continuous uldaq analog-input scan.

    Attributes set by ``__init__``:
        channels: number of analog-input channels to scan (0 .. channels - 1).
        samples: number of samples per channel the ring buffer may hold.

    Attributes set by ``read_analog_continously``:
        buffer: ``deque`` with ``maxlen = channels * samples``.
        device: the ``uldaq.DaqDevice`` passed in.
        ai_device: the analog-input sub-device obtained from ``device``.
    """

    def __init__(self, channels: int, samples: int) -> None:
        """Store the buffer geometry.

        Raises:
            ValueError: if ``channels`` or ``samples`` is smaller than 1.
        """
        if channels < 1 or samples < 1:
            raise ValueError(
                f"channels and samples must be >= 1, got {channels} and {samples}"
            )
        self.channels = channels
        self.samples = samples

    def read_analog_continously(
        self,
        device: uldaq.DaqDevice,
        samplerate: float = 40_000.0,
    ) -> None:
        """Start a continuous analog-input scan and poll it until it goes idle.

        The scan covers channels ``0 .. self.channels - 1`` in single-ended
        mode with a +/-10 V range.  The method blocks while the scan is
        running and always stops the scan on the way out, including when the
        polling loop is interrupted by an exception or Ctrl-C.

        Args:
            device: an already connected DAQ device.
            samplerate: requested sampling rate per channel, passed to
                ``a_in_scan`` unchanged.

        NOTE(review): the loop only reads the transfer status; nothing is
        copied from the driver buffer into ``self.buffer`` yet.
        """
        self.buffer = deque(maxlen=self.channels * self.samples)

        # Per-channel length of the buffer handed to the driver.
        samples_per_channel = 40_000

        self.device = device
        self.ai_device = self.device.get_ai_device()

        data_analog_input = uldaq.create_float_buffer(
            self.channels, samples_per_channel
        )

        # The high channel follows self.channels so that the scanned channel
        # count always matches the size of the buffer allocated above.
        self.ai_device.a_in_scan(
            0,
            self.channels - 1,
            uldaq.AiInputMode.SINGLE_ENDED,
            uldaq.Range.BIP10VOLTS,
            samples_per_channel,
            samplerate,
            uldaq.ScanOption.CONTINUOUS,
            uldaq.AInScanFlag.DEFAULT,
            data=data_analog_input,
        )

        try:
            # Wait for the scan to leave the IDLE state.
            while True:
                daq_status = self.ai_device.get_scan_status()[0]
                if daq_status != uldaq.ScanStatus.IDLE:
                    break
                time.sleep(_POLL_INTERVAL_S)

            while daq_status != uldaq.ScanStatus.IDLE:
                daq_status, transfer_status = self.ai_device.get_scan_status()
                # The index into the data buffer immediately following the
                # last sample transferred.
                current_index = transfer_status.current_index
                # Total samples since the start of the scan.
                total_samples = transfer_status.current_total_count
                # The number of samples per channel transferred since the
                # scan started.
                channel_samples = transfer_status.current_scan_count
                # TODO(review): these values are not consumed yet; they are
                # what a copy into self.buffer would be based on.
                time.sleep(_POLL_INTERVAL_S)
        finally:
            # Never leave the hardware scanning once this method returns.
            self.ai_device.scan_stop()


if __name__ == "__main__":
    devices = uldaq.get_daq_device_inventory(uldaq.InterfaceType.USB)
    log.debug(f"Found daq devices {len(devices)}, connecting to the first one")
    if not devices:
        # An empty inventory would raise IndexError below, which the
        # ULException handler does not catch.
        log.error("Did not found daq devices, please connect one")
        raise SystemExit(1)

    try:
        daq_device = uldaq.DaqDevice(devices[0])
    except uldaq.ul_exception.ULException:
        log.error("Did not found daq devices, please connect one")
        raise

    try:
        daq_device.connect()
        buf = DataBuffer(channels=2, samples=100_000)
        buf.read_analog_continously(daq_device)
    finally:
        # Give the device back to the driver even if acquisition failed.
        if daq_device.is_connected():
            daq_device.disconnect()
        daq_device.release()