diff --git a/pyrelacs/repros/inandout.py b/pyrelacs/repros/inandout.py new file mode 100644 index 0000000..20d81aa --- /dev/null +++ b/pyrelacs/repros/inandout.py @@ -0,0 +1,86 @@ +import ctypes + +import uldaq +from IPython import embed +import matplotlib.pyplot as plt +import numpy as np + + +from pyrelacs.util.logging import config_logging + +log = config_logging() + + +class ReadWrite: + def __init__(self) -> None: + devices = uldaq.get_daq_device_inventory(uldaq.InterfaceType.USB) + log.debug(f"Found daq devices {len(devices)}, connecting to the first one") + self.daq_device = uldaq.DaqDevice(devices[0]) + self.daq_device.connect() + log.debug("Connected") + # self.daq_device.enable_event( + # uldaq.DaqEventType.ON_DATA_AVAILABLE, + # 1, + # self.read_write, + # (uldaq.DaqEventType.ON_DATA_AVAILABLE, 1, 1), + # ) + + def read_write(self) -> None: + # event_type = callback_args.event_type + # event_data = callback_args.event_data + # user_data = callback_args.user_data + + FS = 30_000.0 + DURATION = 10 + FREQUENCY = 100 + + time = np.arange(0, DURATION, 1 / FS) + data = 2 * np.sin(2 * np.pi * FREQUENCY * time) + + buffer = ctypes.c_double * len(time) + data_c = buffer(*data) + buf = uldaq.create_float_buffer(1, len(time)) + + # Get the Ananlog In device and Analog Info + ai_device = self.daq_device.get_ai_device() + ai_info = ai_device.get_info() + + # Get the Analog Out device + ao_device = self.daq_device.get_ao_device() + ao_info = ao_device.get_info() + + er_ao = ao_device.a_out_scan( + 0, + 0, + uldaq.Range.BIP10VOLTS, + int(len(data)), + 30_000.0, + uldaq.ScanOption.DEFAULTIO, + uldaq.AOutScanFlag.DEFAULT, + data_c, + ) + + er_ai = ai_device.a_in_scan( + 1, + 1, + uldaq.AiInputMode.SINGLE_ENDED, + uldaq.Range.BIP10VOLTS, + len(time), + FS, + uldaq.ScanOption.DEFAULTIO, + uldaq.AInScanFlag.DEFAULT, + data=buf, + ) + ai_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, timeout=-1) + log.debug("Scanning") + + self.daq_device.disconnect() + self.daq_device.release() + plt.plot(buf) + plt.plot(data_c) + plt.show() + + +if __name__ == "__main__": + daq_input = ReadWrite() + daq_input.read_write() diff --git a/pyrelacs/repros/input.py b/pyrelacs/repros/input.py new file mode 100644 index 0000000..51577d2 --- /dev/null +++ b/pyrelacs/repros/input.py @@ -0,0 +1,54 @@ +from typing import Optional + +import uldaq +from IPython import embed +import typer +from typing_extensions import Annotated +import matplotlib.pyplot as plt + +from pyrelacs.util.logging import config_logging + +log = config_logging() + + +class ReadData: + def __init__(self) -> None: + devices = uldaq.get_daq_device_inventory(uldaq.InterfaceType.USB) + log.debug(f"Found daq devices {len(devices)}, connecting to the first one") + self.daq_device = uldaq.DaqDevice(devices[0]) + self.daq_device.connect() + log.debug("Connected") + + def read_analog_in(self, channel: Annotated[Optional[int], typer.Argument()] = 0): + # Get the Ananlog In device and Analog Info + ai_device = self.daq_device.get_ai_device() + ai_info = ai_device.get_info() + log.debug( + f"Analog info,\n Channels available {ai_info.get_num_chans()}, \n Max Samplerate: {ai_info.get_max_scan_rate()}" + ) + buffer_len = 1_000_000 + buf = uldaq.create_float_buffer(1, buffer_len) + + er = ai_device.a_in_scan( + 1, + 1, + uldaq.AiInputMode.SINGLE_ENDED, + uldaq.Range.BIP10VOLTS, + buffer_len, + 500_000, + uldaq.ScanOption.DEFAULTIO, + uldaq.AInScanFlag.DEFAULT, + data=buf, + ) + ai_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, timeout=-1) + log.debug("Scanning") + + self.daq_device.disconnect() + self.daq_device.release() + plt.plot(buf) + plt.show() + + +if __name__ == "__main__": + daq_input = ReadData() + typer.run(daq_input.read_analog_in) diff --git a/pyrelacs/repros/output.py b/pyrelacs/repros/output.py new file mode 100644 index 0000000..aaa4c41 --- /dev/null +++ b/pyrelacs/repros/output.py @@ -0,0 +1,52 @@ +import ctypes + +import uldaq +from IPython import embed +from pyrelacs.util.logging import config_logging +import numpy as np +import matplotlib.pyplot as plt + +log = config_logging() + + +class Output_daq: + def __init__(self) -> None: + devices = uldaq.get_daq_device_inventory(uldaq.InterfaceType.USB) + self.daq_device = uldaq.DaqDevice(devices[0]) + self.daq_device.connect() + + def write_daq(self): + log.debug("running repro") + time = np.arange(0, 10, 1 / 30_000.0) + data = 2 * np.sin(2 * np.pi * 10 * time) + + buffer = ctypes.c_double * len(time) + data_c = buffer(*data) + + log.debug(f"Created C_double data {data_c}") + ao_device = self.daq_device.get_ao_device() + ao_info = ao_device.get_info() + + log.debug( + f"Analog info,\n Channels available {ao_info.get_num_chans()}, \n Max Samplerate: {ao_info.get_max_scan_rate()}" + ) + + err = ao_device.a_out_scan( + 0, + 0, + uldaq.Range.BIP10VOLTS, + int(len(data)), + 30_000.0, + uldaq.ScanOption.DEFAULTIO, + uldaq.AOutScanFlag.DEFAULT, + data_c, + ) + ao_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, 11) + + self.daq_device.disconnect() + self.daq_device.release() + + +if __name__ == "__main__": + daq_input = Output_daq() + daq_input.write_daq() diff --git a/pyrelacs/repros/repos.py b/pyrelacs/repros/repos.py new file mode 100644 index 0000000..3aaecbb --- /dev/null +++ b/pyrelacs/repros/repos.py @@ -0,0 +1,26 @@ +import uldaq +from IPython import embed + +from pyrelacs.util.logging import config_logging + +log = config_logging() + + +class Repos: + def __init__(self) -> None: + pass + + def run_repo(self) -> None: + pass + + def stop_repo(self) -> None: + pass + + def reload_repo(self) -> None: + pass + + def read_daq(self) -> None: + pass + + def write_daq(self) -> None: + pass