Compare commits

..

No commits in common. "31bbffc48025328b12633eb7e9e32e620fc4ed30" and "971c1f434768b012c5eb8b136bc20a8f3737afd6" have entirely different histories.

4 changed files with 48 additions and 80 deletions

View File

@ -19,6 +19,7 @@ from IPython import embed
import numpy as np import numpy as np
from pyrelacs.util.logging import config_logging from pyrelacs.util.logging import config_logging
from pyrelacs.daq_thread import Worker
log = config_logging() log = config_logging()

View File

@ -1,42 +0,0 @@
import ctypes
import uldaq
from IPython import embed
from pyrelacs.repros.repos import Repos
from pyrelacs.util.logging import config_logging
import numpy as np
import matplotlib.pyplot as plt
log = config_logging()
class Calibration(Repos):
def __init__(self) -> None:
super().__init__()
def run_calibration(self):
# Stimulus
time = np.arange(0, DURATION, 1 / SAMPLERATE)
data = AMPLITUDE * np.sin(2 * np.pi * SINFREQ * time)
# sending stimulus
stim, ao_device = self.send_analog_dac(
data, [0, 0], SAMPLERATE, ScanOption=uldaq.ScanOption.EXTTRIGGER
)
read_data = self.read_analog_daq(
[0, 1], DURATION, SAMPLERATE, ScanOption=uldaq.ScanOption.EXTTRIGGER
)
self.digital_trigger()
ao_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, 11)
self.digital_trigger(data=0)
self.disconnect_dac()
embed()
exit()
if __name__ == "__main__":
SAMPLERATE = 40_000.0
DURATION = 5
AMPLITUDE = 3
SINFREQ = 1
daq_input = Calibration()
daq_input.run_calibration()

View File

@ -2,7 +2,6 @@ import ctypes
import uldaq import uldaq
from IPython import embed from IPython import embed
from pyrelacs.repros.repos import Repos
from pyrelacs.util.logging import config_logging from pyrelacs.util.logging import config_logging
import numpy as np import numpy as np
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
@ -10,23 +9,40 @@ import matplotlib.pyplot as plt
log = config_logging() log = config_logging()
class Output_daq(Repos): class Output_daq:
def __init__(self) -> None: def __init__(self) -> None:
super().__init__() devices = uldaq.get_daq_device_inventory(uldaq.InterfaceType.USB)
# devices = uldaq.get_daq_device_inventory(uldaq.InterfaceType.USB) self.daq_device = uldaq.DaqDevice(devices[0])
# self.daq_device = uldaq.DaqDevice(devices[0]) self.daq_device.connect()
# self.daq_device.connect()
def write_daq(self): def write_daq(self):
log.debug("running repro") log.debug("running repro")
time = np.arange(0, 10, 1 / 30_000.0) time = np.arange(0, 10, 1 / 30_000.0)
data = 2 * np.sin(2 * np.pi * 1 * time) data = 2 * np.sin(2 * np.pi * 10 * time)
self.send_analog_dac(
data, [0, 0], 30_000, ScanOption=uldaq.ScanOption.EXTTRIGGER buffer = ctypes.c_double * len(time)
data_c = buffer(*data)
log.debug(f"Created C_double data {data_c}")
ao_device = self.daq_device.get_ao_device()
ao_info = ao_device.get_info()
log.debug(
f"Analog info,\n Channels available {ao_info.get_num_chans()}, \n Max Samplerate: {ao_info.get_max_scan_rate()}"
)
err = ao_device.a_out_scan(
0,
0,
uldaq.Range.BIP10VOLTS,
int(len(data)),
30_000.0,
uldaq.ScanOption.DEFAULTIO,
uldaq.AOutScanFlag.DEFAULT,
data_c,
) )
ao_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, 11)
def trigger(self):
self.digital_trigger(1)
self.daq_device.disconnect() self.daq_device.disconnect()
self.daq_device.release() self.daq_device.release()
@ -34,4 +50,3 @@ class Output_daq(Repos):
if __name__ == "__main__": if __name__ == "__main__":
daq_input = Output_daq() daq_input = Output_daq()
daq_input.write_daq() daq_input.write_daq()
# daq_input.trigger()

View File

