from ctypes import Array, c_double
from typing import Union

import numpy as np
import numpy.typing as npt
import uldaq

from pyrelacs.util.logging import config_logging

log = config_logging()


class Repos:
    """Wrapper around the first USB DAQ device reported by ``uldaq``.

    The constructor connects to the device; ``read_analog_daq`` and
    ``send_analog_dac`` run blocking analog input/output scans on it.
    """

    def __init__(self) -> None:
        """Find the USB DAQ devices and connect to the first one.

        Raises:
            SystemExit: With status 1 when the inventory is empty.
        """
        devices = uldaq.get_daq_device_inventory(uldaq.InterfaceType.USB)
        log.debug(f"Found daq devices {len(devices)}, connecting to the first one")
        if not devices:
            log.error("Did not found daq devices, please connect one")
            # Equivalent to exit(1), but does not rely on the `site` module
            # having injected the `exit` helper into builtins.
            raise SystemExit(1)

        self.daq_device = uldaq.DaqDevice(devices[0])
        self.daq_device.connect()
        log.debug("Connected")

    @staticmethod
    def _channel_span(channels: list[int]) -> tuple[int, int, int]:
        """Return ``(low, high, count)`` for a scan over *channels*.

        ``channels[0]`` is the low channel and ``channels[1]`` the high
        channel. A single-element list scans just that channel. Entries
        beyond the second are ignored after logging an error.

        Raises:
            IndexError: If *channels* is empty.
        """
        if not channels:
            raise IndexError("channels must contain at least one channel number")
        if len(channels) > 2:
            log.error("Please provide only two channels")

        low = channels[0]
        high = channels[1] if len(channels) > 1 else low
        # A reversed range is left for uldaq to reject; only keep the buffer
        # size positive so the allocation itself does not fail first.
        count = max(high - low + 1, 1)
        return low, high, count

    def read_analog_daq(
        self,
        channels: list[int],
        duration: int,
        samplerate: float,
        AiInputMode: uldaq.AiInputMode = uldaq.AiInputMode.SINGLE_ENDED,
        Range: uldaq.Range = uldaq.Range.BIP10VOLTS,
        ScanOption: uldaq.ScanOption = uldaq.ScanOption.DEFAULTIO,
        AInScanFlag: uldaq.AInScanFlag = uldaq.AInScanFlag.DEFAULT,
    ) -> Array[c_double]:
        """Run a blocking analog input scan and return the filled buffer.

        Args:
            channels: ``[low, high]`` channel numbers to scan; a single
                entry scans only that channel.
            duration: Length of the recording; the number of samples per
                channel is the length of ``arange(0, duration, 1/samplerate)``.
            samplerate: Sampling rate handed to the scan.
            AiInputMode: Input mode handed to the scan.
            Range: Voltage range handed to the scan.
            ScanOption: Scan options handed to the scan.
            AInScanFlag: Scan flags handed to the scan.

        Returns:
            The ``uldaq`` float buffer holding
            ``channel_count * samples_per_channel`` values (presumably
            interleaved by channel, as described in the uldaq docs).

        Raises:
            IndexError: If *channels* is empty.
        """
        low_channel, high_channel, channel_count = self._channel_span(channels)

        ai_device = self.daq_device.get_ai_device()

        samples_per_channel = np.arange(0, duration, 1 / samplerate).size
        # The buffer has to hold the samples of every scanned channel, not
        # just one, otherwise a multi-channel scan overruns it.
        data_analog_input = uldaq.create_float_buffer(
            channel_count, samples_per_channel
        )

        log.debug("Scanning")
        ai_device.a_in_scan(
            low_channel,
            high_channel,
            AiInputMode,
            Range,
            samples_per_channel,
            samplerate,
            ScanOption,
            AInScanFlag,
            data=data_analog_input,
        )
        # timeout=-1: block until the scan has finished.
        ai_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, timeout=-1)

        return data_analog_input

    def send_analog_dac(
        self,
        data: Union[list, npt.NDArray],
        channels: list[int],
        duration: int,
        samplerate: float,
        AiInputMode: uldaq.AiInputMode = uldaq.AiInputMode.SINGLE_ENDED,
        Range: uldaq.Range = uldaq.Range.BIP10VOLTS,
        ScanOption: uldaq.ScanOption = uldaq.ScanOption.DEFAULTIO,
        AInScanFlag: uldaq.AOutScanFlag = uldaq.AOutScanFlag.DEFAULT,
    ) -> Array[c_double]:
        """Write *data* to the analog output and wait for the scan to end.

        Args:
            data: Flat sequence of output values; for more than one channel
                it must contain the values of all channels (presumably
                interleaved by channel, as described in the uldaq docs).
            channels: ``[low, high]`` channel numbers to write; a single
                entry writes only that channel.
            duration: Currently unused; kept for interface compatibility.
            samplerate: Output rate handed to the scan.
            AiInputMode: Currently unused; kept for interface compatibility.
            Range: Voltage range handed to the scan.
            ScanOption: Scan options handed to the scan.
            AInScanFlag: Analog *output* scan flags (the name is historical).

        Returns:
            The ``c_double`` array that was handed to the device.

        Raises:
            IndexError: If *channels* is empty.
        """
        low_channel, high_channel, channel_count = self._channel_span(channels)

        data_analog_output = (c_double * len(data))(*data)
        log.debug(f"Created C_double data {data_analog_output}")

        ao_device = self.daq_device.get_ao_device()

        # The scan expects the sample count per channel, while `data` holds
        # the values of all channels together.
        samples_per_channel = len(data) // channel_count

        ao_device.a_out_scan(
            low_channel,
            high_channel,
            Range,
            samples_per_channel,
            samplerate,
            ScanOption,
            AInScanFlag,
            data_analog_output,
        )
        # NOTE(review): fixed 11 s wait; outputs longer than that will not be
        # covered by this timeout -- confirm whether it should follow
        # `duration` instead.
        ao_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, 11)

        return data_analog_output

    def disconnect_dac(self):
        """Disconnect from the DAQ device and release its handle."""
        self.daq_device.disconnect()
        self.daq_device.release()

    def write_daq(self) -> None:
        """Placeholder; not implemented yet."""

    def run_repo(self) -> None:
        """Placeholder; not implemented yet."""

    def stop_repo(self) -> None:
        """Placeholder; not implemented yet."""

    def reload_repo(self) -> None:
        """Placeholder; not implemented yet."""