import time
from ctypes import Array, c_double
from typing import Optional, Union

import numpy as np
import numpy.typing as npt
import uldaq
from IPython import embed

from pyrelacs.util.logging import config_logging

log = config_logging()


class Repos:
    """Thin wrapper around a USB DAQ device accessed through ``uldaq``.

    On construction the first USB device reported by ``uldaq`` is connected
    and its analog-input, analog-output and digital-I/O sub-devices are
    stored on the instance as ``ai_device``, ``ao_device`` and ``dio_device``.
    """

    def __init__(self) -> None:
        """Connect to the first USB DAQ device found (see `connect_dac`)."""
        self.connect_dac()

    def connect_dac(self) -> None:
        """Connect to the first USB DAQ device and cache its sub-devices.

        Raises
        ------
        SystemExit
            With exit code 1 when no USB DAQ device is found.
        """
        devices = uldaq.get_daq_device_inventory(uldaq.InterfaceType.USB)
        log.debug(f"Found daq devices {len(devices)}, connecting to the first one")
        if not devices:
            log.error("Did not found daq devices, please connect one")
            # Equivalent to ``exit(1)`` but does not depend on the ``site``
            # module having injected the ``exit`` helper.
            raise SystemExit(1)

        self.daq_device = uldaq.DaqDevice(devices[0])
        self.daq_device.connect()
        self.ai_device = self.daq_device.get_ai_device()
        self.ao_device = self.daq_device.get_ao_device()
        self.dio_device = self.daq_device.get_dio_device()
        log.debug("Connected")

    @staticmethod
    def _channel_count(channels: list[int]) -> int:
        """Validate a ``[low, high]`` channel pair and return its channel count.

        Returns 1 when both entries are equal and 2 otherwise.

        Raises
        ------
        ValueError
            If ``channels`` does not contain exactly two entries.
        """
        if len(channels) != 2:
            message = "You can only provide two channels [0, 1]"
            log.error(message)
            raise ValueError(message)
        return 2 if channels[0] != channels[1] else 1

    @staticmethod
    def _sample_count(duration, samplerate) -> int:
        """Return the number of samples in ``duration`` at ``samplerate``.

        Counted as the length of ``np.arange(0, duration, 1 / samplerate)``.
        """
        return np.shape(np.arange(0, duration, 1 / samplerate))[0]

    def read_analog_daq(
        self,
        channels: list[int],
        duration: int,
        samplerate: float,
        AiInputMode: uldaq.AiInputMode = uldaq.AiInputMode.SINGLE_ENDED,
        Range: uldaq.Range = uldaq.Range.BIP10VOLTS,
        ScanOption: uldaq.ScanOption = uldaq.ScanOption.DEFAULTIO,
        AInScanFlag: uldaq.AInScanFlag = uldaq.AInScanFlag.DEFAULT,
    ) -> Array[c_double]:
        """Start an analog input scan and return the buffer it writes into.

        Parameters
        ----------
        channels : list[int]
            ``[low, high]`` channel pair; use the same value twice for a
            single channel.
        duration : int
            Passed as the stop value of the sample-count range (see
            `_sample_count`) — presumably seconds; verify against callers.
        samplerate : float
            Passed to ``a_in_scan`` as the scan rate.
        AiInputMode, Range, ScanOption, AInScanFlag
            Forwarded unchanged to ``ai_device.a_in_scan``.

        Returns
        -------
        Array[c_double]
            Buffer created with ``uldaq.create_float_buffer`` and handed to
            the scan.

        Raises
        ------
        ValueError
            If ``channels`` does not contain exactly two entries.

        Notes
        -----
        This method does not call any wait/status function itself.
        NOTE(review): confirm whether ``a_in_scan`` blocks until the buffer
        is filled with the chosen ``ScanOption``.
        """
        channel_count = self._channel_count(channels)
        samples_per_channel = self._sample_count(duration, samplerate)
        data_analog_input = uldaq.create_float_buffer(
            channel_count, samples_per_channel
        )

        self.ai_device.a_in_scan(
            channels[0],
            channels[1],
            AiInputMode,
            Range,
            samples_per_channel,
            samplerate,
            ScanOption,
            AInScanFlag,
            data=data_analog_input,
        )
        return data_analog_input

    def write_analog_dac(
        self,
        data: Union[list, npt.NDArray],
        channels: list[int],
        samplerate: float,
        Range: uldaq.Range = uldaq.Range.BIP10VOLTS,
        ScanOption: uldaq.ScanOption = uldaq.ScanOption.DEFAULTIO,
        AOutScanFlag: uldaq.AOutScanFlag = uldaq.AOutScanFlag.DEFAULT,
    ):
        """Start an analog output scan of ``data``.

        Parameters
        ----------
        data : Union[list, npt.NDArray]
            Samples to write; copied into a ``c_double`` array.
        channels : list[int]
            ``[low, high]`` channel pair; use the same value twice for a
            single channel.
        samplerate : float
            Passed to ``a_out_scan`` as the scan rate.
        Range, ScanOption, AOutScanFlag
            Forwarded unchanged to ``ao_device.a_out_scan``.

        Returns
        -------
        tuple
            ``(data_analog_output, self.ao_device)``: the ``c_double`` array
            handed to the scan, and the analog-output device.

        Raises
        ------
        ValueError
            If ``channels`` does not contain exactly two entries, or if
            ``data`` is empty.

        Notes
        -----
        If ``a_out_scan`` raises, the error is logged, the outputs are set to
        zero, the device is disconnected, and the method still returns
        normally.
        """
        channel_count = self._channel_count(channels)

        # The previous check (`len(data) != len(data) * channel_count`) was
        # always false for a single channel, so single-channel output could
        # never run. The only other input it rejected was empty data, which
        # is what is checked here.
        if len(data) == 0:
            message = "No data to write, data must contain at least one sample"
            log.error(message)
            raise ValueError(message)

        # The buffer holds len(data) samples per channel. Only the first
        # len(data) entries are filled from `data`; ctypes zero-initialises
        # the rest.
        # NOTE(review): for two channels the caller presumably has to supply
        # the samples of both channels itself — confirm the expected layout.
        buffer_type = c_double * (len(data) * channel_count)
        data_analog_output = buffer_type(*data)
        log.debug(f"Created C_double data {data_analog_output}")

        try:
            self.ao_device.a_out_scan(
                channels[0],
                channels[1],
                Range,
                int(len(data)),
                samplerate,
                ScanOption,
                AOutScanFlag,
                data_analog_output,
            )
        except Exception as e:
            # Best effort: do not leave a voltage on the outputs after a
            # failed scan, then release the device.
            log.error(f"{e}")
            self.set_analog_to_zero()
            self.disconnect_dac()

        # No scan_wait is issued here; the device is returned alongside the
        # buffer, presumably so the caller can wait on the scan itself.
        return data_analog_output, self.ao_device

    def set_analog_to_zero(self, channels: Optional[list[int]] = None):
        """Write 0 to two analog outputs using the ``BIP10VOLTS`` range.

        Parameters
        ----------
        channels : list[int], optional
            ``[low, high]`` channel pair, ``[0, 1]`` when omitted.

        Notes
        -----
        If ``a_out_list`` raises, the error is logged and the device is
        disconnected; the exception is not re-raised.
        """
        if channels is None:
            channels = [0, 1]

        try:
            self.ao_device.a_out_list(
                channels[0],
                channels[1],
                [
                    uldaq.Range.BIP10VOLTS,
                    uldaq.Range.BIP10VOLTS,
                ],
                uldaq.AOutListFlag.DEFAULT,
                [0, 0],
            )
        except Exception as e:
            log.error(f"{e}")
            log.error("disconnection dac")
            self.disconnect_dac()

    def diggital_trigger(self) -> None:
        """Drive digital bit 0 high, pulsing it low first if already high."""
        if not self.read_bit(channel=0):
            self.write_bit(channel=0, bit=1)
        else:
            self.write_bit(channel=0, bit=0)
            # NOTE(review): the return value is discarded, so this adds no
            # defined delay between the two writes — confirm whether a real
            # pause was intended.
            time.time_ns()
            self.write_bit(channel=0, bit=1)

    def write_bit(self, channel: int = 0, bit: int = 1) -> None:
        """Configure ``channel`` on ``AUXPORT`` as output and write ``bit``."""
        self.dio_device.d_config_bit(
            uldaq.DigitalPortType.AUXPORT, channel, uldaq.DigitalDirection.OUTPUT
        )
        self.dio_device.d_bit_out(
            uldaq.DigitalPortType.AUXPORT, bit_number=channel, data=bit
        )

    def read_bit(self, channel: int = 0):
        """Return the value of bit ``channel`` read from ``AUXPORT``."""
        return self.dio_device.d_bit_in(uldaq.DigitalPortType.AUXPORT, channel)

    def read_digitalio(
        self,
        channels: list[int],
        duration,
        samplerate,
        ScanOptions: uldaq.ScanOption = uldaq.ScanOption.DEFAULTIO,
        DInScanFlag: uldaq.DInScanFlag = uldaq.DInScanFlag.DEFAULT,
    ):
        """Start a digital input scan and return the buffer it writes into.

        Parameters
        ----------
        channels : list[int]
            Only used to size the buffer: one slot per sample when the first
            two entries are equal, otherwise ``len(channels)`` slots.
        duration, samplerate
            Determine the number of samples (see `_sample_count`);
            ``samplerate`` is also passed to ``d_in_scan``.
        ScanOptions, DInScanFlag
            Forwarded unchanged to ``dio_device.d_in_scan``.

        Returns
        -------
        Buffer created with ``uldaq.create_int_buffer``.
        """
        if channels[0] == channels[1]:
            channel_len = 1
        else:
            channel_len = len(channels)

        samples = self._sample_count(duration, samplerate)
        data_digital_input = uldaq.create_int_buffer(channel_len, samples)

        self.dio_device.d_config_port(
            uldaq.DigitalPortType.AUXPORT, uldaq.DigitalDirection.INPUT
        )
        # NOTE(review): the scanned port range is fixed to AUXPORT0 and does
        # not depend on `channels`; the sample count passed is the full
        # buffer length — confirm this is intended.
        self.dio_device.d_in_scan(
            uldaq.DigitalPortType.AUXPORT0,
            uldaq.DigitalPortType.AUXPORT0,
            len(data_digital_input),
            samplerate,
            ScanOptions,
            DInScanFlag,
            data_digital_input,
        )
        return data_digital_input

    def write_byte(self, channel: int, byte: list[int]):
        """Write each entry of ``byte`` to bit ``channel`` of ``AUXPORT``.

        The bit is configured as output once; after every write the method
        sleeps for one second.
        """
        self.dio_device.d_config_bit(
            uldaq.DigitalPortType.AUXPORT, channel, uldaq.DigitalDirection.OUTPUT
        )
        for bit in byte:
            self.dio_device.d_bit_out(uldaq.DigitalPortType.AUXPORT, channel, bit)
            time.sleep(1)

    def disconnect_dac(self):
        """Disconnect from the DAQ device and release it."""
        self.daq_device.disconnect()
        self.daq_device.release()

    def run_repo(self) -> None:
        """Placeholder; does nothing."""
        pass

    def stop_repo(self) -> None:
        """Placeholder; does nothing."""
        pass

    def reload_repo(self) -> None:
        """Placeholder; does nothing."""
        pass