19 Commits

Author SHA1 Message Date
wendtalexander
31bbffc480 removing import 2024-09-23 11:54:27 +02:00
wendtalexander
fa3c704497 adding digital trigger 2024-09-23 11:54:16 +02:00
wendtalexander
ebf7fe89bf moving to repos.py 2024-09-23 11:54:05 +02:00
wendtalexander
2100a0205f adding calibration 2024-09-23 11:53:37 +02:00
wendtalexander
971c1f4347 adding comments and digital trigger 2024-09-22 19:51:21 +02:00
wendtalexander
a298b48bdd adding file for calibration 2024-09-22 19:51:11 +02:00
wendtalexander
cd414da6de adding sending analaog output to repo function 2024-09-20 17:37:22 +02:00
wendtalexander
0e0262a3af example of reading input 2024-09-20 11:08:32 +02:00
wendtalexander
558e0de315 abstract class, currently handly reading analog input 2024-09-20 11:08:20 +02:00
wendtalexander
4d210831c8 deleting unessassery files 2024-09-20 11:07:39 +02:00
wendtalexander
dc96359f1d adding QProcess for executing scripts in the repro folder 2024-09-19 11:24:58 +02:00
wendtalexander
bb88053aaa moving files to repro folder 2024-09-19 11:24:35 +02:00
wendtalexander
1a642c6783 moving repos in folder adding indiviudal scipts for executing from the gui 2024-09-19 11:24:11 +02:00
wendtalexander
3766ea0ea2 Merge branch 'main' of https://whale.am28.uni-tuebingen.de/git/Awendt/pyrelacs 2024-09-18 15:27:32 +02:00
wendtalexander
e00259b15f removing imports 2024-09-18 15:26:01 +02:00
wendtalexander
e19c147059 adding threading to fix segementation fault 2024-09-18 15:25:18 +02:00
wendtalexander
e975bcd7e1 adding threading to fix segementation fault 2024-09-18 10:25:22 +02:00
wendtalexander
25cd1c0585 adding toml file for repro 2024-09-17 16:07:25 +02:00
wendtalexander
281640e80c adding repro toolbar 2024-09-17 16:07:11 +02:00
11 changed files with 438 additions and 105 deletions

13
poetry.lock generated
View File

