16 Commits

Author SHA1 Message Date
wendtalexander
31bbffc480 removing import 2024-09-23 11:54:27 +02:00
wendtalexander
fa3c704497 adding digital trigger 2024-09-23 11:54:16 +02:00
wendtalexander
ebf7fe89bf moving to repos.py 2024-09-23 11:54:05 +02:00
wendtalexander
2100a0205f adding calibration 2024-09-23 11:53:37 +02:00
wendtalexander
971c1f4347 adding comments and digital trigger 2024-09-22 19:51:21 +02:00
wendtalexander
a298b48bdd adding file for calibration 2024-09-22 19:51:11 +02:00
wendtalexander
cd414da6de adding sending analaog output to repo function 2024-09-20 17:37:22 +02:00
wendtalexander
0e0262a3af example of reading input 2024-09-20 11:08:32 +02:00
wendtalexander
558e0de315 abstract class, currently handly reading analog input 2024-09-20 11:08:20 +02:00
wendtalexander
4d210831c8 deleting unessassery files 2024-09-20 11:07:39 +02:00
wendtalexander
dc96359f1d adding QProcess for executing scripts in the repro folder 2024-09-19 11:24:58 +02:00
wendtalexander
bb88053aaa moving files to repro folder 2024-09-19 11:24:35 +02:00
wendtalexander
1a642c6783 moving repos in folder adding indiviudal scipts for executing from the gui 2024-09-19 11:24:11 +02:00
wendtalexander
3766ea0ea2 Merge branch 'main' of https://whale.am28.uni-tuebingen.de/git/Awendt/pyrelacs 2024-09-18 15:27:32 +02:00
wendtalexander
e00259b15f removing imports 2024-09-18 15:26:01 +02:00
wendtalexander
e19c147059 adding threading to fix segementation fault 2024-09-18 15:25:18 +02:00
11 changed files with 392 additions and 225 deletions

View File

