Compare commits

..

No commits in common. "26f43151a27672ad662dce7ca22e0f595d6e2f84" and "b912159b760f77fdb066765fd07f6b138bcc1a68" have entirely different histories.

3 changed files with 112 additions and 44 deletions

View File

@ -0,0 +1,26 @@
import time
import uldaq
from IPython import embed
from pyrelacs.repros.repos import MccDac
from pyrelacs.util.logging import config_logging
import numpy as np
log = config_logging()
class Attenuator(MccDac):
def __init__(self) -> None:
super().__init__()
if __name__ == "__main__":
SAMPLERATE = 40_000.0
DURATION = 5
AMPLITUDE = 1
SINFREQ = 1
att = Attenuator()
# att.set_attenuation_level(db_channel1=5, db_channel2=5)
att.check_attenuator()

View File

@ -19,29 +19,60 @@ class Calibration(MccDac):
def __init__(self) -> None: def __init__(self) -> None:
super().__init__() super().__init__()
def run_calibration(self):
# Stimulus
t = np.arange(0, DURATION, 1 / SAMPLERATE)
data = AMPLITUDE * np.sin(2 * np.pi * SINFREQ * t)
# sending stimulus
self.write_analog(
data, [0, 0], SAMPLERATE, ScanOption=uldaq.ScanOption.EXTTRIGGER
)
# read_data = self.read_analog_daq(
# [0, 1], DURATION, SAMPLERATE, ScanOption=uldaq.ScanOption.EXTTRIGGER
# )
self.diggital_trigger()
try:
self.ao_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, 15)
except uldaq.ul_exception.ULException:
log.debug("Operation timed out")
# reset the diggital trigger
self.write_bit(channel=0, bit=0)
time.sleep(1)
self.set_analog_to_zero()
def check_amplitude(self): def check_amplitude(self):
self.set_attenuation_level(db_channel1=0.0, db_channel2=0.0)
# write to ananlog 1 # write to ananlog 1
t = np.arange(0, DURATION, 1 / SAMPLERATE) t = np.arange(0, DURATION, 1 / SAMPLERATE)
data = AMPLITUDE * np.sin(2 * np.pi * SINFREQ * t) data = AMPLITUDE * np.sin(2 * np.pi * SINFREQ * t)
data_channels = np.zeros(2 * len(data))
# c = [(i,for i,j in zip(data, data)]
stim = self.write_analog( # sending stimulus
log.debug(f"{data}, {data.shape}")
# self.set_attenuation_level(db_channel1=0.0, db_channel2=0.0)
# self.set_analog_to_zero()
# time.sleep(1)
log.debug(self.ao_device)
embed()
exit()
self.write_analog(
data, data,
[0, 0], [1, 1],
SAMPLERATE, SAMPLERATE,
ScanOption=uldaq.ScanOption.EXTTRIGGER, ScanOption=uldaq.ScanOption.EXTTRIGGER,
) )
data_channel_one = self.read_analog( # data_channel_one = self.read_analog(
[0, 0], DURATION, SAMPLERATE, ScanOption=uldaq.ScanOption.EXTTRIGGER # [0, 0], DURATION, SAMPLERATE, ScanOption=uldaq.ScanOption.EXTTRIGGER
) # )
log.debug(self.ao_device)
time.sleep(1) time.sleep(1)
self.diggital_trigger() self.diggital_trigger()
try: try:
self.ai_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, 15) self.ao_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, 15)
self.write_bit(channel=0, bit=0) self.write_bit(channel=0, bit=0)
time.sleep(1) time.sleep(1)
self.set_analog_to_zero() self.set_analog_to_zero()
@ -59,9 +90,9 @@ class Calibration(MccDac):
if __name__ == "__main__": if __name__ == "__main__":
SAMPLERATE = 40_000.0 SAMPLERATE = 40_000.0
DURATION = 5 DURATION = 5
AMPLITUDE = 0.5 AMPLITUDE = 1
SINFREQ = 10 SINFREQ = 10
cal = Calibration() daq_input = Calibration()
# cal.ccheck_attenuator() # daq_input.run_calibration()
cal.check_amplitude() daq_input.check_attenuator()

View File