@@ -726,6 +726,17 @@ files = [
{file = "six-1.16.0.tar.gz", hash = "sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926"},
]
[[package]]
name = "tomli"
version = "2.0.1"
description = "A lil' TOML parser"
optional = false
python-versions = ">=3.7"
files = [
{file = "tomli-2.0.1-py3-none-any.whl", hash = "sha256:939de3e7a6161af0c887ef91b7d41a53e7c5a1ca976325f429cb46ea9bc30ecc"},
{file = "tomli-2.0.1.tar.gz", hash = "sha256:de526c12914f0c550d15924c62d72abc48d6fe7364aa87328337a31007fe8a4f"},
]
[[package]]
name = "typer"
version = "0.12.5"
@@ -768,4 +779,4 @@ files = [
[metadata]
lock-version = "2.0"
python-versions = "^3.12"
content-hash = "3095576f87f3ef0a0e8fea79a5d076d7bc2b655d0eb6ed28ac5b325ac233497a"
content-hash = "6b680c385942c0a2c0eef934f3fb37fdc3d2e1dc058a7f2d891d4f2f0607d9c6"

View File

@@ -12,6 +12,7 @@ typer = "^0.12.5"
matplotlib = "^3.9.2"
numpy = "^2.1.1"
pyqt6 = "^6.7.1"
tomli = "^2.0.1"
[build-system]

View File

@@ -1,7 +1,22 @@
from PyQt6.QtGui import QAction
import sys
from PyQt6.QtCore import QSize, Qt
from PyQt6.QtWidgets import QApplication, QGridLayout, QPushButton, QWidget, QMainWindow
import pathlib
import ctypes
from PyQt6.QtCore import QProcess, QSize, QThreadPool, Qt
from PyQt6.QtWidgets import (
QApplication,
QGridLayout,
QPushButton,
QToolBar,
QWidget,
QMainWindow,
QPlainTextEdit,
)
import tomli
import uldaq
from IPython import embed
import numpy as np
from pyrelacs.util.logging import config_logging
@@ -12,23 +27,31 @@ class PyRelacs(QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle("PyRelacs")
self.setMinimumSize(1000, 1000)
self.daq_connect_button = QPushButton("Connect to Daq")
self.threadpool = QThreadPool()
# for starting a Qprocess
self.p = None
self.daq_connect_button = QPushButton("Connect Daq")
self.daq_connect_button.setCheckable(True)
self.daq_connect_button.clicked.connect(self.connect_dac)
self.daq_disconnect_button = QPushButton("Disconnect to Daq")
self.daq_disconnect_button = QPushButton("Disconnect Daq")
self.daq_disconnect_button.setCheckable(True)
self.daq_disconnect_button.clicked.connect(self.disconnect_dac)
self.send_data_button = QPushButton("Send Sinus")
self.send_data_button.setCheckable(True)
self.send_data_button.clicked.connect(self.send_sinus)
self.text = QPlainTextEdit()
self.text.setReadOnly(True)
layout = QGridLayout()
layout.addWidget(self.daq_connect_button, 0, 0)
layout.addWidget(self.daq_disconnect_button, 0, 1)
layout.addWidget(self.send_data_button, 1, 0)
layout.addWidget(self.text, 2, 0, 1, 2)
self.toolbar = QToolBar("Repros")
self.addToolBar(self.toolbar)
self.repro()
self.setFixedSize(QSize(400, 300))
widget = QWidget()
@@ -36,15 +59,15 @@ class PyRelacs(QMainWindow):
self.setCentralWidget(widget)
def connect_dac(self):
devices = uldaq.get_daq_device_inventory(uldaq.InterfaceType.USB)
try:
devices = uldaq.get_daq_device_inventory(uldaq.InterfaceType.USB)
log.debug(f"Found daq devices {len(devices)}, connecting to the first one")
self.daq_device = uldaq.DaqDevice(devices[0])
log.debug(f"Found daq devices {len(devices)}, connecting to the first one")
self.daq_device.connect()
log.debug("Connected")
except uldaq.ULException as e:
log.debug(f"DAQ was not connected\n {e}")
except IndexError:
log.debug("DAQ is not connected, closing")
QApplication.quit()
self.daq_connect_button.setDisabled(True)
def disconnect_dac(self):
@@ -58,8 +81,60 @@ class PyRelacs(QMainWindow):
except AttributeError:
log.debug("DAQ was not connected")
def send_sinus(self):
pass
def repro(self):
repos_path = pathlib.Path(__file__).parent / "repros"
repos_names = list(repos_path.glob("*.py"))
# exclude the repos.py file
repos_names = [
f.with_suffix("").name for f in repos_names if not f.name == "repos.py"
]
for rep in repos_names:
individual_repro_button = QAction(rep, self)
individual_repro_button.setStatusTip("Button")
individual_repro_button.triggered.connect(
lambda checked, n=rep: self.run_repro(n)
)
self.toolbar.addAction(individual_repro_button)
def message(self, s):
self.text.appendPlainText(s)
def run_repro(self, name_of_repo):
if self.p is None:
self.message(f"Executing process {name_of_repo}")
self.p = QProcess()
self.p.setWorkingDirectory(str(pathlib.Path(__file__).parent / "repros/"))
# log.debug(pathlib.Path(__file__).parent / "repos")
self.p.readyReadStandardOutput.connect(self.handle_stdout)
self.p.readyReadStandardError.connect(self.handle_stderr)
self.p.stateChanged.connect(self.handle_state)
self.p.finished.connect(self.process_finished)
self.p.start("python3", [f"{name_of_repo}" + ".py"])
def handle_stderr(self):
if self.p is not None:
data = self.p.readAllStandardError()
stderr = bytes(data).decode("utf8")
self.message(stderr)
def handle_stdout(self):
if self.p is not None:
data = self.p.readAllStandardOutput()
stdout = bytes(data).decode("utf8")
self.message(stdout)
def handle_state(self, state):
states = {
QProcess.ProcessState.NotRunning: "Not running",
QProcess.ProcessState.Starting: "Starting",
QProcess.ProcessState.Running: "Running",
}
state_name = states[state]
self.message(f"State changed: {state_name}")
def process_finished(self):
self.text.appendPlainText("Process finished")
self.p = None
if __name__ == "__main__":

View File

@@ -1,53 +0,0 @@
from typing import Optional
import uldaq
from IPython import embed
import typer
from typing_extensions import Annotated
import matplotlib.pyplot as plt
from pyrelacs.util.logging import config_logging
log = config_logging()
class ReadData:
def __init__(self) -> None:
devices = uldaq.get_daq_device_inventory(uldaq.InterfaceType.USB)
log.debug(f"Found daq devices {len(devices)}, connecting to the first one")
self.daq_device = uldaq.DaqDevice(devices[0])
self.daq_device.connect()
log.debug("Connected")
def read_analog_in(self, channel: Annotated[Optional[int], typer.Argument()] = 0):
# Get the Ananlog In device and Analog Info
ai_device = self.daq_device.get_ai_device()
ai_info = ai_device.get_info()
log.debug(
f"Analog info,\n Channels available {ai_info.get_num_chans()}, \n Max Samplerate: {ai_info.get_max_scan_rate()}"
)
buffer_len = 1_000_000
buf = uldaq.create_float_buffer(1, buffer_len)
er = ai_device.a_in_scan(
1,
1,
uldaq.AiInputMode.SINGLE_ENDED,
uldaq.Range.BIP10VOLTS,
buffer_len,
500_000,
uldaq.ScanOption.DEFAULTIO,
uldaq.AInScanFlag.DEFAULT,
data=buf,
)
ai_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, timeout=-1)
log.debug("Scanning")
embed()
self.daq_device.disconnect()
self.daq_device.release()
if __name__ == "__main__":
daq_input = ReadData()
typer.run(daq_input.read_analog_in)

View File

@@ -1,16 +0,0 @@
import uldaq
from IPython import embed
import typer
class Output_daq():
def __init__(self) -> None:
devices = uldaq.get_daq_device_inventory(uldaq.InterfaceType.USB)
daq_device = uldaq.DaqDevice(devices[0])
daq_device.connect()
if __name__ == '__main__':
daq_input = Input_daq()

View File

@@ -1,20 +0,0 @@
import uldaq
from IPython import embed
from pyrelacs.util.logging import config_logging
log = config_logging()
class Repos:
def __init__(self) -> None:
pass
def run_repo(self) -> None:
pass
def stop_repo(self) -> None:
pass
def reload_repo(self) -> None:
pass

42
pyrelacs/repros/calbi.py Normal file
View File

@@ -0,0 +1,42 @@
import ctypes
import uldaq
from IPython import embed
from pyrelacs.repros.repos import Repos
from pyrelacs.util.logging import config_logging
import numpy as np
import matplotlib.pyplot as plt
log = config_logging()
class Calibration(Repos):
def __init__(self) -> None:
super().__init__()
def run_calibration(self):
# Stimulus
time = np.arange(0, DURATION, 1 / SAMPLERATE)
data = AMPLITUDE * np.sin(2 * np.pi * SINFREQ * time)
# sending stimulus
stim, ao_device = self.send_analog_dac(
data, [0, 0], SAMPLERATE, ScanOption=uldaq.ScanOption.EXTTRIGGER
)
read_data = self.read_analog_daq(
[0, 1], DURATION, SAMPLERATE, ScanOption=uldaq.ScanOption.EXTTRIGGER
)
self.digital_trigger()
ao_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, 11)
self.digital_trigger(data=0)
self.disconnect_dac()
embed()
exit()
if __name__ == "__main__":
SAMPLERATE = 40_000.0
DURATION = 5
AMPLITUDE = 3
SINFREQ = 1
daq_input = Calibration()
daq_input.run_calibration()

