from ctypes import Array, c_double
import time
from typing import Union

from IPython import embed
import numpy.typing as npt
import uldaq
import numpy as np

from pyrelacs.util.logging import config_logging

log = config_logging()


class MccDac:
    """
    Represents the data acquisition (DAQ) device from Measurement Computing.

    Provides methods for reading and writing the analog and digital inputs
    and outputs, and for driving the attenuator chip that is wired to the
    digital pins 4-7.

    Creating an instance connects to the first USB device that is found.

    Attributes
    ----------
    daq_device : uldaq.DaqDevice
        DaqDevice for handling connecting, releasing and disconnecting
    ai_device : uldaq.AiDevice
        The analog input device
    ao_device :
        The analog output device
    dio_device :
        The digital input/output device
    """

    def __init__(self) -> None:
        devices = uldaq.get_daq_device_inventory(uldaq.InterfaceType.USB)
        log.debug(f"Found daq devices {len(devices)}, connecting to the first one")
        try:
            self.daq_device = uldaq.DaqDevice(devices[0])
        except (IndexError, uldaq.ul_exception.ULException):
            # An empty inventory shows up as an IndexError on devices[0];
            # log it the same way as a driver error and re-raise unchanged.
            log.error("Did not found daq devices, please connect one")
            raise

        try:
            self.daq_device.connect()
        except uldaq.ul_exception.ULException:
            # Connecting failed: release the handle and start over from a
            # fresh inventory lookup.
            self.disconnect_dac()
            self.connect_dac()
        self._bind_subdevices()
        log.debug("Connected")

    def _bind_subdevices(self) -> None:
        """Fetch the analog-in, analog-out and digital sub-devices of ``daq_device``."""
        self.ai_device = self.daq_device.get_ai_device()
        self.ao_device = self.daq_device.get_ao_device()
        self.dio_device = self.daq_device.get_dio_device()

    @staticmethod
    def _require_channel_pair(channels) -> None:
        """Log and raise ``AssertionError`` unless exactly two channels are given.

        An explicit raise is used instead of an ``assert`` statement so the
        check survives ``python -O``; the exception type is kept so existing
        callers see the same error class as before.
        """
        if len(channels) != 2:
            message = "You can only provide two channels [0, 1]"
            log.error(message)
            raise AssertionError(message)

    @staticmethod
    def _attenuation_bits(db: float) -> str:
        """Return the 8-bit string that encodes the attenuation level ``db``.

        The levels -95.5 ... 31.5 dB in steps of 0.5 dB map onto the byte
        values 1 ... 255.

        Raises
        ------
        IndexError
            If ``db`` is not one of the supported levels.
        """
        hardware_possible_db = np.arange(-95.5, 32.0, 0.5)
        byte_number = np.arange(1, 256)
        matches = byte_number[hardware_possible_db == db]
        if matches.size == 0:
            message = (
                f"Attenuation level {db} dB is not supported, "
                "use -95.5 to 31.5 dB in steps of 0.5 dB"
            )
            log.error(message)
            raise IndexError(message)
        return np.binary_repr(matches[0], width=8)

    def connect_dac(self) -> None:
        """
        Connect to the first USB DAQ device and fetch its sub-devices.

        Raises
        ------
        SystemExit
            With exit code 1 if no device is found.
        """
        devices = uldaq.get_daq_device_inventory(uldaq.InterfaceType.USB)
        log.debug(f"Found daq devices {len(devices)}, connecting to the first one")
        if not devices:
            log.error("Did not found daq devices, please connect one")
            raise SystemExit(1)
        self.daq_device = uldaq.DaqDevice(devices[0])
        self.daq_device.connect()
        self._bind_subdevices()
        log.debug("Connected")

    def read_analog(
        self,
        channels: list[int],
        duration: int,
        samplerate: float,
        AiInputMode: uldaq.AiInputMode = uldaq.AiInputMode.SINGLE_ENDED,
        Range: uldaq.Range = uldaq.Range.BIP10VOLTS,
        ScanOption: uldaq.ScanOption = uldaq.ScanOption.DEFAULTIO,
        AInScanFlag: uldaq.AInScanFlag = uldaq.AInScanFlag.DEFAULT,
    ) -> Array[c_double]:
        """
        Start an analog input scan and return the buffer it writes into.

        Creates a c_double Array that is large enough for every channel in
        the requested range and hands it to the scan.

        Parameters
        ----------
        channels : list[int]
            Exactly two ints, the first and the last channel of the range to
            scan (ex [0, 1] or [0, 4]); use the same number twice for a
            single channel
        duration : int
            duration of the sampling period
        samplerate : float
            samplerate for the duration of sampling
        AiInputMode : uldaq.AiInputMode = uldaq.AiInputMode.SINGLE_ENDED
            A/D channel input mode
        Range : uldaq.Range = uldaq.Range.BIP10VOLTS
            Range of the input
        ScanOption : uldaq.ScanOption = uldaq.ScanOption.DEFAULTIO
            Specific flags for acquiring the input
        AInScanFlag : uldaq.AInScanFlag = uldaq.AInScanFlag.DEFAULT
            Scaling of the data

        Returns
        -------
        Array[c_double]
            The buffer that was passed to the scan

        Raises
        ------
        AssertionError
            If ``channels`` does not contain exactly two entries.
        """
        self._require_channel_pair(channels)
        low_channel, high_channel = channels

        # The scan covers every channel from low to high inclusive, so the
        # buffer needs one slot per channel in that range. abs() only sizes
        # the buffer; a reversed pair is still left to the driver to reject.
        channel_count = abs(high_channel - low_channel) + 1
        samples_per_channel = np.arange(0, duration, 1 / samplerate).size
        data_analog_input = uldaq.create_float_buffer(
            channel_count, samples_per_channel
        )

        self.ai_device.a_in_scan(
            low_channel,
            high_channel,
            AiInputMode,
            Range,
            samples_per_channel,
            samplerate,
            ScanOption,
            AInScanFlag,
            data=data_analog_input,
        )
        return data_analog_input

    def write_analog(
        self,
        data: Union[list, npt.NDArray],
        channels: list[int],
        samplerate: float,
        Range: uldaq.Range = uldaq.Range.BIP10VOLTS,
        ScanOption: uldaq.ScanOption = uldaq.ScanOption.DEFAULTIO,
        AOutScanFlag: uldaq.AOutScanFlag = uldaq.AOutScanFlag.DEFAULT,
    ) -> Array[c_double]:
        """
        Start an analog output scan that writes ``data`` to the device.

        Copies ``data`` into a c_double Array and hands it to the scan. If
        starting the scan fails, the error is logged, the analog outputs are
        set to zero and the device is disconnected; the exception is not
        re-raised.

        Parameters
        ----------
        data : Union[list, npt.NDArray]
            data which should be written to the device
        channels : list[int]
            Exactly two ints, the first and the last channel of the range to
            write to (ex [0, 1]); use the same number twice for a single
            channel
        samplerate : float
            samplerate of the output
        Range : uldaq.Range = uldaq.Range.BIP10VOLTS
            Range of the output
        ScanOption : uldaq.ScanOption = uldaq.ScanOption.DEFAULTIO
            Specific flags for the output scan
        AOutScanFlag : uldaq.AOutScanFlag = uldaq.AOutScanFlag.DEFAULT
            Scaling of the data

        Returns
        -------
        Array[c_double]
            The buffer that was passed to the scan

        Raises
        ------
        AssertionError
            If ``channels`` does not contain exactly two entries.
        """
        self._require_channel_pair(channels)

        sample_count = len(data)
        data_analog_output = (c_double * sample_count)(*data)

        log.debug(f"Created C_double data {data_analog_output}")
        try:
            # NOTE(review): len(data) is passed as samples per channel, which
            # matches single-channel use; confirm before writing to a range
            # of more than one channel.
            self.ao_device.a_out_scan(
                channels[0],
                channels[1],
                Range,
                sample_count,
                samplerate,
                ScanOption,
                AOutScanFlag,
                data_analog_output,
            )
        except Exception as e:
            # Best effort: report through the logger, bring the outputs to a
            # safe level and release the device.
            log.error(f"{e}")
            self.set_analog_to_zero()
            self.disconnect_dac()

        return data_analog_output

    def set_analog_to_zero(self, channels: Union[list[int], None] = None) -> None:
        """
        Sets the analog outputs to zero

        If writing fails, the error is logged and the device is disconnected.

        Parameters
        ----------
        channels : list[int], optional
            Two ints, the first and the last channel of the range to set to
            zero. Defaults to [0, 1].
        """
        if channels is None:
            channels = [0, 1]
        try:
            low_channel, high_channel = channels[0], channels[1]
            # One range entry and one value per channel in the range.
            channel_count = abs(high_channel - low_channel) + 1
            self.ao_device.a_out_list(
                low_channel,
                high_channel,
                [uldaq.Range.BIP10VOLTS] * channel_count,
                uldaq.AOutListFlag.DEFAULT,
                [0] * channel_count,
            )
        except Exception as e:
            log.error(f"{e}")
            log.error("disconnection dac")
            self.disconnect_dac()

    def digital_trigger(self, ch: int = 0) -> None:
        """
        Writes a 1 to the specified digital channel.

        If the channel already reads 1 it is first switched to 0 and then
        set back to 1.

        Parameters
        ----------
        ch : int
            Channel to trigger
        """
        if self.read_bit(channel=ch):
            self.write_bit(channel=ch, bit=0)
            # NOTE(review): time_ns() only reads the clock, it does not
            # wait; kept so the call sequence on the hardware is unchanged.
            time.time_ns()
        self.write_bit(channel=ch, bit=1)

    def write_bit(self, channel: int = 0, bit: int = 1) -> None:
        """
        Writes a 0 / 1 to a specified digital channel

        The channel is configured as an output before the bit is written.

        Parameters
        ----------
        channel : int
            Digital channel to write
        bit : int
            0 / 1 for writing to the digital channel
        """
        self.dio_device.d_config_bit(
            uldaq.DigitalPortType.AUXPORT, channel, uldaq.DigitalDirection.OUTPUT
        )
        self.dio_device.d_bit_out(
            uldaq.DigitalPortType.AUXPORT, bit_number=channel, data=bit
        )

    def read_bit(self, channel: int = 0) -> int:
        """
        Reads a 0 / 1 from the specified digital channel

        Parameters
        ----------
        channel : int
            Digital channel to read from

        Returns
        -------
        bit : int
            0 or 1 from the digital channel
        """
        return self.dio_device.d_bit_in(uldaq.DigitalPortType.AUXPORT, channel)

    def disconnect_dac(self) -> None:
        """Disconnect from the device and release its handle."""
        self.daq_device.disconnect()
        self.daq_device.release()

    def check_attenuator(self) -> None:
        """
        For checking the attenuator that was added to the device to
        attenuate the analog signal to mV.

        Writes a 1 Hz sine wave of 5 seconds to channel 0 of the analog
        output with the attenuation levels 0, 0, -2, -5, -10, -20, -50 dB,
        where the second 0 has the software mute enabled.
        """
        SAMPLERATE = 40_000.0
        DURATION = 5
        AMPLITUDE = 1
        SINFREQ = 1
        t = np.arange(0, DURATION, 1 / SAMPLERATE)
        data = AMPLITUDE * np.sin(2 * np.pi * SINFREQ * t)

        db_values = [0, 0, -2, -5, -10, -20, -50]
        for i, db_value in enumerate(db_values):
            log.info(f"Attenuating the Channels, with {db_value}")
            if i == 1:
                log.info("Muting the Channels")
                self.set_attenuation_level(
                    db_value, db_value, mute_channel1=True, mute_channel2=True
                )
            else:
                self.set_attenuation_level(db_value, db_value)

            # The scan is armed with an external trigger and started by the
            # digital trigger that follows.
            _ = self.write_analog(
                data,
                [0, 0],
                SAMPLERATE,
                ScanOption=uldaq.ScanOption.EXTTRIGGER,
                Range=uldaq.Range.BIP10VOLTS,
            )
            self.digital_trigger()

            try:
                self.ao_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, 15)
                self.write_bit(channel=0, bit=0)
                self.set_analog_to_zero()
            except uldaq.ul_exception.ULException:
                log.debug("Operation timed out")
                self.write_bit(channel=0, bit=0)
                self.disconnect_dac()
                self.connect_dac()
                self.set_analog_to_zero()
            finally:
                # NOTE(review): this repeats the reset/reconnect that the
                # except branch already did; the sequence is kept as-is
                # because the hardware may depend on it.
                self.write_bit(channel=0, bit=0)
                self.disconnect_dac()
                self.connect_dac()
                self.set_analog_to_zero()
            log.info("Sleeping for 1 second, before next attenuation")
            time.sleep(1)

    def set_attenuation_level(
        self,
        db_channel1: float = 5.0,
        db_channel2: float = 5.0,
        mute_channel1: bool = False,
        mute_channel2: bool = False,
    ):
        """
        Setting the attenuation level of the chip that is connected to the DAQ

        The attenuation level is set by writing to the connected digital
        output pin 5, where the strobe pin 6 signals when a bit was sent.
        The cs pin is set from 1 to 0 for the start and from 0 to 1 for
        signaling the end of the data write process.
        The mute pin should be set to 1 for the device to be working.
        More information in the AttCS3310.pdf in the doc

        ident : attdev-1
        strobepin : 6
        datainpin : 5
        dataoutpin: -1
        cspin : 4
        mutepin : 7
        zcenpin : -1

        Parameters
        ----------
        db_channel1 : float
            dB Attenuation level for the first channel
        db_channel2 : float
            dB Attenuation level for the second channel
        mute_channel1 : bool
            Software mute for the first channel
        mute_channel2 : bool
            Software mute for the second channel

        Raises
        ------
        IndexError
            If a level is not one of -95.5 ... 31.5 dB in steps of 0.5 dB.
        """
        # Look up both levels before any pin is driven, so that an
        # unsupported level leaves the hardware untouched.
        binary_db1 = self._attenuation_bits(db_channel1)
        binary_db2 = self._attenuation_bits(db_channel2)

        self.activate_attenuator()
        if mute_channel1:
            log.info("Muting channel one")
            binary_db1 = "00000000"
        if mute_channel2:
            log.info("Muting channel two")
            binary_db2 = "00000000"

        # The byte of the second channel is shifted out first.
        channels_db = binary_db2 + binary_db1
        self.write_bit(channel=4, bit=0)
        for b in channels_db:
            self.write_bit(channel=5, bit=int(b))
            # NOTE(review): the time_ns() calls only read the clock, they do
            # not wait; kept so the call sequence is unchanged.
            time.time_ns()
            self.write_bit(channel=6, bit=1)
            time.time_ns()
            self.write_bit(channel=6, bit=0)
            time.time_ns()
        self.write_bit(channel=4, bit=1)

    def activate_attenuator(self):
        """
        Activation of the attenuator, where the cs pin (4) and the mute
        pin (7) are set to 1, and the data pin (5) and strobe pin (6) to 0
        """
        for ch, b in zip([4, 5, 6, 7], [1, 0, 0, 1]):
            self.write_bit(channel=ch, bit=b)

    def deactivate_attenuator(self):
        """
        Writes a 0 to the mute pin, which is deactivating the attenuator
        """
        # mute should be enabled for starting calibration
        self.write_bit(channel=7, bit=0)