8 Commits

Author SHA1 Message Date
wendtalexander
1579c947c9 changing Repos to mccdac 2024-09-25 07:57:32 +02:00
wendtalexander
d6e2f8c5ba changing Repos to MccDac 2024-09-25 07:57:07 +02:00
wendtalexander
8e73b2ae1f adding to mccdac 2024-09-25 07:56:46 +02:00
wendtalexander
9f7d28ccf8 adding assertions, digital trigger 2024-09-24 17:34:34 +02:00
wendtalexander
3865bb8216 adding try execpt 2024-09-24 17:34:17 +02:00
wendtalexander
2317fd73c8 adding attenuator class 2024-09-24 17:33:59 +02:00
wendtalexander
fbb4d3b81d bug cant read digio line with d_in_scan 2024-09-23 17:21:52 +02:00
wendtalexander
7d02cb994f adding unlimited time 2024-09-23 14:32:49 +02:00
6 changed files with 375 additions and 163 deletions

View File

@@ -0,0 +1,26 @@
import time
import uldaq
from IPython import embed
from pyrelacs.repros.repos import MccDac
from pyrelacs.util.logging import config_logging
import numpy as np
log = config_logging()
class Attenuator(MccDac):
def __init__(self) -> None:
super().__init__()
if __name__ == "__main__":
SAMPLERATE = 40_000.0
DURATION = 5
AMPLITUDE = 1
SINFREQ = 1
att = Attenuator()
# att.set_attenuation_level(db_channel1=5, db_channel2=5)
att.check_attenuator()

View File

@@ -1,8 +1,10 @@
import ctypes import ctypes
import time
import uldaq import uldaq
from IPython import embed from IPython import embed
from pyrelacs.repros.repos import Repos from pyrelacs.repros.repos import MccDac
from pyrelacs.repros.attcs3310 import Attenuator
from pyrelacs.util.logging import config_logging from pyrelacs.util.logging import config_logging
import numpy as np import numpy as np
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
@@ -10,33 +12,39 @@ import matplotlib.pyplot as plt
log = config_logging() log = config_logging()
class Calibration(Repos): class Calibration(MccDac):
def __init__(self) -> None: def __init__(self) -> None:
super().__init__() super().__init__()
def run_calibration(self): def run_calibration(self):
# Stimulus # Stimulus
time = np.arange(0, DURATION, 1 / SAMPLERATE) t = np.arange(0, DURATION, 1 / SAMPLERATE)
data = AMPLITUDE * np.sin(2 * np.pi * SINFREQ * time) data = AMPLITUDE * np.sin(2 * np.pi * SINFREQ * t)
# sending stimulus # sending stimulus
stim, ao_device = self.send_analog_dac(
stim, ao_device = self.write_analog_dac(
data, [0, 0], SAMPLERATE, ScanOption=uldaq.ScanOption.EXTTRIGGER data, [0, 0], SAMPLERATE, ScanOption=uldaq.ScanOption.EXTTRIGGER
) )
read_data = self.read_analog_daq(
[0, 1], DURATION, SAMPLERATE, ScanOption=uldaq.ScanOption.EXTTRIGGER # read_data = self.read_analog_daq(
) # [0, 1], DURATION, SAMPLERATE, ScanOption=uldaq.ScanOption.EXTTRIGGER
self.digital_trigger() # )
ao_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, 11)
self.digital_trigger(data=0) self.diggital_trigger()
self.disconnect_dac() try:
embed() ao_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, 15)
exit() except uldaq.ul_exception.ULException:
log.debug("Operation timed out")
# reset the diggital trigger
self.write_bit(channel=0, bit=0)
time.sleep(1)
self.set_analog_to_zero()
if __name__ == "__main__": if __name__ == "__main__":
SAMPLERATE = 40_000.0 SAMPLERATE = 40_000.0
DURATION = 5 DURATION = 10
AMPLITUDE = 3 AMPLITUDE = 1
SINFREQ = 1 SINFREQ = 100
daq_input = Calibration() daq_input = Calibration()
daq_input.run_calibration() daq_input.run_calibration()

View File

@@ -2,12 +2,12 @@ import uldaq
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
from pyrelacs.util.logging import config_logging from pyrelacs.util.logging import config_logging
from .repos import Repos from .repos import MccDac
log = config_logging() log = config_logging()
class ReadData(Repos): class ReadData(MccDac):
def __init__(self) -> None: def __init__(self) -> None:
super().__init__() super().__init__()

320
pyrelacs/repros/mccdac.py Normal file
View File

