diff --git a/pyrelacs/repros/input.py b/pyrelacs/repros/input.py index 8728f41..7497a76 100644 --- a/pyrelacs/repros/input.py +++ b/pyrelacs/repros/input.py @@ -2,12 +2,12 @@ import uldaq import matplotlib.pyplot as plt from pyrelacs.util.logging import config_logging -from .repos import Repos +from .repos import MccDac log = config_logging() -class ReadData(Repos): +class ReadData(MccDac): def __init__(self) -> None: super().__init__() diff --git a/pyrelacs/repros/repos.py b/pyrelacs/repros/mccdac.py similarity index 63% rename from pyrelacs/repros/repos.py rename to pyrelacs/repros/mccdac.py index a0d0b34..78e3f56 100644 --- a/pyrelacs/repros/repos.py +++ b/pyrelacs/repros/mccdac.py @@ -11,7 +11,7 @@ from pyrelacs.util.logging import config_logging log = config_logging() -class Repos: +class MccDac: def __init__(self) -> None: devices = uldaq.get_daq_device_inventory(uldaq.InterfaceType.USB) log.debug(f"Found daq devices {len(devices)}, connecting to the first one") @@ -80,7 +80,7 @@ class Repos: Range: uldaq.Range = uldaq.Range.BIP10VOLTS, ScanOption: uldaq.ScanOption = uldaq.ScanOption.DEFAULTIO, AOutScanFlag: uldaq.AOutScanFlag = uldaq.AOutScanFlag.DEFAULT, - ): + ) -> None: """ Parameters @@ -116,9 +116,6 @@ class Repos: buffer_len_channels = 1 buffer = c_double * (len(data) * buffer_len_channels) - assert len(data) != len(data) * buffer_len_channels, log.error( - "Data must be duplicated for both channels data[len(data_channel) + len(data_channel1)]..." - ) data_analog_output = buffer(*data) log.debug(f"Created C_double data {data_analog_output}") @@ -138,10 +135,6 @@ class Repos: self.set_analog_to_zero() self.disconnect_dac() - # ao_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, 11) - - return data_analog_output, self.ao_device - def set_analog_to_zero(self, channels: list[int] = [0, 1]): try: err = self.ao_device.a_out_list( @@ -160,12 +153,12 @@ class Repos: self.disconnect_dac() def diggital_trigger(self) -> None: - if not self.read_bit(channel=0): - self.write_bit(channel=0, bit=1) - else: + if self.read_bit(channel=1): self.write_bit(channel=0, bit=0) time.time_ns() self.write_bit(channel=0, bit=1) + else: + self.write_bit(channel=0, bit=1) def write_bit(self, channel: int = 0, bit: int = 1) -> None: self.dio_device.d_config_bit( @@ -209,23 +202,119 @@ class Repos: ) return data_digital_input - def write_byte(self, channel: int, byte: list[int]): - self.dio_device.d_config_bit( - uldaq.DigitalPortType.AUXPORT, channel, uldaq.DigitalDirection.OUTPUT - ) - for bit in byte: - self.dio_device.d_bit_out(uldaq.DigitalPortType.AUXPORT, channel, bit) - time.sleep(1) - def disconnect_dac(self): self.daq_device.disconnect() self.daq_device.release() - def run_repo(self) -> None: - pass + def check_attenuator(self): + """ + ident : attdev-1 + strobepin : 6 + datainpin : 5 + dataoutpin: -1 + cspin : 4 + mutepin : 7 + zcenpin : -1 + """ + t = np.arange(0, DURATION, 1 / SAMPLERATE) + data = AMPLITUDE * np.sin(2 * np.pi * SINFREQ * t) + data_channels = np.concatenate((data, data)) + + db_values = [0, 0, -2, -5, -10, -20, -50] + for i, db_value in enumerate(db_values): + log.info(f"Attenuating the Channels, with {db_value}") + if i == 1: + log.info("Muting the Channels") + self.set_attenuation_level( + db_value, db_value, mute_channel1=True, mute_channel2=True + ) + else: + self.set_attenuation_level(db_value, db_value) + + self.write_analog_dac( + data_channels, + [0, 1], + SAMPLERATE, + ScanOption=uldaq.ScanOption.EXTTRIGGER, + Range=uldaq.Range.BIP10VOLTS, + ) + self.diggital_trigger() + + try: + self.ao_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, 15) + except uldaq.ul_exception.ULException: + log.debug("Operation timed out") + self.write_bit(channel=0, bit=0) + self.disconnect_dac() + self.connect_dac() + self.set_analog_to_zero() + finally: + self.write_bit(channel=0, bit=0) + self.disconnect_dac() + self.connect_dac() + self.set_analog_to_zero() + + log.info("Sleeping for 1 second, before next attenuation") + time.sleep(1) + self.deactivate_attenuator() + + def set_attenuation_level( + self, + db_channel1: float = 5.0, + db_channel2: float = 5.0, + mute_channel1: bool = False, + mute_channel2: bool = False, + ): + """ + ident : attdev-1 + strobepin : 6 + datainpin : 5 + dataoutpin: -1 + cspin : 4 + mutepin : 7 + zcenpin : -1 + """ + + self.activate_attenuator() + hardware_possible_db = np.arange(-95.5, 32.0, 0.5) + byte_number = np.arange(1, 256) + byte_number_db1 = byte_number[hardware_possible_db == db_channel1][0] + binary_db1 = np.binary_repr(byte_number_db1, width=8) + byte_number_db2 = byte_number[hardware_possible_db == db_channel2][0] + binary_db2 = np.binary_repr(byte_number_db2, width=8) + if mute_channel1: + log.info("Muting channel one") + binary_db1 = "00000000" + if mute_channel2: + log.info("Muting channel one") + binary_db2 = "00000000" + + channels_db = binary_db1 + binary_db2 + self.write_bit(channel=4, bit=0) + for b in channels_db: + self.write_bit(channel=5, bit=int(b)) + time.time_ns() + self.write_bit(channel=6, bit=1) + time.time_ns() + self.write_bit(channel=6, bit=0) + time.time_ns() + self.write_bit(channel=4, bit=1) + + def activate_attenuator(self): + for ch, b in zip([4, 5, 6, 7], [1, 0, 0, 1]): + self.write_bit(channel=ch, bit=b) + + def deactivate_attenuator(self): + # mute should be enabled for starting calibration + self.write_bit(channel=7, bit=0) + - def stop_repo(self) -> None: - pass +if __name__ == "__main__": + SAMPLERATE = 40_000.0 + DURATION = 5 + AMPLITUDE = 1 + SINFREQ = 1 - def reload_repo(self) -> None: - pass + att = MccDac() + # att.set_attenuation_level(db_channel1=5, db_channel2=5) + att.check_attenuator() diff --git a/pyrelacs/repros/output.py b/pyrelacs/repros/output.py index 9b74df1..2eebe79 100644 --- a/pyrelacs/repros/output.py +++ b/pyrelacs/repros/output.py @@ -2,7 +2,7 @@ import ctypes import uldaq from IPython import embed -from pyrelacs.repros.repos import Repos +from pyrelacs.repros.repos import MccDac from pyrelacs.util.logging import config_logging import numpy as np import matplotlib.pyplot as plt @@ -10,7 +10,7 @@ import matplotlib.pyplot as plt log = config_logging() -class Output_daq(Repos): +class Output_daq(MccDac): def __init__(self) -> None: super().__init__() # devices = uldaq.get_daq_device_inventory(uldaq.InterfaceType.USB)