@@ -3,7 +3,7 @@ import sys
import pathlib
import ctypes
from PyQt6.QtCore import QSize, QThreadPool, Qt
from PyQt6.QtCore import QProcess, QSize, QThreadPool, Qt
from PyQt6.QtWidgets import (
QApplication,
QGridLayout,
@@ -11,6 +11,7 @@ from PyQt6.QtWidgets import (
QToolBar,
QWidget,
QMainWindow,
QPlainTextEdit,
)
import tomli
import uldaq
@@ -18,7 +19,6 @@ from IPython import embed
import numpy as np
from pyrelacs.util.logging import config_logging
from pyrelacs.daq_thread import Worker
log = config_logging()
@@ -27,7 +27,11 @@ class PyRelacs(QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle("PyRelacs")
self.setMinimumSize(1000, 1000)
self.threadpool = QThreadPool()
# for starting a Qprocess
self.p = None
self.daq_connect_button = QPushButton("Connect Daq")
self.daq_connect_button.setCheckable(True)
@@ -37,17 +41,17 @@ class PyRelacs(QMainWindow):
self.daq_disconnect_button.setCheckable(True)
self.daq_disconnect_button.clicked.connect(self.disconnect_dac)
self.repro_button = QPushButton("Load Repros")
self.repro_button.setCheckable(True)
self.repro_button.clicked.connect(self.repro)
self.text = QPlainTextEdit()
self.text.setReadOnly(True)
layout = QGridLayout()
layout.addWidget(self.daq_connect_button, 0, 0)
layout.addWidget(self.daq_disconnect_button, 0, 1)
layout.addWidget(self.repro_button, 1, 0)
layout.addWidget(self.text, 2, 0, 1, 2)
self.toolbar = QToolBar("Repros")
self.addToolBar(self.toolbar)
self.repro()
self.setFixedSize(QSize(400, 300))
widget = QWidget()
@@ -78,52 +82,59 @@ class PyRelacs(QMainWindow):
log.debug("DAQ was not connected")
def repro(self):
config_path = pathlib.Path(__file__).parent.resolve() / "repro.toml"
if not config_path.is_file:
log.error("repro.toml not found")
QApplication.quit()
repors = []
repos_path = pathlib.Path(__file__).parent / "repros"
repos_names = list(repos_path.glob("*.py"))
# exclude the repos.py file
repos_names = [
f.with_suffix("").name for f in repos_names if not f.name == "repos.py"
]
for rep in repos_names:
individual_repro_button = QAction(rep, self)
individual_repro_button.setStatusTip("Button")
individual_repro_button.triggered.connect(
lambda checked, n=rep: self.run_repro(n)
)
self.toolbar.addAction(individual_repro_button)
with open(config_path, mode="rb") as fp:
self.config = tomli.load(fp)
for r in self.config:
repors.append(r)
def message(self, s):
self.text.appendPlainText(s)
individual_repro_button = QAction(f"{repors[0]}", self)
individual_repro_button.setStatusTip("Button")
individual_repro_button.triggered.connect(self.run_repro)
self.toolbar.addAction(individual_repro_button)
def run_repro(self, name_of_repo):
if self.p is None:
self.message(f"Executing process {name_of_repo}")
self.p = QProcess()
self.p.setWorkingDirectory(str(pathlib.Path(__file__).parent / "repros/"))
# log.debug(pathlib.Path(__file__).parent / "repos")
self.p.readyReadStandardOutput.connect(self.handle_stdout)
self.p.readyReadStandardError.connect(self.handle_stderr)
self.p.stateChanged.connect(self.handle_state)
self.p.finished.connect(self.process_finished)
self.p.start("python3", [f"{name_of_repo}" + ".py"])
def run_repro(self):
daq_thread = Worker(self.read_daq)
self.threadpool.start(daq_thread)
def handle_stderr(self):
if self.p is not None:
data = self.p.readAllStandardError()
stderr = bytes(data).decode("utf8")
self.message(stderr)
def on_scan_finished(self):
log.debug("Scan finished")
def handle_stdout(self):
if self.p is not None:
data = self.p.readAllStandardOutput()
stdout = bytes(data).decode("utf8")
self.message(stdout)
def read_daq(self, progress_callback):
log.debug("running repro")
time = np.arange(0, 10, 1 / 30_000.0)
data = 2 * np.sin(2 * np.pi * 10 * time)
def handle_state(self, state):
states = {
QProcess.ProcessState.NotRunning: "Not running",
QProcess.ProcessState.Starting: "Starting",
QProcess.ProcessState.Running: "Running",
}
state_name = states[state]
self.message(f"State changed: {state_name}")
buffer = ctypes.c_double * len(time)
data_c = buffer(*data)
log.debug(f"Created C_double data {data_c}")
ao_device = self.daq_device.get_ao_device()
err = ao_device.a_out_scan(
0,
0,
uldaq.Range.BIP10VOLTS,
int(len(data)),
30_000.0,
uldaq.ScanOption.DEFAULTIO,
uldaq.AOutScanFlag.DEFAULT,
data_c,
)
while ao_device.get_scan_status()[0] == 1:
log.debug("Still running")
def process_finished(self):
self.text.appendPlainText("Process finished")
self.p = None
if __name__ == "__main__":

View File

@@ -1,84 +0,0 @@
import pathlib
import tomli
from PyQt6.QtCore import QRunnable, QThread, pyqtSignal, pyqtSlot, QObject
import ctypes
import uldaq
import numpy as np
from IPython import embed
import traceback, sys
from pyrelacs.util.logging import config_logging
log = config_logging()
class WorkerSignals(QObject):
"""
Defines the signals available from a running worker thread.
Supported signals are:
finished
No data
error
tuple (exctype, value, traceback.format_exc() )
result
object data returned from processing, anything
progress
int indicating % progress
"""
finished = pyqtSignal()
error = pyqtSignal(tuple)
result = pyqtSignal(object)
progress = pyqtSignal(int)
class Worker(QRunnable):
"""
Worker thread
Inherits from QRunnable to handler worker thread setup, signals and wrap-up.
:param callback: The function callback to run on this worker thread. Supplied args and
kwargs will be passed through to the runner.
:type callback: function
:param args: Arguments to pass to the callback function
:param kwargs: Keywords to pass to the callback function
"""
def __init__(self, fn, *args, **kwargs):
super(Worker, self).__init__()
# Store constructor arguments (re-used for processing)
self.fn = fn
self.args = args
self.kwargs = kwargs
self.signals = WorkerSignals()
# Add the callback to our kwargs
self.kwargs["progress_callback"] = self.signals.progress
@pyqtSlot()
def run(self):
"""
Initialise the runner function with passed args, kwargs.
"""
# Retrieve args/kwargs here; and fire processing using them
try:
result = self.fn(*self.args, **self.kwargs)
except:
traceback.print_exc()
exctype, value = sys.exc_info()[:2]
self.signals.error.emit((exctype, value, traceback.format_exc()))
else:
self.signals.result.emit(result) # Return the result of the processing
finally:
self.signals.finished.emit() # Done

View File

@@ -1,53 +0,0 @@
from typing import Optional
import uldaq
from IPython import embed
import typer
from typing_extensions import Annotated
import matplotlib.pyplot as plt
from pyrelacs.util.logging import config_logging
log = config_logging()
class ReadData:
def __init__(self) -> None:
devices = uldaq.get_daq_device_inventory(uldaq.InterfaceType.USB)
log.debug(f"Found daq devices {len(devices)}, connecting to the first one")
self.daq_device = uldaq.DaqDevice(devices[0])
self.daq_device.connect()
log.debug("Connected")
def read_analog_in(self, channel: Annotated[Optional[int], typer.Argument()] = 0):
# Get the Ananlog In device and Analog Info
ai_device = self.daq_device.get_ai_device()
ai_info = ai_device.get_info()
log.debug(
f"Analog info,\n Channels available {ai_info.get_num_chans()}, \n Max Samplerate: {ai_info.get_max_scan_rate()}"
)
buffer_len = 1_000_000
buf = uldaq.create_float_buffer(1, buffer_len)
er = ai_device.a_in_scan(
1,
1,
uldaq.AiInputMode.SINGLE_ENDED,
uldaq.Range.BIP10VOLTS,
buffer_len,
500_000,
uldaq.ScanOption.DEFAULTIO,
uldaq.AInScanFlag.DEFAULT,
data=buf,
)
ai_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, timeout=-1)
log.debug("Scanning")
embed()
self.daq_device.disconnect()
self.daq_device.release()
if __name__ == "__main__":
daq_input = ReadData()
typer.run(daq_input.read_analog_in)

View File

@@ -1,16 +0,0 @@
import uldaq
from IPython import embed
import typer
class Output_daq():
def __init__(self) -> None:
devices = uldaq.get_daq_device_inventory(uldaq.InterfaceType.USB)
daq_device = uldaq.DaqDevice(devices[0])
daq_device.connect()
if __name__ == '__main__':
daq_input = Input_daq()

View File

@@ -1,20 +0,0 @@
import uldaq
from IPython import embed
from pyrelacs.util.logging import config_logging
log = config_logging()
class Repos:
def __init__(self) -> None:
pass
def run_repo(self) -> None:
pass
def stop_repo(self) -> None:
pass
def reload_repo(self) -> None:
pass

View File

@@ -1,6 +0,0 @@
[Sinus]
duration = 10
fs = 30000.0
amplitude = 1
freq = 10

42
pyrelacs/repros/calbi.py Normal file
View File

@@ -0,0 +1,42 @@
import ctypes
import uldaq
from IPython import embed
from pyrelacs.repros.repos import Repos
from pyrelacs.util.logging import config_logging
import numpy as np
import matplotlib.pyplot as plt
log = config_logging()
class Calibration(Repos):
def __init__(self) -> None:
super().__init__()
def run_calibration(self):
# Stimulus
time = np.arange(0, DURATION, 1 / SAMPLERATE)
data = AMPLITUDE * np.sin(2 * np.pi * SINFREQ * time)
# sending stimulus
stim, ao_device = self.send_analog_dac(
data, [0, 0], SAMPLERATE, ScanOption=uldaq.ScanOption.EXTTRIGGER
)
read_data = self.read_analog_daq(
[0, 1], DURATION, SAMPLERATE, ScanOption=uldaq.ScanOption.EXTTRIGGER
)
self.digital_trigger()
ao_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, 11)
self.digital_trigger(data=0)
self.disconnect_dac()
embed()
exit()
if __name__ == "__main__":
SAMPLERATE = 40_000.0
DURATION = 5
AMPLITUDE = 3
SINFREQ = 1
daq_input = Calibration()
daq_input.run_calibration()