@@ -0,0 +1,320 @@
from ctypes import Array, c_double
import time
from typing import Union
from IPython import embed
import numpy.typing as npt
import uldaq
import numpy as np
from pyrelacs.util.logging import config_logging
log = config_logging()
class MccDac:
def __init__(self) -> None:
devices = uldaq.get_daq_device_inventory(uldaq.InterfaceType.USB)
log.debug(f"Found daq devices {len(devices)}, connecting to the first one")
if len(devices) == 0:
log.error("Did not found daq devices, please connect one")
exit(1)
self.daq_device = uldaq.DaqDevice(devices[0])
self.daq_device.connect()
self.ai_device = self.daq_device.get_ai_device()
self.ao_device = self.daq_device.get_ao_device()
self.dio_device = self.daq_device.get_dio_device()
log.debug("Connected")
def connect_dac(self):
devices = uldaq.get_daq_device_inventory(uldaq.InterfaceType.USB)
log.debug(f"Found daq devices {len(devices)}, connecting to the first one")
if len(devices) == 0:
log.error("Did not found daq devices, please connect one")
exit(1)
self.daq_device = uldaq.DaqDevice(devices[0])
self.daq_device.connect()
self.ai_device = self.daq_device.get_ai_device()
self.ao_device = self.daq_device.get_ao_device()
self.dio_device = self.daq_device.get_dio_device()
log.debug("Connected")
def read_analog_daq(
self,
channels: list[int],
duration: int,
samplerate: float,
AiInputMode: uldaq.AiInputMode = uldaq.AiInputMode.SINGLE_ENDED,
Range: uldaq.Range = uldaq.Range.BIP10VOLTS,
ScanOption: uldaq.ScanOption = uldaq.ScanOption.DEFAULTIO,
AInScanFlag: uldaq.AInScanFlag = uldaq.AInScanFlag.DEFAULT,
) -> Array[c_double]:
assert len(channels) == 2, log.error("You can only provide two channels [0, 1]")
if channels[0] != channels[1]:
buffer_len_channels = 2
else:
buffer_len_channels = 1
buffer_len = np.shape(np.arange(0, duration, 1 / samplerate))[0]
data_analog_input = uldaq.create_float_buffer(buffer_len_channels, buffer_len)
er = self.ai_device.a_in_scan(
channels[0],
channels[1],
AiInputMode,
Range,
buffer_len,
samplerate,
ScanOption,
AInScanFlag,
data=data_analog_input,
)
return data_analog_input
def write_analog_dac(
self,
data: Union[list, npt.NDArray],
channels: list[int],
samplerate: float,
Range: uldaq.Range = uldaq.Range.BIP10VOLTS,
ScanOption: uldaq.ScanOption = uldaq.ScanOption.DEFAULTIO,
AOutScanFlag: uldaq.AOutScanFlag = uldaq.AOutScanFlag.DEFAULT,
) -> None:
"""
Parameters
----------
data : Union[list, npt.NDArray]
channels : list[int]
duration : int
samplerate : float
AiInputMode : uldaq.AiInputMode
Range : uldaq.Range
ScanOption : uldaq.ScanOption
AInScanFlag : uldaq.AOutScanFlag
Returns
-------
Array[c_double]
ao_device
"""
assert len(channels) == 2, log.error("You can only provide two channels [0, 1]")
if channels[0] != channels[1]:
buffer_len_channels = 2
else:
buffer_len_channels = 1
buffer = c_double * (len(data) * buffer_len_channels)
data_analog_output = buffer(*data)
log.debug(f"Created C_double data {data_analog_output}")
try:
err = self.ao_device.a_out_scan(
channels[0],
channels[1],
Range,
int(len(data)),
samplerate,
ScanOption,
AOutScanFlag,
data_analog_output,
)
except Exception as e:
print(f"{e}")
self.set_analog_to_zero()
self.disconnect_dac()
def set_analog_to_zero(self, channels: list[int] = [0, 1]):
try:
err = self.ao_device.a_out_list(
channels[0],
channels[1],
[
uldaq.Range.BIP10VOLTS,
uldaq.Range.BIP10VOLTS,
],
uldaq.AOutListFlag.DEFAULT,
[0, 0],
)
except Exception as e:
log.error("f{e}")
log.error("disconnection dac")
self.disconnect_dac()
def diggital_trigger(self) -> None:
if self.read_bit(channel=1):
self.write_bit(channel=0, bit=0)
time.time_ns()
self.write_bit(channel=0, bit=1)
else:
self.write_bit(channel=0, bit=1)
def write_bit(self, channel: int = 0, bit: int = 1) -> None:
self.dio_device.d_config_bit(
uldaq.DigitalPortType.AUXPORT, channel, uldaq.DigitalDirection.OUTPUT
)
self.dio_device.d_bit_out(
uldaq.DigitalPortType.AUXPORT, bit_number=channel, data=bit
)
def read_bit(self, channel: int = 0):
bit = self.dio_device.d_bit_in(uldaq.DigitalPortType.AUXPORT, channel)
return bit
def read_digitalio(
self,
channels: list[int],
duration,
samplerate,
ScanOptions: uldaq.ScanOption = uldaq.ScanOption.DEFAULTIO,
DInScanFlag: uldaq.DInScanFlag = uldaq.DInScanFlag.DEFAULT,
):
if channels[0] == channels[1]:
channel_len = 1
else:
channel_len = len(channels)
buffer_len = np.shape(np.arange(0, duration, 1 / samplerate))[0]
data_digital_input = uldaq.create_int_buffer(channel_len, buffer_len)
self.dio_device.d_config_port(
uldaq.DigitalPortType.AUXPORT, uldaq.DigitalDirection.INPUT
)
scan_rate = self.dio_device.d_in_scan(
uldaq.DigitalPortType.AUXPORT0,
uldaq.DigitalPortType.AUXPORT0,
len(data_digital_input),
samplerate,
ScanOptions,
DInScanFlag,
data_digital_input,
)
return data_digital_input
def disconnect_dac(self):
self.daq_device.disconnect()
self.daq_device.release()
def check_attenuator(self):
"""
ident : attdev-1
strobepin : 6
datainpin : 5
dataoutpin: -1
cspin : 4
mutepin : 7
zcenpin : -1
"""
t = np.arange(0, DURATION, 1 / SAMPLERATE)
data = AMPLITUDE * np.sin(2 * np.pi * SINFREQ * t)
data_channels = np.concatenate((data, data))
db_values = [0, 0, -2, -5, -10, -20, -50]
for i, db_value in enumerate(db_values):
log.info(f"Attenuating the Channels, with {db_value}")
if i == 1:
log.info("Muting the Channels")
self.set_attenuation_level(
db_value, db_value, mute_channel1=True, mute_channel2=True
)
else:
self.set_attenuation_level(db_value, db_value)
self.write_analog_dac(
data_channels,
[0, 1],
SAMPLERATE,
ScanOption=uldaq.ScanOption.EXTTRIGGER,
Range=uldaq.Range.BIP10VOLTS,
)
self.diggital_trigger()
try:
self.ao_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, 15)
except uldaq.ul_exception.ULException:
log.debug("Operation timed out")
self.write_bit(channel=0, bit=0)
self.disconnect_dac()
self.connect_dac()
self.set_analog_to_zero()
finally:
self.write_bit(channel=0, bit=0)
self.disconnect_dac()
self.connect_dac()
self.set_analog_to_zero()
log.info("Sleeping for 1 second, before next attenuation")
time.sleep(1)
self.deactivate_attenuator()
def set_attenuation_level(
self,
db_channel1: float = 5.0,
db_channel2: float = 5.0,
mute_channel1: bool = False,
mute_channel2: bool = False,
):
"""
ident : attdev-1
strobepin : 6
datainpin : 5
dataoutpin: -1
cspin : 4
mutepin : 7
zcenpin : -1
"""
self.activate_attenuator()
hardware_possible_db = np.arange(-95.5, 32.0, 0.5)
byte_number = np.arange(1, 256)
byte_number_db1 = byte_number[hardware_possible_db == db_channel1][0]
binary_db1 = np.binary_repr(byte_number_db1, width=8)
byte_number_db2 = byte_number[hardware_possible_db == db_channel2][0]
binary_db2 = np.binary_repr(byte_number_db2, width=8)
if mute_channel1:
log.info("Muting channel one")
binary_db1 = "00000000"
if mute_channel2:
log.info("Muting channel one")
binary_db2 = "00000000"
channels_db = binary_db1 + binary_db2
self.write_bit(channel=4, bit=0)
for b in channels_db:
self.write_bit(channel=5, bit=int(b))
time.time_ns()
self.write_bit(channel=6, bit=1)
time.time_ns()
self.write_bit(channel=6, bit=0)
time.time_ns()
self.write_bit(channel=4, bit=1)
def activate_attenuator(self):
for ch, b in zip([4, 5, 6, 7], [1, 0, 0, 1]):
self.write_bit(channel=ch, bit=b)
def deactivate_attenuator(self):
# mute should be enabled for starting calibration
self.write_bit(channel=7, bit=0)
if __name__ == "__main__":
SAMPLERATE = 40_000.0
DURATION = 5
AMPLITUDE = 1
SINFREQ = 1
att = MccDac()
# att.set_attenuation_level(db_channel1=5, db_channel2=5)
att.check_attenuator()