@ -1,6 +1,5 @@
from ctypes import Array, c_double from ctypes import Array, c_double
from typing import Union from typing import Union
from IPython import embed
import numpy.typing as npt import numpy.typing as npt
import uldaq import uldaq
import numpy as np import numpy as np
@ -31,15 +30,16 @@ class Repos:
ScanOption: uldaq.ScanOption = uldaq.ScanOption.DEFAULTIO, ScanOption: uldaq.ScanOption = uldaq.ScanOption.DEFAULTIO,
AInScanFlag: uldaq.AInScanFlag = uldaq.AInScanFlag.DEFAULT, AInScanFlag: uldaq.AInScanFlag = uldaq.AInScanFlag.DEFAULT,
) -> Array[c_double]: ) -> Array[c_double]:
if channels[0] == channels[1]:
channel_len = 1 if len(channels) > 2:
else: log.error("Please provide only two channels")
channel_len = len(channels) # Get the Ananlog In device and Analog Info
assert len(channels) == 2, log.error("Please provide a list with two ints")
ai_device = self.daq_device.get_ai_device() ai_device = self.daq_device.get_ai_device()
ai_info = ai_device.get_info()
buffer_len = np.shape(np.arange(0, duration, 1 / samplerate))[0] buffer_len = np.shape(np.arange(0, duration, 1 / samplerate))[0]
data_analog_input = uldaq.create_float_buffer(channel_len, buffer_len) data_analog_input = uldaq.create_float_buffer(1, buffer_len)
er = ai_device.a_in_scan( er = ai_device.a_in_scan(
channels[0], channels[0],
@ -52,7 +52,8 @@ class Repos:
AInScanFlag, AInScanFlag,
data=data_analog_input, data=data_analog_input,
) )
# ai_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, timeout=-1) ai_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, timeout=-1)
log.debug("Scanning")
return data_analog_input return data_analog_input
@ -60,11 +61,13 @@ class Repos:
self, self,
data: Union[list, npt.NDArray], data: Union[list, npt.NDArray],
channels: list[int], channels: list[int],
duration: int,
samplerate: float, samplerate: float,
AiInputMode: uldaq.AiInputMode = uldaq.AiInputMode.SINGLE_ENDED,
Range: uldaq.Range = uldaq.Range.BIP10VOLTS, Range: uldaq.Range = uldaq.Range.BIP10VOLTS,
ScanOption: uldaq.ScanOption = uldaq.ScanOption.DEFAULTIO, ScanOption: uldaq.ScanOption = uldaq.ScanOption.DEFAULTIO,
AOutScanFlag: uldaq.AOutScanFlag = uldaq.AOutScanFlag.DEFAULT, AInScanFlag: uldaq.AOutScanFlag = uldaq.AOutScanFlag.DEFAULT,
): ) -> Array[c_double]:
""" """
Parameters Parameters
@ -89,7 +92,6 @@ class Repos:
Returns Returns
------- -------
Array[c_double] Array[c_double]
ao_device
""" """
@ -99,6 +101,7 @@ class Repos:
log.debug(f"Created C_double data {data_analog_output}") log.debug(f"Created C_double data {data_analog_output}")
ao_device = self.daq_device.get_ao_device() ao_device = self.daq_device.get_ao_device()
ao_info = ao_device.get_info() ao_info = ao_device.get_info()
ao_device.set_trigger
err = ao_device.a_out_scan( err = ao_device.a_out_scan(
channels[0], channels[0],
@ -107,31 +110,22 @@ class Repos:
int(len(data)), int(len(data)),
samplerate, samplerate,
ScanOption, ScanOption,
AOutScanFlag, AInScanFlag,
data_analog_output, data_analog_output,
) )
log.info(f"The actual scan rate was {err}") ao_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, 11)
# ao_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, 11)
return data_analog_output, ao_device return data_analog_output
def digital_trigger(self, portn: int = 0, data: int = 1) -> None: def digital_trigger(self, portn):
log.info(f"{self.daq_device}")
dio_device = self.daq_device.get_dio_device()
dio_device.d_config_bit(
uldaq.DigitalPortType.AUXPORT, portn, uldaq.DigitalDirection.OUTPUT
)
dio_device.d_bit_out(uldaq.DigitalPortType.AUXPORT, bit_number=portn, data=data) dio_device = self.daq_device.get_dio_device()
dio_device.d_bit_out(uldaq.DigitalPortType.FIRSTPORTA, bit_number=portn, data=1)
def disconnect_dac(self): def disconnect_dac(self):
self.daq_device.disconnect() self.daq_device.disconnect()
self.daq_device.release() self.daq_device.release()
def clean_up():
pass
def run_repo(self) -> None: def run_repo(self) -> None:
pass pass