View File

@@ -0,0 +1,86 @@
import ctypes
import uldaq
from IPython import embed
import matplotlib.pyplot as plt
import numpy as np
from pyrelacs.util.logging import config_logging
log = config_logging()
class ReadWrite:
def __init__(self) -> None:
devices = uldaq.get_daq_device_inventory(uldaq.InterfaceType.USB)
log.debug(f"Found daq devices {len(devices)}, connecting to the first one")
self.daq_device = uldaq.DaqDevice(devices[0])
self.daq_device.connect()
log.debug("Connected")
# self.daq_device.enable_event(
# uldaq.DaqEventType.ON_DATA_AVAILABLE,
# 1,
# self.read_write,
# (uldaq.DaqEventType.ON_DATA_AVAILABLE, 1, 1),
# )
def read_write(self) -> None:
# event_type = callback_args.event_type
# event_data = callback_args.event_data
# user_data = callback_args.user_data
FS = 30_000.0
DURATION = 10
FREQUENCY = 100
time = np.arange(0, DURATION, 1 / FS)
data = 2 * np.sin(2 * np.pi * FREQUENCY * time)
buffer = ctypes.c_double * len(time)
data_c = buffer(*data)
buf = uldaq.create_float_buffer(1, len(time))
# Get the Ananlog In device and Analog Info
ai_device = self.daq_device.get_ai_device()
ai_info = ai_device.get_info()
# Get the Analog Out device
ao_device = self.daq_device.get_ao_device()
ao_info = ao_device.get_info()
er_ao = ao_device.a_out_scan(
0,
0,
uldaq.Range.BIP10VOLTS,
int(len(data)),
30_000.0,
uldaq.ScanOption.DEFAULTIO,
uldaq.AOutScanFlag.DEFAULT,
data_c,
)
er_ai = ai_device.a_in_scan(
1,
1,
uldaq.AiInputMode.SINGLE_ENDED,
uldaq.Range.BIP10VOLTS,
len(time),
FS,
uldaq.ScanOption.DEFAULTIO,
uldaq.AInScanFlag.DEFAULT,
data=buf,
)
ai_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, timeout=-1)
log.debug("Scanning")
self.daq_device.disconnect()
self.daq_device.release()
plt.plot(buf)
plt.plot(data_c)
plt.show()
if __name__ == "__main__":
daq_input = ReadWrite()
daq_input.read_write()