View File

@@ -2,7 +2,7 @@ import ctypes
import uldaq import uldaq
from IPython import embed from IPython import embed
from pyrelacs.repros.repos import Repos from pyrelacs.repros.repos import MccDac
from pyrelacs.util.logging import config_logging from pyrelacs.util.logging import config_logging
import numpy as np import numpy as np
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
@@ -10,7 +10,7 @@ import matplotlib.pyplot as plt
log = config_logging() log = config_logging()
class Output_daq(Repos): class Output_daq(MccDac):
def __init__(self) -> None: def __init__(self) -> None:
super().__init__() super().__init__()
# devices = uldaq.get_daq_device_inventory(uldaq.InterfaceType.USB) # devices = uldaq.get_daq_device_inventory(uldaq.InterfaceType.USB)

View File

@@ -1,142 +0,0 @@
from ctypes import Array, c_double
from typing import Union
from IPython import embed
import numpy.typing as npt
import uldaq
import numpy as np
from pyrelacs.util.logging import config_logging
log = config_logging()
class Repos:
def __init__(self) -> None:
devices = uldaq.get_daq_device_inventory(uldaq.InterfaceType.USB)
log.debug(f"Found daq devices {len(devices)}, connecting to the first one")
if len(devices) == 0:
log.error("Did not found daq devices, please connect one")
exit(1)
self.daq_device = uldaq.DaqDevice(devices[0])
self.daq_device.connect()
log.debug("Connected")
def read_analog_daq(
self,
channels: list[int],
duration: int,
samplerate: float,
AiInputMode: uldaq.AiInputMode = uldaq.AiInputMode.SINGLE_ENDED,
Range: uldaq.Range = uldaq.Range.BIP10VOLTS,
ScanOption: uldaq.ScanOption = uldaq.ScanOption.DEFAULTIO,
AInScanFlag: uldaq.AInScanFlag = uldaq.AInScanFlag.DEFAULT,
) -> Array[c_double]:
if channels[0] == channels[1]:
channel_len = 1
else:
channel_len = len(channels)
assert len(channels) == 2, log.error("Please provide a list with two ints")
ai_device = self.daq_device.get_ai_device()
buffer_len = np.shape(np.arange(0, duration, 1 / samplerate))[0]
data_analog_input = uldaq.create_float_buffer(channel_len, buffer_len)
er = ai_device.a_in_scan(
channels[0],
channels[1],
AiInputMode,
Range,
buffer_len,
samplerate,
ScanOption,
AInScanFlag,
data=data_analog_input,
)
# ai_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, timeout=-1)
return data_analog_input
def send_analog_dac(
self,
data: Union[list, npt.NDArray],
channels: list[int],
samplerate: float,
Range: uldaq.Range = uldaq.Range.BIP10VOLTS,
ScanOption: uldaq.ScanOption = uldaq.ScanOption.DEFAULTIO,
AOutScanFlag: uldaq.AOutScanFlag = uldaq.AOutScanFlag.DEFAULT,
):
"""
Parameters
----------
data : Union[list, npt.NDArray]
channels : list[int]
duration : int
samplerate : float
AiInputMode : uldaq.AiInputMode
Range : uldaq.Range
ScanOption : uldaq.ScanOption
AInScanFlag : uldaq.AOutScanFlag
Returns
-------
Array[c_double]
ao_device
"""
buffer = c_double * len(data)
data_analog_output = buffer(*data)
log.debug(f"Created C_double data {data_analog_output}")
ao_device = self.daq_device.get_ao_device()
ao_info = ao_device.get_info()
err = ao_device.a_out_scan(
channels[0],
channels[1],
Range,
int(len(data)),
samplerate,
ScanOption,
AOutScanFlag,
data_analog_output,
)
log.info(f"The actual scan rate was {err}")
# ao_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, 11)
return data_analog_output, ao_device
def digital_trigger(self, portn: int = 0, data: int = 1) -> None:
log.info(f"{self.daq_device}")
dio_device = self.daq_device.get_dio_device()
dio_device.d_config_bit(
uldaq.DigitalPortType.AUXPORT, portn, uldaq.DigitalDirection.OUTPUT
)
dio_device.d_bit_out(uldaq.DigitalPortType.AUXPORT, bit_number=portn, data=data)
def disconnect_dac(self):
self.daq_device.disconnect()
self.daq_device.release()
def clean_up():
pass
def run_repo(self) -> None:
pass
def stop_repo(self) -> None:
pass
def reload_repo(self) -> None:
pass