@ -1,6 +1,7 @@
from ctypes import Array, c_double from ctypes import Array, c_double
import time import time
from typing import Union from typing import Union
from IPython import embed from IPython import embed
import numpy.typing as npt import numpy.typing as npt
import uldaq import uldaq
@ -25,6 +26,10 @@ class MccDac:
self.dio_device = self.daq_device.get_dio_device() self.dio_device = self.daq_device.get_dio_device()
log.debug("Connected") log.debug("Connected")
# log.debug("Activate Attenuator")
# self.set_attenuation_level(db_channel1=0.0, db_channel2=0.0)
# self.set_analog_to_zero()
def connect_dac(self): def connect_dac(self):
devices = uldaq.get_daq_device_inventory(uldaq.InterfaceType.USB) devices = uldaq.get_daq_device_inventory(uldaq.InterfaceType.USB)
log.debug(f"Found daq devices {len(devices)}, connecting to the first one") log.debug(f"Found daq devices {len(devices)}, connecting to the first one")
@ -80,30 +85,33 @@ class MccDac:
Range: uldaq.Range = uldaq.Range.BIP10VOLTS, Range: uldaq.Range = uldaq.Range.BIP10VOLTS,
ScanOption: uldaq.ScanOption = uldaq.ScanOption.DEFAULTIO, ScanOption: uldaq.ScanOption = uldaq.ScanOption.DEFAULTIO,
AOutScanFlag: uldaq.AOutScanFlag = uldaq.AOutScanFlag.DEFAULT, AOutScanFlag: uldaq.AOutScanFlag = uldaq.AOutScanFlag.DEFAULT,
) -> Array[c_double]: ):
assert len(channels) == 2, log.error("You can only provide two channels [0, 1]") assert len(channels) == 2, log.error("You can only provide two channels [0, 1]")
log.debug(f"{len(data)}, {type(data)}")
buffer = c_double * len(data) buffer = c_double * len(data)
data_analog_output = buffer(*data) data_analog_output = buffer(*data)
log.debug(f"Created C_double data {data_analog_output}") log.debug(f"Created C_double data {data_analog_output}")
try:
err = self.ao_device.a_out_scan(
channels[0],
channels[1],
Range,
int(len(data)),
samplerate,
ScanOption,
AOutScanFlag,
data_analog_output,
)
except Exception as e:
print(f"{e}")
self.set_analog_to_zero()
self.disconnect_dac()
return data_analog_output log.info(self.ao_device)
err = self.ao_device.a_out_scan(
channels[0],
channels[1],
Range,
int(len(data)),
samplerate,
ScanOption,
AOutScanFlag,
data_analog_output,
)
# self.diggital_trigger()
# self.ao_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, 11)
# except Exception as e:
# print(f"{e}")
# self.set_analog_to_zero()
# self.disconnect_dac()
def set_analog_to_zero(self, channels: list[int] = [0, 1]): def set_analog_to_zero(self, channels: list[int] = [0, 1]):
try: try:
@ -122,15 +130,18 @@ class MccDac:
log.error("disconnection dac") log.error("disconnection dac")
self.disconnect_dac() self.disconnect_dac()
def diggital_trigger(self) -> None: def diggital_trigger(self, channel: int = 0) -> None:
if not self.read_bit(channel=0): bit_channel = self.read_bit(channel)
self.write_bit(channel=0, bit=1) log.debug(bit_channel)
if not bit_channel:
self.write_bit(channel, 1)
else: else:
self.write_bit(channel=0, bit=0) self.write_bit(channel, 0)
time.time_ns() time.sleep(1)
self.write_bit(channel=0, bit=1) self.write_bit(channel, 1)
def write_bit(self, channel: int = 0, bit: int = 1) -> None: def write_bit(self, channel: int = 0, bit: int = 1) -> None:
log.debug(self.dio_device)
self.dio_device.d_config_bit( self.dio_device.d_config_bit(
uldaq.DigitalPortType.AUXPORT, channel, uldaq.DigitalDirection.OUTPUT uldaq.DigitalPortType.AUXPORT, channel, uldaq.DigitalDirection.OUTPUT
) )
@ -139,6 +150,7 @@ class MccDac:
) )
def read_bit(self, channel: int = 0): def read_bit(self, channel: int = 0):
log.debug(self.dio_device)
bit = self.dio_device.d_bit_in(uldaq.DigitalPortType.AUXPORT, channel) bit = self.dio_device.d_bit_in(uldaq.DigitalPortType.AUXPORT, channel)
return bit return bit
@ -193,7 +205,7 @@ class MccDac:
SINFREQ = 1 SINFREQ = 1
t = np.arange(0, DURATION, 1 / SAMPLERATE) t = np.arange(0, DURATION, 1 / SAMPLERATE)
data = AMPLITUDE * np.sin(2 * np.pi * SINFREQ * t) data = AMPLITUDE * np.sin(2 * np.pi * SINFREQ * t)
# data_channels = np.concatenate((data, data)) data_channels = np.concatenate((data, data))
db_values = [0, 0, -2, -5, -10, -20, -50] db_values = [0, 0, -2, -5, -10, -20, -50]
db_values = [0, -10, -20] db_values = [0, -10, -20]
@ -207,9 +219,9 @@ class MccDac:
else: else:
self.set_attenuation_level(db_value, db_value) self.set_attenuation_level(db_value, db_value)
_ = self.write_analog( stim, ao_device = self.write_analog_dac(
data, data_channels,
[0, 0], [0, 1],
SAMPLERATE, SAMPLERATE,
ScanOption=uldaq.ScanOption.EXTTRIGGER, ScanOption=uldaq.ScanOption.EXTTRIGGER,
Range=uldaq.Range.BIP10VOLTS, Range=uldaq.Range.BIP10VOLTS,
@ -217,9 +229,7 @@ class MccDac:
self.diggital_trigger() self.diggital_trigger()
try: try:
self.ao_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, 15) ao_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, 15)
self.write_bit(channel=0, bit=0)
self.set_analog_to_zero()
except uldaq.ul_exception.ULException: except uldaq.ul_exception.ULException:
log.debug("Operation timed out") log.debug("Operation timed out")
self.write_bit(channel=0, bit=0) self.write_bit(channel=0, bit=0)
@ -234,6 +244,7 @@ class MccDac:
log.info("Sleeping for 1 second, before next attenuation") log.info("Sleeping for 1 second, before next attenuation")
time.sleep(1) time.sleep(1)
self.deactivate_attenuator()
def set_attenuation_level( def set_attenuation_level(
self, self,