28
pyrelacs/repros/input.py Normal file
View File

@@ -0,0 +1,28 @@
import uldaq
import matplotlib.pyplot as plt
from pyrelacs.util.logging import config_logging
from .repos import Repos
log = config_logging()
class ReadData(Repos):
def __init__(self) -> None:
super().__init__()
def analog_in(self) -> None:
# Get the Ananlog In device and Analog Info
data = self.read_analog_daq(
[0, 0],
10,
3000.0,
)
plt.plot(data)
plt.show()
self.disconnect_dac()
if __name__ == "__main__":
daq_input = ReadData()
daq_input.analog_in()

37
pyrelacs/repros/output.py Normal file
View File

@@ -0,0 +1,37 @@
import ctypes
import uldaq
from IPython import embed
from pyrelacs.repros.repos import Repos
from pyrelacs.util.logging import config_logging
import numpy as np
import matplotlib.pyplot as plt
log = config_logging()
class Output_daq(Repos):
def __init__(self) -> None:
super().__init__()
# devices = uldaq.get_daq_device_inventory(uldaq.InterfaceType.USB)
# self.daq_device = uldaq.DaqDevice(devices[0])
# self.daq_device.connect()
def write_daq(self):
log.debug("running repro")
time = np.arange(0, 10, 1 / 30_000.0)
data = 2 * np.sin(2 * np.pi * 1 * time)
self.send_analog_dac(
data, [0, 0], 30_000, ScanOption=uldaq.ScanOption.EXTTRIGGER
)
def trigger(self):
self.digital_trigger(1)
self.daq_device.disconnect()
self.daq_device.release()
if __name__ == "__main__":
daq_input = Output_daq()
daq_input.write_daq()
# daq_input.trigger()

