From 9f7d28ccf85ce6b010710b9f50b7cddf7de8996c Mon Sep 17 00:00:00 2001 From: wendtalexander Date: Tue, 24 Sep 2024 17:34:34 +0200 Subject: [PATCH] adding assertions, digital trigger --- pyrelacs/repros/repos.py | 106 ++++++++++++++++++++++++++------------- 1 file changed, 71 insertions(+), 35 deletions(-) diff --git a/pyrelacs/repros/repos.py b/pyrelacs/repros/repos.py index 67c8d31..a0d0b34 100644 --- a/pyrelacs/repros/repos.py +++ b/pyrelacs/repros/repos.py @@ -1,4 +1,5 @@ from ctypes import Array, c_double +import time from typing import Union from IPython import embed import numpy.typing as npt @@ -47,14 +48,15 @@ class Repos: ScanOption: uldaq.ScanOption = uldaq.ScanOption.DEFAULTIO, AInScanFlag: uldaq.AInScanFlag = uldaq.AInScanFlag.DEFAULT, ) -> Array[c_double]: - if channels[0] == channels[1]: - channel_len = 1 + assert len(channels) == 2, log.error("You can only provide two channels [0, 1]") + + if channels[0] != channels[1]: + buffer_len_channels = 2 else: - channel_len = len(channels) - assert len(channels) == 2, log.error("Please provide a list with two ints") + buffer_len_channels = 1 buffer_len = np.shape(np.arange(0, duration, 1 / samplerate))[0] - data_analog_input = uldaq.create_float_buffer(channel_len, buffer_len) + data_analog_input = uldaq.create_float_buffer(buffer_len_channels, buffer_len) er = self.ai_device.a_in_scan( channels[0], @@ -67,11 +69,10 @@ class Repos: AInScanFlag, data=data_analog_input, ) - # ai_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, timeout=-1) return data_analog_input - def send_analog_dac( + def write_analog_dac( self, data: Union[list, npt.NDArray], channels: list[int], @@ -105,52 +106,79 @@ class Repos: ------- Array[c_double] ao_device + """ + assert len(channels) == 2, log.error("You can only provide two channels [0, 1]") - """ - buffer = c_double * len(data) + if channels[0] != channels[1]: + buffer_len_channels = 2 + else: + buffer_len_channels = 1 + + buffer = c_double * (len(data) * buffer_len_channels) + assert len(data) != len(data) * buffer_len_channels, log.error( + "Data must be duplicated for both channels data[len(data_channel) + len(data_channel1)]..." + ) data_analog_output = buffer(*data) log.debug(f"Created C_double data {data_analog_output}") + try: + err = self.ao_device.a_out_scan( + channels[0], + channels[1], + Range, + int(len(data)), + samplerate, + ScanOption, + AOutScanFlag, + data_analog_output, + ) + except Exception as e: + print(f"{e}") + self.set_analog_to_zero() + self.disconnect_dac() - err = self.ao_device.a_out_scan( - channels[0], - channels[1], - Range, - int(len(data)), - samplerate, - ScanOption, - AOutScanFlag, - data_analog_output, - ) - log.info(f"The actual scan rate was {err}") # ao_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, 11) return data_analog_output, self.ao_device def set_analog_to_zero(self, channels: list[int] = [0, 1]): - err = self.ao_device.a_out_list( - channels[0], - channels[1], - [ - uldaq.Range.BIP10VOLTS, - uldaq.Range.BIP10VOLTS, - ], - uldaq.AOutListFlag.DEFAULT, - [0, 0], - ) - - def digital_trigger(self, channel: int = 0, data: int = 1) -> None: - log.info(f"{self.daq_device}") + try: + err = self.ao_device.a_out_list( + channels[0], + channels[1], + [ + uldaq.Range.BIP10VOLTS, + uldaq.Range.BIP10VOLTS, + ], + uldaq.AOutListFlag.DEFAULT, + [0, 0], + ) + except Exception as e: + log.error("f{e}") + log.error("disconnection dac") + self.disconnect_dac() + + def diggital_trigger(self) -> None: + if not self.read_bit(channel=0): + self.write_bit(channel=0, bit=1) + else: + self.write_bit(channel=0, bit=0) + time.time_ns() + self.write_bit(channel=0, bit=1) + def write_bit(self, channel: int = 0, bit: int = 1) -> None: self.dio_device.d_config_bit( uldaq.DigitalPortType.AUXPORT, channel, uldaq.DigitalDirection.OUTPUT ) - self.dio_device.d_bit_out( - uldaq.DigitalPortType.AUXPORT, bit_number=channel, data=data + uldaq.DigitalPortType.AUXPORT, bit_number=channel, data=bit ) + def read_bit(self, channel: int = 0): + bit = self.dio_device.d_bit_in(uldaq.DigitalPortType.AUXPORT, channel) + return bit + def read_digitalio( self, channels: list[int], @@ -181,6 +209,14 @@ class Repos: ) return data_digital_input + def write_byte(self, channel: int, byte: list[int]): + self.dio_device.d_config_bit( + uldaq.DigitalPortType.AUXPORT, channel, uldaq.DigitalDirection.OUTPUT + ) + for bit in byte: + self.dio_device.d_bit_out(uldaq.DigitalPortType.AUXPORT, channel, bit) + time.sleep(1) + def disconnect_dac(self): self.daq_device.disconnect() self.daq_device.release()