View File

@@ -0,0 +1,86 @@
import ctypes
import uldaq
from IPython import embed
import matplotlib.pyplot as plt
import numpy as np
from pyrelacs.util.logging import config_logging
log = config_logging()
class ReadWrite:
def __init__(self) -> None:
devices = uldaq.get_daq_device_inventory(uldaq.InterfaceType.USB)
log.debug(f"Found daq devices {len(devices)}, connecting to the first one")
self.daq_device = uldaq.DaqDevice(devices[0])
self.daq_device.connect()
log.debug("Connected")
# self.daq_device.enable_event(
# uldaq.DaqEventType.ON_DATA_AVAILABLE,
# 1,
# self.read_write,
# (uldaq.DaqEventType.ON_DATA_AVAILABLE, 1, 1),
# )
def read_write(self) -> None:
# event_type = callback_args.event_type
# event_data = callback_args.event_data
# user_data = callback_args.user_data
FS = 30_000.0
DURATION = 10
FREQUENCY = 100
time = np.arange(0, DURATION, 1 / FS)
data = 2 * np.sin(2 * np.pi * FREQUENCY * time)
buffer = ctypes.c_double * len(time)
data_c = buffer(*data)
buf = uldaq.create_float_buffer(1, len(time))
# Get the Ananlog In device and Analog Info
ai_device = self.daq_device.get_ai_device()
ai_info = ai_device.get_info()
# Get the Analog Out device
ao_device = self.daq_device.get_ao_device()
ao_info = ao_device.get_info()
er_ao = ao_device.a_out_scan(
0,
0,
uldaq.Range.BIP10VOLTS,
int(len(data)),
30_000.0,
uldaq.ScanOption.DEFAULTIO,
uldaq.AOutScanFlag.DEFAULT,
data_c,
)
er_ai = ai_device.a_in_scan(
1,
1,
uldaq.AiInputMode.SINGLE_ENDED,
uldaq.Range.BIP10VOLTS,
len(time),
FS,
uldaq.ScanOption.DEFAULTIO,
uldaq.AInScanFlag.DEFAULT,
data=buf,
)
ai_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, timeout=-1)
log.debug("Scanning")
self.daq_device.disconnect()
self.daq_device.release()
plt.plot(buf)
plt.plot(data_c)
plt.show()
if __name__ == "__main__":
daq_input = ReadWrite()
daq_input.read_write()