142
pyrelacs/repros/repos.py Normal file
View File

@@ -0,0 +1,142 @@
from ctypes import Array, c_double
from typing import Union
from IPython import embed
import numpy.typing as npt
import uldaq
import numpy as np
from pyrelacs.util.logging import config_logging
log = config_logging()
class Repos:
def __init__(self) -> None:
devices = uldaq.get_daq_device_inventory(uldaq.InterfaceType.USB)
log.debug(f"Found daq devices {len(devices)}, connecting to the first one")
if len(devices) == 0:
log.error("Did not found daq devices, please connect one")
exit(1)
self.daq_device = uldaq.DaqDevice(devices[0])
self.daq_device.connect()
log.debug("Connected")
def read_analog_daq(
self,
channels: list[int],
duration: int,
samplerate: float,
AiInputMode: uldaq.AiInputMode = uldaq.AiInputMode.SINGLE_ENDED,
Range: uldaq.Range = uldaq.Range.BIP10VOLTS,
ScanOption: uldaq.ScanOption = uldaq.ScanOption.DEFAULTIO,
AInScanFlag: uldaq.AInScanFlag = uldaq.AInScanFlag.DEFAULT,
) -> Array[c_double]:
if channels[0] == channels[1]:
channel_len = 1
else:
channel_len = len(channels)
assert len(channels) == 2, log.error("Please provide a list with two ints")
ai_device = self.daq_device.get_ai_device()
buffer_len = np.shape(np.arange(0, duration, 1 / samplerate))[0]
data_analog_input = uldaq.create_float_buffer(channel_len, buffer_len)
er = ai_device.a_in_scan(
channels[0],
channels[1],
AiInputMode,
Range,
buffer_len,
samplerate,
ScanOption,
AInScanFlag,
data=data_analog_input,
)
# ai_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, timeout=-1)
return data_analog_input
def send_analog_dac(
self,
data: Union[list, npt.NDArray],
channels: list[int],
samplerate: float,
Range: uldaq.Range = uldaq.Range.BIP10VOLTS,
ScanOption: uldaq.ScanOption = uldaq.ScanOption.DEFAULTIO,
AOutScanFlag: uldaq.AOutScanFlag = uldaq.AOutScanFlag.DEFAULT,
):
"""
Parameters
----------
data : Union[list, npt.NDArray]
channels : list[int]
duration : int
samplerate : float
AiInputMode : uldaq.AiInputMode
Range : uldaq.Range
ScanOption : uldaq.ScanOption
AInScanFlag : uldaq.AOutScanFlag
Returns
-------
Array[c_double]
ao_device
"""
buffer = c_double * len(data)
data_analog_output = buffer(*data)
log.debug(f"Created C_double data {data_analog_output}")
ao_device = self.daq_device.get_ao_device()
ao_info = ao_device.get_info()
err = ao_device.a_out_scan(
channels[0],
channels[1],
Range,
int(len(data)),
samplerate,
ScanOption,
AOutScanFlag,
data_analog_output,
)
log.info(f"The actual scan rate was {err}")
# ao_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, 11)
return data_analog_output, ao_device
def digital_trigger(self, portn: int = 0, data: int = 1) -> None:
log.info(f"{self.daq_device}")
dio_device = self.daq_device.get_dio_device()
dio_device.d_config_bit(
uldaq.DigitalPortType.AUXPORT, portn, uldaq.DigitalDirection.OUTPUT
)
dio_device.d_bit_out(uldaq.DigitalPortType.AUXPORT, bit_number=portn, data=data)
def disconnect_dac(self):
self.daq_device.disconnect()
self.daq_device.release()
def clean_up():
pass
def run_repo(self) -> None:
pass
def stop_repo(self) -> None:
pass
def reload_repo(self) -> None:
pass