Compare commits

...

4 Commits

Author SHA1 Message Date
31bbffc480 removing import 2024-09-23 11:54:27 +02:00
fa3c704497 adding digital trigger 2024-09-23 11:54:16 +02:00
ebf7fe89bf moving to repos.py 2024-09-23 11:54:05 +02:00
2100a0205f adding calibration 2024-09-23 11:53:37 +02:00
4 changed files with 80 additions and 48 deletions

View File

@ -19,7 +19,6 @@ from IPython import embed
import numpy as np import numpy as np
from pyrelacs.util.logging import config_logging from pyrelacs.util.logging import config_logging
from pyrelacs.daq_thread import Worker
log = config_logging() log = config_logging()

View File

@ -0,0 +1,42 @@
import ctypes
import uldaq
from IPython import embed
from pyrelacs.repros.repos import Repos
from pyrelacs.util.logging import config_logging
import numpy as np
import matplotlib.pyplot as plt
log = config_logging()
class Calibration(Repos):
def __init__(self) -> None:
super().__init__()
def run_calibration(self):
# Stimulus
time = np.arange(0, DURATION, 1 / SAMPLERATE)
data = AMPLITUDE * np.sin(2 * np.pi * SINFREQ * time)
# sending stimulus
stim, ao_device = self.send_analog_dac(
data, [0, 0], SAMPLERATE, ScanOption=uldaq.ScanOption.EXTTRIGGER
)
read_data = self.read_analog_daq(
[0, 1], DURATION, SAMPLERATE, ScanOption=uldaq.ScanOption.EXTTRIGGER
)
self.digital_trigger()
ao_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, 11)
self.digital_trigger(data=0)
self.disconnect_dac()
embed()
exit()
if __name__ == "__main__":
SAMPLERATE = 40_000.0
DURATION = 5
AMPLITUDE = 3
SINFREQ = 1
daq_input = Calibration()
daq_input.run_calibration()

View File

@ -2,6 +2,7 @@ import ctypes
import uldaq import uldaq
from IPython import embed from IPython import embed
from pyrelacs.repros.repos import Repos
from pyrelacs.util.logging import config_logging from pyrelacs.util.logging import config_logging
import numpy as np import numpy as np
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
@ -9,40 +10,23 @@ import matplotlib.pyplot as plt
log = config_logging() log = config_logging()
class Output_daq: class Output_daq(Repos):
def __init__(self) -> None: def __init__(self) -> None:
devices = uldaq.get_daq_device_inventory(uldaq.InterfaceType.USB) super().__init__()
self.daq_device = uldaq.DaqDevice(devices[0]) # devices = uldaq.get_daq_device_inventory(uldaq.InterfaceType.USB)
self.daq_device.connect() # self.daq_device = uldaq.DaqDevice(devices[0])
# self.daq_device.connect()
def write_daq(self): def write_daq(self):
log.debug("running repro") log.debug("running repro")
time = np.arange(0, 10, 1 / 30_000.0) time = np.arange(0, 10, 1 / 30_000.0)
data = 2 * np.sin(2 * np.pi * 10 * time) data = 2 * np.sin(2 * np.pi * 1 * time)
self.send_analog_dac(
buffer = ctypes.c_double * len(time) data, [0, 0], 30_000, ScanOption=uldaq.ScanOption.EXTTRIGGER
data_c = buffer(*data)
log.debug(f"Created C_double data {data_c}")
ao_device = self.daq_device.get_ao_device()
ao_info = ao_device.get_info()
log.debug(
f"Analog info,\n Channels available {ao_info.get_num_chans()}, \n Max Samplerate: {ao_info.get_max_scan_rate()}"
)
err = ao_device.a_out_scan(
0,
0,
uldaq.Range.BIP10VOLTS,
int(len(data)),
30_000.0,
uldaq.ScanOption.DEFAULTIO,
uldaq.AOutScanFlag.DEFAULT,
data_c,
) )
ao_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, 11)
def trigger(self):
self.digital_trigger(1)
self.daq_device.disconnect() self.daq_device.disconnect()
self.daq_device.release() self.daq_device.release()
@ -50,3 +34,4 @@ class Output_daq:
if __name__ == "__main__": if __name__ == "__main__":
daq_input = Output_daq() daq_input = Output_daq()
daq_input.write_daq() daq_input.write_daq()
# daq_input.trigger()

View File