28
pyrelacs/repros/input.py Normal file
View File

@@ -0,0 +1,28 @@
import uldaq
import matplotlib.pyplot as plt
from pyrelacs.util.logging import config_logging
from .repos import Repos
log = config_logging()
class ReadData(Repos):
def __init__(self) -> None:
super().__init__()
def analog_in(self) -> None:
# Get the Ananlog In device and Analog Info
data = self.read_analog_daq(
[0, 0],
10,
3000.0,
)
plt.plot(data)
plt.show()
self.disconnect_dac()
if __name__ == "__main__":
daq_input = ReadData()
daq_input.analog_in()

37
pyrelacs/repros/output.py Normal file
View File

@@ -0,0 +1,37 @@
import ctypes
import uldaq
from IPython import embed
from pyrelacs.repros.repos import Repos
from pyrelacs.util.logging import config_logging
import numpy as np
import matplotlib.pyplot as plt
log = config_logging()
class Output_daq(Repos):
def __init__(self) -> None:
super().__init__()
# devices = uldaq.get_daq_device_inventory(uldaq.InterfaceType.USB)
# self.daq_device = uldaq.DaqDevice(devices[0])
# self.daq_device.connect()
def write_daq(self):
log.debug("running repro")
time = np.arange(0, 10, 1 / 30_000.0)
data = 2 * np.sin(2 * np.pi * 1 * time)
self.send_analog_dac(
data, [0, 0], 30_000, ScanOption=uldaq.ScanOption.EXTTRIGGER
)
def trigger(self):
self.digital_trigger(1)
self.daq_device.disconnect()
self.daq_device.release()
if __name__ == "__main__":
daq_input = Output_daq()
daq_input.write_daq()
# daq_input.trigger()