@ -1,5 +1,6 @@
from ctypes import Array, c_double from ctypes import Array, c_double
from typing import Union from typing import Union
from IPython import embed
import numpy.typing as npt import numpy.typing as npt
import uldaq import uldaq
import numpy as np import numpy as np
@ -30,16 +31,15 @@ class Repos:
ScanOption: uldaq.ScanOption = uldaq.ScanOption.DEFAULTIO, ScanOption: uldaq.ScanOption = uldaq.ScanOption.DEFAULTIO,
AInScanFlag: uldaq.AInScanFlag = uldaq.AInScanFlag.DEFAULT, AInScanFlag: uldaq.AInScanFlag = uldaq.AInScanFlag.DEFAULT,
) -> Array[c_double]: ) -> Array[c_double]:
if channels[0] == channels[1]:
if len(channels) > 2: channel_len = 1
log.error("Please provide only two channels") else:
# Get the Ananlog In device and Analog Info channel_len = len(channels)
assert len(channels) == 2, log.error("Please provide a list with two ints")
ai_device = self.daq_device.get_ai_device() ai_device = self.daq_device.get_ai_device()
ai_info = ai_device.get_info()
buffer_len = np.shape(np.arange(0, duration, 1 / samplerate))[0] buffer_len = np.shape(np.arange(0, duration, 1 / samplerate))[0]
data_analog_input = uldaq.create_float_buffer(1, buffer_len) data_analog_input = uldaq.create_float_buffer(channel_len, buffer_len)
er = ai_device.a_in_scan( er = ai_device.a_in_scan(
channels[0], channels[0],
@ -52,8 +52,7 @@ class Repos:
AInScanFlag, AInScanFlag,
data=data_analog_input, data=data_analog_input,
) )
ai_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, timeout=-1) # ai_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, timeout=-1)
log.debug("Scanning")
return data_analog_input return data_analog_input
@ -61,13 +60,11 @@ class Repos:
self, self,
data: Union[list, npt.NDArray], data: Union[list, npt.NDArray],
channels: list[int], channels: list[int],
duration: int,
samplerate: float, samplerate: float,
AiInputMode: uldaq.AiInputMode = uldaq.AiInputMode.SINGLE_ENDED,
Range: uldaq.Range = uldaq.Range.BIP10VOLTS, Range: uldaq.Range = uldaq.Range.BIP10VOLTS,
ScanOption: uldaq.ScanOption = uldaq.ScanOption.DEFAULTIO, ScanOption: uldaq.ScanOption = uldaq.ScanOption.DEFAULTIO,
AInScanFlag: uldaq.AOutScanFlag = uldaq.AOutScanFlag.DEFAULT, AOutScanFlag: uldaq.AOutScanFlag = uldaq.AOutScanFlag.DEFAULT,
) -> Array[c_double]: ):
""" """
Parameters Parameters
@ -92,6 +89,7 @@ class Repos:
Returns Returns
------- -------
Array[c_double] Array[c_double]
ao_device
""" """
@ -101,7 +99,6 @@ class Repos:
log.debug(f"Created C_double data {data_analog_output}") log.debug(f"Created C_double data {data_analog_output}")
ao_device = self.daq_device.get_ao_device() ao_device = self.daq_device.get_ao_device()
ao_info = ao_device.get_info() ao_info = ao_device.get_info()
ao_device.set_trigger
err = ao_device.a_out_scan( err = ao_device.a_out_scan(
channels[0], channels[0],
@ -110,22 +107,31 @@ class Repos:
int(len(data)), int(len(data)),
samplerate, samplerate,
ScanOption, ScanOption,
AInScanFlag, AOutScanFlag,
data_analog_output, data_analog_output,
) )
ao_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, 11) log.info(f"The actual scan rate was {err}")
# ao_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, 11)
return data_analog_output return data_analog_output, ao_device
def digital_trigger(self, portn):
def digital_trigger(self, portn: int = 0, data: int = 1) -> None:
log.info(f"{self.daq_device}")
dio_device = self.daq_device.get_dio_device() dio_device = self.daq_device.get_dio_device()
dio_device.d_bit_out(uldaq.DigitalPortType.FIRSTPORTA, bit_number=portn, data=1)
dio_device.d_config_bit(
uldaq.DigitalPortType.AUXPORT, portn, uldaq.DigitalDirection.OUTPUT
)
dio_device.d_bit_out(uldaq.DigitalPortType.AUXPORT, bit_number=portn, data=data)
def disconnect_dac(self): def disconnect_dac(self):
self.daq_device.disconnect() self.daq_device.disconnect()
self.daq_device.release() self.daq_device.release()
def clean_up():
pass
def run_repo(self) -> None: def run_repo(self) -> None:
pass pass