142
pyrelacs/repros/repos.py Normal file
View File

@@ -0,0 +1,142 @@
from ctypes import Array, c_double
from typing import Union
from IPython import embed
import numpy.typing as npt
import uldaq
import numpy as np
from pyrelacs.util.logging import config_logging
log = config_logging()
class Repos:
def __init__(self) -> None:
devices = uldaq.get_daq_device_inventory(uldaq.InterfaceType.USB)
log.debug(f"Found daq devices {len(devices)}, connecting to the first one")
if len(devices) == 0:
log.error("Did not found daq devices, please connect one")
exit(1)
self.daq_device = uldaq.DaqDevice(devices[0])
self.daq_device.connect()
log.debug("Connected")
def read_analog_daq(
self,
channels: list[int],
duration: int,
samplerate: float,
AiInputMode: uldaq.AiInputMode = uldaq.AiInputMode.SINGLE_ENDED,
Range: uldaq.Range = uldaq.Range.BIP10VOLTS,
ScanOption: uldaq.ScanOption = uldaq.ScanOption.DEFAULTIO,
AInScanFlag: uldaq.AInScanFlag = uldaq.AInScanFlag.DEFAULT,
) -> Array[c_double]:
if channels[0] == channels[1]:
channel_len = 1
else:
channel_len = len(channels)
assert len(channels) == 2, log.error("Please provide a list with two ints")
ai_device = self.daq_device.get_ai_device()
buffer_len = np.shape(np.arange(0, duration, 1 / samplerate))[0]
data_analog_input = uldaq.create_float_buffer(channel_len, buffer_len)
er = ai_device.a_in_scan(
channels[0],
channels[1],
AiInputMode,
Range,
buffer_len,
samplerate,
ScanOption,
AInScanFlag,
data=data_analog_input,
)
# ai_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, timeout=-1)
return data_analog_input
def send_analog_dac(
self,
data: Union[list, npt.NDArray],
channels: list[int],
samplerate: float,
Range: uldaq.Range = uldaq.Range.BIP10VOLTS,
ScanOption: uldaq.ScanOption = uldaq.ScanOption.DEFAULTIO,
AOutScanFlag: uldaq.AOutScanFlag = uldaq.AOutScanFlag.DEFAULT,
):
"""
Parameters
----------
data : Union[list, npt.NDArray]
channels : list[int]
duration : int
samplerate : float
AiInputMode : uldaq.AiInputMode
Range : uldaq.Range
ScanOption : uldaq.ScanOption
AInScanFlag : uldaq.AOutScanFlag
Returns
-------
Array[c_double]
ao_device
"""
buffer = c_double * len(data)
data_analog_output = buffer(*data)
log.debug(f"Created C_double data {data_analog_output}")
ao_device = self.daq_device.get_ao_device()
ao_info = ao_device.get_info()
err = ao_device.a_out_scan(
channels[0],
channels[1],
Range,
int(len(data)),
samplerate,
ScanOption,
AOutScanFlag,
data_analog_output,
)
log.info(f"The actual scan rate was {err}")
# ao_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, 11)
return data_analog_output, ao_device
def digital_trigger(self, portn: int = 0, data: int = 1) -> None:
log.info(f"{self.daq_device}")
dio_device = self.daq_device.get_dio_device()
dio_device.d_config_bit(
uldaq.DigitalPortType.AUXPORT, portn, uldaq.DigitalDirection.OUTPUT
)
dio_device.d_bit_out(uldaq.DigitalPortType.AUXPORT, bit_number=portn, data=data)
def disconnect_dac(self):
self.daq_device.disconnect()
self.daq_device.release()
def clean_up():
pass
def run_repo(self) -> None:
pass
def stop_repo(self) -> None:
pass
def reload_repo(self) -> None:
pass