28 Commits

Author SHA1 Message Date
45a267c8c7 Merge pull request 'main' (#1) from jgrewe/minipyrelacs:main into main
Reviewed-on: Awendt/pyrelacs#1
2024-09-27 07:29:03 +00:00
291ed8859c Merge branch 'main' of https://whale.am28.uni-tuebingen.de/git/Awendt/pyrelacs 2024-09-27 09:17:34 +02:00
aa792bcb1d [docs] add attenuator specification 2024-09-27 09:14:44 +02:00
6a73f38fba [app] store window settings and restore upon startup 2024-09-27 09:13:29 +02:00
6008cc03d6 [project] add info.py that reads the project toml file to set application information 2024-09-27 09:12:56 +02:00
938d70fbac [project] add some more information 2024-09-27 09:12:15 +02:00
wendtalexander
dd3e0d045d rewriting the project structure 2024-09-27 09:12:11 +02:00
8d616847f5 [setup] add gui script entry point 2024-09-26 16:30:40 +02:00
836d6dc3d9 [app] restructure to have a valid entry point 2024-09-26 16:30:14 +02:00
wendtalexander
43e0d4b75a adding calbi for different db 2024-09-26 14:29:47 +02:00
wendtalexander
66ea22fb4a checking amplitude, checking beat 2024-09-26 11:25:13 +02:00
wendtalexander
d3800ddfa2 handle initial connection, diggital trigger 2024-09-26 11:24:03 +02:00
wendtalexander
1dc72d00bb adding scipy 2024-09-26 11:23:08 +02:00
wendtalexander
26f43151a2 fixing seg fault by returning data_analog_output 2024-09-25 17:05:21 +02:00
wendtalexander
a9be09dc06 checking if amplitude is the same form input to output 2024-09-25 17:04:53 +02:00
wendtalexander
06f5a6ae46 del file 2024-09-25 17:04:29 +02:00
wendtalexander
b912159b76 Merge branch 'main' of https://whale.am28.uni-tuebingen.de/git/Awendt/pyrelacs 2024-09-25 16:08:58 +02:00
wendtalexander
9cd6aadb3b trying to find changes that lead to the segfault 2024-09-25 16:05:50 +02:00
wendtalexander
1a2185d5e4 trying to fix seg fault 2024-09-25 15:31:15 +02:00
wendtalexander
deb60fa84c updating output 2024-09-25 13:37:40 +02:00
wendtalexander
1579c947c9 changing Repos to mccdac 2024-09-25 07:57:32 +02:00
wendtalexander
d6e2f8c5ba changing Repos to MccDac 2024-09-25 07:57:07 +02:00
wendtalexander
8e73b2ae1f adding to mccdac 2024-09-25 07:56:46 +02:00
wendtalexander
9f7d28ccf8 adding assertions, digital trigger 2024-09-24 17:34:34 +02:00
wendtalexander
3865bb8216 adding try execpt 2024-09-24 17:34:17 +02:00
wendtalexander
2317fd73c8 adding attenuator class 2024-09-24 17:33:59 +02:00
wendtalexander
fbb4d3b81d bug cant read digio line with d_in_scan 2024-09-23 17:21:52 +02:00
wendtalexander
7d02cb994f adding unlimited time 2024-09-23 14:32:49 +02:00
14 changed files with 778 additions and 374 deletions

BIN
docs/0900766b804c9851.pdf Normal file

Binary file not shown.

52
poetry.lock generated
View File

@@ -704,6 +704,56 @@ pygments = ">=2.13.0,<3.0.0"
[package.extras] [package.extras]
jupyter = ["ipywidgets (>=7.5.1,<9)"] jupyter = ["ipywidgets (>=7.5.1,<9)"]
[[package]]
name = "scipy"
version = "1.14.1"
description = "Fundamental algorithms for scientific computing in Python"
optional = false
python-versions = ">=3.10"
files = [
{file = "scipy-1.14.1-cp310-cp310-macosx_10_13_x86_64.whl", hash = "sha256:b28d2ca4add7ac16ae8bb6632a3c86e4b9e4d52d3e34267f6e1b0c1f8d87e389"},
{file = "scipy-1.14.1-cp310-cp310-macosx_12_0_arm64.whl", hash = "sha256:d0d2821003174de06b69e58cef2316a6622b60ee613121199cb2852a873f8cf3"},
{file = "scipy-1.14.1-cp310-cp310-macosx_14_0_arm64.whl", hash = "sha256:8bddf15838ba768bb5f5083c1ea012d64c9a444e16192762bd858f1e126196d0"},
{file = "scipy-1.14.1-cp310-cp310-macosx_14_0_x86_64.whl", hash = "sha256:97c5dddd5932bd2a1a31c927ba5e1463a53b87ca96b5c9bdf5dfd6096e27efc3"},
{file = "scipy-1.14.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2ff0a7e01e422c15739ecd64432743cf7aae2b03f3084288f399affcefe5222d"},
{file = "scipy-1.14.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8e32dced201274bf96899e6491d9ba3e9a5f6b336708656466ad0522d8528f69"},
{file = "scipy-1.14.1-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:8426251ad1e4ad903a4514712d2fa8fdd5382c978010d1c6f5f37ef286a713ad"},
{file = "scipy-1.14.1-cp310-cp310-win_amd64.whl", hash = "sha256:a49f6ed96f83966f576b33a44257d869756df6cf1ef4934f59dd58b25e0327e5"},
{file = "scipy-1.14.1-cp311-cp311-macosx_10_13_x86_64.whl", hash = "sha256:2da0469a4ef0ecd3693761acbdc20f2fdeafb69e6819cc081308cc978153c675"},
{file = "scipy-1.14.1-cp311-cp311-macosx_12_0_arm64.whl", hash = "sha256:c0ee987efa6737242745f347835da2cc5bb9f1b42996a4d97d5c7ff7928cb6f2"},
{file = "scipy-1.14.1-cp311-cp311-macosx_14_0_arm64.whl", hash = "sha256:3a1b111fac6baec1c1d92f27e76511c9e7218f1695d61b59e05e0fe04dc59617"},
{file = "scipy-1.14.1-cp311-cp311-macosx_14_0_x86_64.whl", hash = "sha256:8475230e55549ab3f207bff11ebfc91c805dc3463ef62eda3ccf593254524ce8"},
{file = "scipy-1.14.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:278266012eb69f4a720827bdd2dc54b2271c97d84255b2faaa8f161a158c3b37"},
{file = "scipy-1.14.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:fef8c87f8abfb884dac04e97824b61299880c43f4ce675dd2cbeadd3c9b466d2"},
{file = "scipy-1.14.1-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:b05d43735bb2f07d689f56f7b474788a13ed8adc484a85aa65c0fd931cf9ccd2"},
{file = "scipy-1.14.1-cp311-cp311-win_amd64.whl", hash = "sha256:716e389b694c4bb564b4fc0c51bc84d381735e0d39d3f26ec1af2556ec6aad94"},
{file = "scipy-1.14.1-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:631f07b3734d34aced009aaf6fedfd0eb3498a97e581c3b1e5f14a04164a456d"},
{file = "scipy-1.14.1-cp312-cp312-macosx_12_0_arm64.whl", hash = "sha256:af29a935803cc707ab2ed7791c44288a682f9c8107bc00f0eccc4f92c08d6e07"},
{file = "scipy-1.14.1-cp312-cp312-macosx_14_0_arm64.whl", hash = "sha256:2843f2d527d9eebec9a43e6b406fb7266f3af25a751aa91d62ff416f54170bc5"},
{file = "scipy-1.14.1-cp312-cp312-macosx_14_0_x86_64.whl", hash = "sha256:eb58ca0abd96911932f688528977858681a59d61a7ce908ffd355957f7025cfc"},
{file = "scipy-1.14.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:30ac8812c1d2aab7131a79ba62933a2a76f582d5dbbc695192453dae67ad6310"},
{file = "scipy-1.14.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8f9ea80f2e65bdaa0b7627fb00cbeb2daf163caa015e59b7516395fe3bd1e066"},
{file = "scipy-1.14.1-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:edaf02b82cd7639db00dbff629995ef185c8df4c3ffa71a5562a595765a06ce1"},
{file = "scipy-1.14.1-cp312-cp312-win_amd64.whl", hash = "sha256:2ff38e22128e6c03ff73b6bb0f85f897d2362f8c052e3b8ad00532198fbdae3f"},
{file = "scipy-1.14.1-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:1729560c906963fc8389f6aac023739ff3983e727b1a4d87696b7bf108316a79"},
{file = "scipy-1.14.1-cp313-cp313-macosx_12_0_arm64.whl", hash = "sha256:4079b90df244709e675cdc8b93bfd8a395d59af40b72e339c2287c91860deb8e"},
{file = "scipy-1.14.1-cp313-cp313-macosx_14_0_arm64.whl", hash = "sha256:e0cf28db0f24a38b2a0ca33a85a54852586e43cf6fd876365c86e0657cfe7d73"},
{file = "scipy-1.14.1-cp313-cp313-macosx_14_0_x86_64.whl", hash = "sha256:0c2f95de3b04e26f5f3ad5bb05e74ba7f68b837133a4492414b3afd79dfe540e"},
{file = "scipy-1.14.1-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b99722ea48b7ea25e8e015e8341ae74624f72e5f21fc2abd45f3a93266de4c5d"},
{file = "scipy-1.14.1-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:5149e3fd2d686e42144a093b206aef01932a0059c2a33ddfa67f5f035bdfe13e"},
{file = "scipy-1.14.1-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:e4f5a7c49323533f9103d4dacf4e4f07078f360743dec7f7596949149efeec06"},
{file = "scipy-1.14.1-cp313-cp313-win_amd64.whl", hash = "sha256:baff393942b550823bfce952bb62270ee17504d02a1801d7fd0719534dfb9c84"},
{file = "scipy-1.14.1.tar.gz", hash = "sha256:5a275584e726026a5699459aa72f828a610821006228e841b94275c4a7c08417"},
]
[package.dependencies]
numpy = ">=1.23.5,<2.3"
[package.extras]
dev = ["cython-lint (>=0.12.2)", "doit (>=0.36.0)", "mypy (==1.10.0)", "pycodestyle", "pydevtool", "rich-click", "ruff (>=0.0.292)", "types-psutil", "typing_extensions"]
doc = ["jupyterlite-pyodide-kernel", "jupyterlite-sphinx (>=0.13.1)", "jupytext", "matplotlib (>=3.5)", "myst-nb", "numpydoc", "pooch", "pydata-sphinx-theme (>=0.15.2)", "sphinx (>=5.0.0,<=7.3.7)", "sphinx-design (>=0.4.0)"]
test = ["Cython", "array-api-strict (>=2.0)", "asv", "gmpy2", "hypothesis (>=6.30)", "meson", "mpmath", "ninja", "pooch", "pytest", "pytest-cov", "pytest-timeout", "pytest-xdist", "scikit-umfpack", "threadpoolctl"]
[[package]] [[package]]
name = "shellingham" name = "shellingham"
version = "1.5.4" version = "1.5.4"
@@ -779,4 +829,4 @@ files = [
[metadata] [metadata]
lock-version = "2.0" lock-version = "2.0"
python-versions = "^3.12" python-versions = "^3.12"
content-hash = "6b680c385942c0a2c0eef934f3fb37fdc3d2e1dc058a7f2d891d4f2f0607d9c6" content-hash = "477748fbc18bde095d13dea548108541c1b584242099398155787361b1dc31fe"

View File

@@ -3,7 +3,20 @@ name = "pyrelacs"
version = "0.1.0" version = "0.1.0"
description = "Relaxed ELectrophysiology Acquisition, Control, and Stimulation in python" description = "Relaxed ELectrophysiology Acquisition, Control, and Stimulation in python"
authors = ["wendtalexander <wendtalexander@protonmail.com>"] authors = ["wendtalexander <wendtalexander@protonmail.com>"]
repository = "https://whale.am28.uni-tuebingen.de/git/awendt/pyrelacs"
readme = "README.md" readme = "README.md"
license = "MIT"
organization = "de.uni-tuebingen.neuroetho"
classifiers = [
"Topic :: Scientific/Engineering",
"Intended Audience :: Science/Research",
"Intended Audience :: End Users/Desktop",
]
copyright = "(c) 2020, Neuroethology lab, Uni Tuebingen"
include = [
{ path = "pyproject.toml" }
]
[tool.poetry.dependencies] [tool.poetry.dependencies]
python = "^3.12" python = "^3.12"
@@ -13,7 +26,11 @@ matplotlib = "^3.9.2"
numpy = "^2.1.1" numpy = "^2.1.1"
pyqt6 = "^6.7.1" pyqt6 = "^6.7.1"
tomli = "^2.0.1" tomli = "^2.0.1"
tomlkit = "^0.13.2"
scipy = "^1.14.1"
[tool.poetry.scripts]
pyrelacs = "pyrelacs.app:main"
[build-system] [build-system]
requires = ["poetry-core"] requires = ["poetry-core"]

View File

@@ -1,9 +1,8 @@
from PyQt6.QtGui import QAction from PyQt6.QtGui import QAction
import sys import sys
import pathlib import pathlib
import ctypes
from PyQt6.QtCore import QProcess, QSize, QThreadPool, Qt from PyQt6.QtCore import QProcess, QSize, QThreadPool, Qt, QSettings
from PyQt6.QtWidgets import ( from PyQt6.QtWidgets import (
QApplication, QApplication,
QGridLayout, QGridLayout,
@@ -13,12 +12,14 @@ from PyQt6.QtWidgets import (
QMainWindow, QMainWindow,
QPlainTextEdit, QPlainTextEdit,
) )
import tomli
import uldaq import uldaq
from IPython import embed from IPython import embed
import numpy as np import numpy as np
from pyrelacs.util.logging import config_logging from pyrelacs.util.logging import config_logging
import pyrelacs.info as info
from pyrelacs.worker import Worker
from pyrelacs.repros.repros import Repro
log = config_logging() log = config_logging()
@@ -30,8 +31,7 @@ class PyRelacs(QMainWindow):
self.setMinimumSize(1000, 1000) self.setMinimumSize(1000, 1000)
self.threadpool = QThreadPool() self.threadpool = QThreadPool()
# for starting a Qprocess self.repros = Repro()
self.p = None
self.daq_connect_button = QPushButton("Connect Daq") self.daq_connect_button = QPushButton("Connect Daq")
self.daq_connect_button.setCheckable(True) self.daq_connect_button.setCheckable(True)
@@ -51,7 +51,7 @@ class PyRelacs(QMainWindow):
self.toolbar = QToolBar("Repros") self.toolbar = QToolBar("Repros")
self.addToolBar(self.toolbar) self.addToolBar(self.toolbar)
self.repro() self.repros_to_toolbar()
self.setFixedSize(QSize(400, 300)) self.setFixedSize(QSize(400, 300))
widget = QWidget() widget = QWidget()
@@ -81,64 +81,49 @@ class PyRelacs(QMainWindow):
except AttributeError: except AttributeError:
log.debug("DAQ was not connected") log.debug("DAQ was not connected")
def repro(self): def repros_to_toolbar(self):
repos_path = pathlib.Path(__file__).parent / "repros" repro_names, file_names = self.repros.names_of_repros()
repos_names = list(repos_path.glob("*.py")) for rep, fn in zip(repro_names, file_names):
# exclude the repos.py file
repos_names = [
f.with_suffix("").name for f in repos_names if not f.name == "repos.py"
]
for rep in repos_names:
individual_repro_button = QAction(rep, self) individual_repro_button = QAction(rep, self)
individual_repro_button.setStatusTip("Button") individual_repro_button.setStatusTip("Button")
individual_repro_button.triggered.connect( individual_repro_button.triggered.connect(
lambda checked, n=rep: self.run_repro(n) lambda checked, n=rep, f=fn: self.run_repro(n, f)
) )
self.toolbar.addAction(individual_repro_button) self.toolbar.addAction(individual_repro_button)
def message(self, s): def run_repro(self, n, fn):
self.text.appendPlainText(s) self.text.appendPlainText(f"started Repro {n}, {fn}")
def run_repro(self, name_of_repo):
if self.p is None:
self.message(f"Executing process {name_of_repo}")
self.p = QProcess()
self.p.setWorkingDirectory(str(pathlib.Path(__file__).parent / "repros/"))
# log.debug(pathlib.Path(__file__).parent / "repos")
self.p.readyReadStandardOutput.connect(self.handle_stdout)
self.p.readyReadStandardError.connect(self.handle_stderr)
self.p.stateChanged.connect(self.handle_state)
self.p.finished.connect(self.process_finished)
self.p.start("python3", [f"{name_of_repo}" + ".py"])
def handle_stderr(self): def main():
if self.p is not None: app = QApplication(sys.argv)
data = self.p.readAllStandardError() app.setApplicationName(info.NAME)
stderr = bytes(data).decode("utf8") app.setApplicationVersion(str(info.VERSION))
self.message(stderr) app.setOrganizationDomain(info.ORGANIZATION)
def handle_stdout(self): # read window settings
if self.p is not None: settings = QSettings(info.ORGANIZATION, info.NAME)
data = self.p.readAllStandardOutput() width = int(settings.value("app/width", 1024))
stdout = bytes(data).decode("utf8") height = int(settings.value("app/height", 768))
self.message(stdout) x = int(settings.value("app/pos_x", 100))
y = int(settings.value("app/pos_y", 100))
def handle_state(self, state): window = PyRelacs()
states = { window.setMinimumWidth(200)
QProcess.ProcessState.NotRunning: "Not running", window.setMinimumHeight(200)
QProcess.ProcessState.Starting: "Starting", window.resize(width, height)
QProcess.ProcessState.Running: "Running", window.move(x, y)
} window.show()
state_name = states[state] app.exec()
self.message(f"State changed: {state_name}")
def process_finished(self): # store window position and size
self.text.appendPlainText("Process finished") pos = window.pos()
self.p = None settings.setValue("app/width", window.width())
settings.setValue("app/height", window.height())
settings.setValue("app/pos_x", pos.x())
settings.setValue("app/pos_y", pos.y())
sys.exit(exit_code)
if __name__ == "__main__": if __name__ == "__main__":
app = QApplication(sys.argv) main()
window = PyRelacs()
window.show()
app.exec()

291
pyrelacs/devices/mccdac.py Normal file
View File

@@ -0,0 +1,291 @@
from ctypes import Array, c_double
import time
from typing import Union
from IPython import embed
import numpy.typing as npt
import uldaq
import numpy as np
from pyrelacs.util.logging import config_logging
log = config_logging()
class MccDac:
def __init__(self) -> None:
devices = uldaq.get_daq_device_inventory(uldaq.InterfaceType.USB)
log.debug(f"Found daq devices {len(devices)}, connecting to the first one")
if len(devices) == 0:
log.error("Did not found daq devices, please connect one")
exit(1)
self.daq_device = uldaq.DaqDevice(devices[0])
try:
self.daq_device.connect()
except uldaq.ul_exception.ULException:
self.disconnect_dac()
self.connect_dac()
self.ai_device = self.daq_device.get_ai_device()
self.ao_device = self.daq_device.get_ao_device()
self.dio_device = self.daq_device.get_dio_device()
log.debug("Connected")
def connect_dac(self):
devices = uldaq.get_daq_device_inventory(uldaq.InterfaceType.USB)
log.debug(f"Found daq devices {len(devices)}, connecting to the first one")
if len(devices) == 0:
log.error("Did not found daq devices, please connect one")
exit(1)
self.daq_device = uldaq.DaqDevice(devices[0])
self.daq_device.connect()
self.ai_device = self.daq_device.get_ai_device()
self.ao_device = self.daq_device.get_ao_device()
self.dio_device = self.daq_device.get_dio_device()
log.debug("Connected")
def read_analog(
self,
channels: list[int],
duration: int,
samplerate: float,
AiInputMode: uldaq.AiInputMode = uldaq.AiInputMode.SINGLE_ENDED,
Range: uldaq.Range = uldaq.Range.BIP10VOLTS,
ScanOption: uldaq.ScanOption = uldaq.ScanOption.DEFAULTIO,
AInScanFlag: uldaq.AInScanFlag = uldaq.AInScanFlag.DEFAULT,
) -> Array[c_double]:
assert len(channels) == 2, log.error("You can only provide two channels [0, 1]")
if channels[0] != channels[1]:
buffer_len_channels = 2
else:
buffer_len_channels = 1
buffer_len = np.shape(np.arange(0, duration, 1 / samplerate))[0]
data_analog_input = uldaq.create_float_buffer(buffer_len_channels, buffer_len)
er = self.ai_device.a_in_scan(
channels[0],
channels[1],
AiInputMode,
Range,
buffer_len,
samplerate,
ScanOption,
AInScanFlag,
data=data_analog_input,
)
return data_analog_input
def write_analog(
self,
data: Union[list, npt.NDArray],
channels: list[int],
samplerate: float,
Range: uldaq.Range = uldaq.Range.BIP10VOLTS,
ScanOption: uldaq.ScanOption = uldaq.ScanOption.DEFAULTIO,
AOutScanFlag: uldaq.AOutScanFlag = uldaq.AOutScanFlag.DEFAULT,
) -> Array[c_double]:
assert len(channels) == 2, log.error("You can only provide two channels [0, 1]")
buffer = c_double * len(data)
data_analog_output = buffer(*data)
log.debug(f"Created C_double data {data_analog_output}")
try:
err = self.ao_device.a_out_scan(
channels[0],
channels[1],
Range,
int(len(data)),
samplerate,
ScanOption,
AOutScanFlag,
data_analog_output,
)
except Exception as e:
print(f"{e}")
self.set_analog_to_zero()
self.disconnect_dac()
return data_analog_output
def set_analog_to_zero(self, channels: list[int] = [0, 1]):
try:
err = self.ao_device.a_out_list(
channels[0],
channels[1],
[
uldaq.Range.BIP10VOLTS,
uldaq.Range.BIP10VOLTS,
],
uldaq.AOutListFlag.DEFAULT,
[0, 0],
)
except Exception as e:
log.error("f{e}")
log.error("disconnection dac")
self.disconnect_dac()
def diggital_trigger(self) -> None:
data = self.read_bit(channel=0)
if data:
self.write_bit(channel=0, bit=0)
time.time_ns()
self.write_bit(channel=0, bit=1)
else:
self.write_bit(channel=0, bit=1)
def write_bit(self, channel: int = 0, bit: int = 1) -> None:
self.dio_device.d_config_bit(
uldaq.DigitalPortType.AUXPORT, channel, uldaq.DigitalDirection.OUTPUT
)
self.dio_device.d_bit_out(
uldaq.DigitalPortType.AUXPORT, bit_number=channel, data=bit
)
def read_bit(self, channel: int = 0):
bit = self.dio_device.d_bit_in(uldaq.DigitalPortType.AUXPORT, channel)
return bit
def read_digitalio(
self,
channels: list[int],
duration,
samplerate,
ScanOptions: uldaq.ScanOption = uldaq.ScanOption.DEFAULTIO,
DInScanFlag: uldaq.DInScanFlag = uldaq.DInScanFlag.DEFAULT,
):
if channels[0] == channels[1]:
channel_len = 1
else:
channel_len = len(channels)
buffer_len = np.shape(np.arange(0, duration, 1 / samplerate))[0]
data_digital_input = uldaq.create_int_buffer(channel_len, buffer_len)
self.dio_device.d_config_port(
uldaq.DigitalPortType.AUXPORT, uldaq.DigitalDirection.INPUT
)
scan_rate = self.dio_device.d_in_scan(
uldaq.DigitalPortType.AUXPORT0,
uldaq.DigitalPortType.AUXPORT0,
len(data_digital_input),
samplerate,
ScanOptions,
DInScanFlag,
data_digital_input,
)
return data_digital_input
def disconnect_dac(self):
self.daq_device.disconnect()
self.daq_device.release()
def check_attenuator(self):
"""
ident : attdev-1
strobepin : 6
datainpin : 5
dataoutpin: -1
cspin : 4
mutepin : 7
zcenpin : -1
"""
SAMPLERATE = 40_000.0
DURATION = 5
AMPLITUDE = 1
SINFREQ = 1
t = np.arange(0, DURATION, 1 / SAMPLERATE)
data = AMPLITUDE * np.sin(2 * np.pi * SINFREQ * t)
# data_channels = np.concatenate((data, data))
db_values = [0, 0, -2, -5, -10, -20, -50]
db_values = [0, -10, -20]
for i, db_value in enumerate(db_values):
log.info(f"Attenuating the Channels, with {db_value}")
if i == 1:
log.info("Muting the Channels")
self.set_attenuation_level(
db_value, db_value, mute_channel1=True, mute_channel2=True
)
else:
self.set_attenuation_level(db_value, db_value)
_ = self.write_analog(
data,
[0, 0],
SAMPLERATE,
ScanOption=uldaq.ScanOption.EXTTRIGGER,
Range=uldaq.Range.BIP10VOLTS,
)
self.diggital_trigger()
try:
self.ao_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, 15)
self.write_bit(channel=0, bit=0)
self.set_analog_to_zero()
except uldaq.ul_exception.ULException:
log.debug("Operation timed out")
self.write_bit(channel=0, bit=0)
self.disconnect_dac()
self.connect_dac()
self.set_analog_to_zero()
finally:
self.write_bit(channel=0, bit=0)
self.disconnect_dac()
self.connect_dac()
self.set_analog_to_zero()
log.info("Sleeping for 1 second, before next attenuation")
time.sleep(1)
def set_attenuation_level(
self,
db_channel1: float = 5.0,
db_channel2: float = 5.0,
mute_channel1: bool = False,
mute_channel2: bool = False,
):
"""
ident : attdev-1
strobepin : 6
datainpin : 5
dataoutpin: -1
cspin : 4
mutepin : 7
zcenpin : -1
"""
self.activate_attenuator()
hardware_possible_db = np.arange(-95.5, 32.0, 0.5)
byte_number = np.arange(1, 256)
byte_number_db1 = byte_number[hardware_possible_db == db_channel1][0]
binary_db1 = np.binary_repr(byte_number_db1, width=8)
byte_number_db2 = byte_number[hardware_possible_db == db_channel2][0]
binary_db2 = np.binary_repr(byte_number_db2, width=8)
if mute_channel1:
log.info("Muting channel one")
binary_db1 = "00000000"
if mute_channel2:
log.info("Muting channel one")
binary_db2 = "00000000"
channels_db = binary_db2 + binary_db1
self.write_bit(channel=4, bit=0)
for b in channels_db:
self.write_bit(channel=5, bit=int(b))
time.time_ns()
self.write_bit(channel=6, bit=1)
time.time_ns()
self.write_bit(channel=6, bit=0)
time.time_ns()
self.write_bit(channel=4, bit=1)
def activate_attenuator(self):
for ch, b in zip([4, 5, 6, 7], [1, 0, 0, 1]):
self.write_bit(channel=ch, bit=b)
def deactivate_attenuator(self):
# mute should be enabled for starting calibration
self.write_bit(channel=7, bit=0)

37
pyrelacs/info.py Normal file
View File

@@ -0,0 +1,37 @@
import tomlkit
import pathlib
def load_project_settings(project_root):
# Read the pyproject.toml file
with open(pathlib.Path.joinpath(project_root, 'pyproject.toml'), 'r') as f:
pyproject_content = f.read()
# Parse the toml content
pyproject = tomlkit.parse(pyproject_content)
# Access project settings
return {
'name': pyproject['tool']['poetry']['name'],
'version': pyproject['tool']['poetry']['version'],
'description': pyproject['tool']['poetry']['description'],
'authors': pyproject['tool']['poetry']['authors'],
'readme': pyproject['tool']['poetry']['authors'],
'licence': pyproject['tool']['poetry']['license'],
'organization': pyproject['tool']['poetry']['organization'],
'classifiers': pyproject['tool']['poetry']['classifiers'],
'copyright': pyproject['tool']['poetry']['copyright'],
"repository": pyproject['tool']['poetry']['repository'],
}
_root = pathlib.Path(__file__).parent.parent
_infodict = load_project_settings(_root)
NAME = _infodict["name"]
VERSION = _infodict["version"]
AUTHORS = _infodict["authors"]
COPYRIGHT = _infodict["copyright"]
HOMEPAGE = _infodict["repository"]
CLASSIFIERS = _infodict["classifiers"]
DESCRIPTION = _infodict["description"]
ORGANIZATION = _infodict["organization"]

View File

View File

@@ -1,42 +1,251 @@
import ctypes import signal
import sys
import faulthandler
import time
import uldaq import uldaq
from IPython import embed from IPython import embed
from pyrelacs.repros.repos import Repos
from pyrelacs.util.logging import config_logging
import numpy as np import numpy as np
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
from scipy.signal import welch, csd
from scipy.signal import find_peaks
from pyrelacs.devices.mccdac import MccDac
from pyrelacs.util.logging import config_logging
log = config_logging() log = config_logging()
faulthandler.enable()
class Calibration(Repos): class Calibration(MccDac):
def __init__(self) -> None: def __init__(self) -> None:
super().__init__() super().__init__()
self.SAMPLERATE = 40_000.0
self.DURATION = 5
self.AMPLITUDE = 1
self.SINFREQ = 750
def run_calibration(self): def run(self):
# Stimulus
time = np.arange(0, DURATION, 1 / SAMPLERATE) def segfault_handler(self, signum, frame):
data = AMPLITUDE * np.sin(2 * np.pi * SINFREQ * time) print(f"Segmentation fault caught! Signal number: {signum}")
# sending stimulus self.disconnect_dac()
stim, ao_device = self.send_analog_dac( sys.exit(1) # Gracefully exit the program
data, [0, 0], SAMPLERATE, ScanOption=uldaq.ScanOption.EXTTRIGGER
) def check_amplitude(self):
read_data = self.read_analog_daq( db_values = [0.0, -5.0, -10.0, -20.0, -50.0]
[0, 1], DURATION, SAMPLERATE, ScanOption=uldaq.ScanOption.EXTTRIGGER colors = ["red", "green", "blue", "black", "yellow"]
) self.set_attenuation_level(db_channel1=0.0, db_channel2=0.0)
self.digital_trigger() # write to ananlog 1
ao_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, 11) t = np.arange(0, self.DURATION, 1 / self.SAMPLERATE)
self.digital_trigger(data=0) data = self.AMPLITUDE * np.sin(2 * np.pi * self.SINFREQ * t)
fig, ax = plt.subplots()
for i, db_value in enumerate(db_values):
self.set_attenuation_level(db_channel1=db_value, db_channel2=db_value)
log.debug(f"{db_value}")
stim = self.write_analog(
data,
[0, 0],
self.SAMPLERATE,
ScanOption=uldaq.ScanOption.EXTTRIGGER,
)
data_channel_one = self.read_analog(
[0, 0], self.DURATION, self.SAMPLERATE, ScanOption=uldaq.ScanOption.EXTTRIGGER
)
time.sleep(1)
log.debug("Starting the Scan")
self.diggital_trigger()
try:
self.ao_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, 15)
log.debug("Scan finished")
self.write_bit(channel=0, bit=0)
time.sleep(1)
self.set_analog_to_zero()
except uldaq.ul_exception.ULException:
log.debug("Operation timed out")
# reset the diggital trigger
self.write_bit(channel=0, bit=0)
time.sleep(1)
self.set_analog_to_zero()
self.disconnect_dac()
if i == 0:
ax.plot(t, stim, label=f"Input_{db_value}", color=colors[i])
ax.plot(t, data_channel_one, label=f"Reaout {db_value}", color=colors[i])
ax.legend()
plt.show()
self.disconnect_dac()
def check_beat(self):
self.set_attenuation_level(db_channel1=-10.0, db_channel2=0.0)
t = np.arange(0, self.DURATION, 1 / self.SAMPLERATE)
data = self.AMPLITUDE * np.sin(2 * np.pi * self.SINFREQ * t)
# data = np.concatenate((data, data))
db_values = [0.0, -5.0, -8.5, -10.0]
colors = ["red", "blue", "black", "green"]
colors_in = ["lightcoral", "lightblue", "grey", "lightgreen"]
fig, axes = plt.subplots(2, 2, sharex="col")
for i, db_value in enumerate(db_values):
self.set_attenuation_level(db_channel1=db_value)
stim = self.write_analog(
data,
[0, 0],
self.SAMPLERATE,
ScanOption=uldaq.ScanOption.EXTTRIGGER,
)
readout = self.read_analog(
[0, 1],
self.DURATION,
self.SAMPLERATE,
ScanOption=uldaq.ScanOption.EXTTRIGGER,
)
self.diggital_trigger()
signal.signal(signal.SIGSEGV, self.segfault_handler)
log.info(self.ao_device)
ai_status = uldaq.ScanStatus.RUNNING
ao_status = uldaq.ScanStatus.RUNNING
log.debug(
f"Status Analog_output {ao_status}\n, Status Analog_input {ai_status}"
)
while (ai_status != uldaq.ScanStatus.IDLE) and (
ao_status != uldaq.ScanStatus.IDLE
):
# log.debug("Scanning")
time.time_ns()
ai_status = self.ai_device.get_scan_status()[0]
ao_status = self.ao_device.get_scan_status()[0]
self.write_bit(channel=0, bit=0)
log.debug(
f"Status Analog_output {ao_status}\n, Status Analog_input {ai_status}"
)
channel1 = np.array(readout[::2])
channel2 = np.array(readout[1::2])
beat = channel1 + channel2
beat_square = beat**2
f, powerspec = welch(beat, fs=self.SAMPLERATE)
powerspec = decibel(powerspec)
f_sq, powerspec_sq = welch(beat_square, fs=self.SAMPLERATE)
powerspec_sq = decibel(powerspec_sq)
peaks = find_peaks(powerspec_sq, prominence=20)[0]
f_stim, powerspec_stim = welch(channel1, fs=self.SAMPLERATE)
powerspec_stim = decibel(powerspec_stim)
f_in, powerspec_in = welch(channel2, fs=self.SAMPLERATE)
powerspec_in = decibel(powerspec_in)
axes[0, 0].plot(
t,
channel1,
label=f"{db_value} Readout Channel0",
color=colors[i],
)
axes[0, 0].plot(
t,
channel2,
label=f"{db_value} Readout Channel1",
color=colors_in[i],
)
axes[0, 1].plot(
f_stim,
powerspec_stim,
label=f"{db_value} powerspec Channel0",
color=colors[i],
)
axes[0, 1].plot(
f_in,
powerspec_in,
label=f"{db_value} powerspec Channel2",
color=colors_in[i],
)
axes[0, 1].set_xlabel("Freq [HZ]")
axes[0, 1].set_ylabel("dB")
axes[1, 0].plot(
t,
beat,
label="Beat",
color=colors[i],
)
axes[1, 0].plot(
t,
beat**2,
label="Beat squared",
color=colors_in[i],
)
axes[1, 0].legend()
axes[1, 1].plot(
f,
powerspec,
color=colors[i],
)
axes[1, 1].plot(
f_sq,
powerspec_sq,
color=colors_in[i],
label=f"dB {db_value}, first peak {np.min(f_sq[peaks])}",
)
axes[1, 1].scatter(
f_sq[peaks],
powerspec_sq[peaks],
color="maroon",
)
axes[1, 1].set_xlabel("Freq [HZ]")
axes[1, 1].set_ylabel("dB")
axes[0, 0].legend()
axes[1, 1].legend()
plt.show()
self.set_analog_to_zero()
self.disconnect_dac() self.disconnect_dac()
embed()
exit()
if __name__ == "__main__": def decibel(power, ref_power=1.0, min_power=1e-20):
SAMPLERATE = 40_000.0 """Transform power to decibel relative to ref_power.
DURATION = 5
AMPLITUDE = 3 \\[ decibel = 10 \\cdot \\log_{10}(power/ref\\_power) \\]
SINFREQ = 1 Power values smaller than `min_power` are set to `-np.inf`.
daq_input = Calibration()
daq_input.run_calibration() Parameters
----------
power: float or array
Power values, for example from a power spectrum or spectrogram.
ref_power: float or None or 'peak'
Reference power for computing decibel.
If set to `None` or 'peak', the maximum power is used.
min_power: float
Power values smaller than `min_power` are set to `-np.inf`.
Returns
-------
decibel_psd: array
Power values in decibel relative to `ref_power`.
"""
if np.isscalar(power):
tmp_power = np.array([power])
decibel_psd = np.array([power])
else:
tmp_power = power
decibel_psd = power.copy()
if ref_power is None or ref_power == "peak":
ref_power = np.max(decibel_psd)
decibel_psd[tmp_power <= min_power] = float("-inf")
decibel_psd[tmp_power > min_power] = 10.0 * np.log10(
decibel_psd[tmp_power > min_power] / ref_power
)
if np.isscalar(power):
return decibel_psd[0]
else:
return decibel_psd

View File

@@ -1,86 +0,0 @@
import ctypes
import uldaq
from IPython import embed
import matplotlib.pyplot as plt
import numpy as np
from pyrelacs.util.logging import config_logging
log = config_logging()
class ReadWrite:
def __init__(self) -> None:
devices = uldaq.get_daq_device_inventory(uldaq.InterfaceType.USB)
log.debug(f"Found daq devices {len(devices)}, connecting to the first one")
self.daq_device = uldaq.DaqDevice(devices[0])
self.daq_device.connect()
log.debug("Connected")
# self.daq_device.enable_event(
# uldaq.DaqEventType.ON_DATA_AVAILABLE,
# 1,
# self.read_write,
# (uldaq.DaqEventType.ON_DATA_AVAILABLE, 1, 1),
# )
def read_write(self) -> None:
# event_type = callback_args.event_type
# event_data = callback_args.event_data
# user_data = callback_args.user_data
FS = 30_000.0
DURATION = 10
FREQUENCY = 100
time = np.arange(0, DURATION, 1 / FS)
data = 2 * np.sin(2 * np.pi * FREQUENCY * time)
buffer = ctypes.c_double * len(time)
data_c = buffer(*data)
buf = uldaq.create_float_buffer(1, len(time))
# Get the Ananlog In device and Analog Info
ai_device = self.daq_device.get_ai_device()
ai_info = ai_device.get_info()
# Get the Analog Out device
ao_device = self.daq_device.get_ao_device()
ao_info = ao_device.get_info()
er_ao = ao_device.a_out_scan(
0,
0,
uldaq.Range.BIP10VOLTS,
int(len(data)),
30_000.0,
uldaq.ScanOption.DEFAULTIO,
uldaq.AOutScanFlag.DEFAULT,
data_c,
)
er_ai = ai_device.a_in_scan(
1,
1,
uldaq.AiInputMode.SINGLE_ENDED,
uldaq.Range.BIP10VOLTS,
len(time),
FS,
uldaq.ScanOption.DEFAULTIO,
uldaq.AInScanFlag.DEFAULT,
data=buf,
)
ai_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, timeout=-1)
log.debug("Scanning")
self.daq_device.disconnect()
self.daq_device.release()
plt.plot(buf)
plt.plot(data_c)
plt.show()
if __name__ == "__main__":
daq_input = ReadWrite()
daq_input.read_write()

View File

@@ -1,28 +0,0 @@
import uldaq
import matplotlib.pyplot as plt
from pyrelacs.util.logging import config_logging
from .repos import Repos
log = config_logging()
class ReadData(Repos):
def __init__(self) -> None:
super().__init__()
def analog_in(self) -> None:
# Get the Ananlog In device and Analog Info
data = self.read_analog_daq(
[0, 0],
10,
3000.0,
)
plt.plot(data)
plt.show()
self.disconnect_dac()
if __name__ == "__main__":
daq_input = ReadData()
daq_input.analog_in()

View File

@@ -1,37 +0,0 @@
import ctypes
import uldaq
from IPython import embed
from pyrelacs.repros.repos import Repos
from pyrelacs.util.logging import config_logging
import numpy as np
import matplotlib.pyplot as plt
log = config_logging()
class Output_daq(Repos):
def __init__(self) -> None:
super().__init__()
# devices = uldaq.get_daq_device_inventory(uldaq.InterfaceType.USB)
# self.daq_device = uldaq.DaqDevice(devices[0])
# self.daq_device.connect()
def write_daq(self):
log.debug("running repro")
time = np.arange(0, 10, 1 / 30_000.0)
data = 2 * np.sin(2 * np.pi * 1 * time)
self.send_analog_dac(
data, [0, 0], 30_000, ScanOption=uldaq.ScanOption.EXTTRIGGER
)
def trigger(self):
self.digital_trigger(1)
self.daq_device.disconnect()
self.daq_device.release()
if __name__ == "__main__":
daq_input = Output_daq()
daq_input.write_daq()
# daq_input.trigger()

View File

@@ -1,142 +0,0 @@
from ctypes import Array, c_double
from typing import Union
from IPython import embed
import numpy.typing as npt
import uldaq
import numpy as np
from pyrelacs.util.logging import config_logging
log = config_logging()
class Repos:
def __init__(self) -> None:
devices = uldaq.get_daq_device_inventory(uldaq.InterfaceType.USB)
log.debug(f"Found daq devices {len(devices)}, connecting to the first one")
if len(devices) == 0:
log.error("Did not found daq devices, please connect one")
exit(1)
self.daq_device = uldaq.DaqDevice(devices[0])
self.daq_device.connect()
log.debug("Connected")
def read_analog_daq(
self,
channels: list[int],
duration: int,
samplerate: float,
AiInputMode: uldaq.AiInputMode = uldaq.AiInputMode.SINGLE_ENDED,
Range: uldaq.Range = uldaq.Range.BIP10VOLTS,
ScanOption: uldaq.ScanOption = uldaq.ScanOption.DEFAULTIO,
AInScanFlag: uldaq.AInScanFlag = uldaq.AInScanFlag.DEFAULT,
) -> Array[c_double]:
if channels[0] == channels[1]:
channel_len = 1
else:
channel_len = len(channels)
assert len(channels) == 2, log.error("Please provide a list with two ints")
ai_device = self.daq_device.get_ai_device()
buffer_len = np.shape(np.arange(0, duration, 1 / samplerate))[0]
data_analog_input = uldaq.create_float_buffer(channel_len, buffer_len)
er = ai_device.a_in_scan(
channels[0],
channels[1],
AiInputMode,
Range,
buffer_len,
samplerate,
ScanOption,
AInScanFlag,
data=data_analog_input,
)
# ai_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, timeout=-1)
return data_analog_input
def send_analog_dac(
self,
data: Union[list, npt.NDArray],
channels: list[int],
samplerate: float,
Range: uldaq.Range = uldaq.Range.BIP10VOLTS,
ScanOption: uldaq.ScanOption = uldaq.ScanOption.DEFAULTIO,
AOutScanFlag: uldaq.AOutScanFlag = uldaq.AOutScanFlag.DEFAULT,
):
"""
Parameters
----------
data : Union[list, npt.NDArray]
channels : list[int]
duration : int
samplerate : float
AiInputMode : uldaq.AiInputMode
Range : uldaq.Range
ScanOption : uldaq.ScanOption
AInScanFlag : uldaq.AOutScanFlag
Returns
-------
Array[c_double]
ao_device
"""
buffer = c_double * len(data)
data_analog_output = buffer(*data)
log.debug(f"Created C_double data {data_analog_output}")
ao_device = self.daq_device.get_ao_device()
ao_info = ao_device.get_info()
err = ao_device.a_out_scan(
channels[0],
channels[1],
Range,
int(len(data)),
samplerate,
ScanOption,
AOutScanFlag,
data_analog_output,
)
log.info(f"The actual scan rate was {err}")
# ao_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, 11)
return data_analog_output, ao_device
def digital_trigger(self, portn: int = 0, data: int = 1) -> None:
log.info(f"{self.daq_device}")
dio_device = self.daq_device.get_dio_device()
dio_device.d_config_bit(
uldaq.DigitalPortType.AUXPORT, portn, uldaq.DigitalDirection.OUTPUT
)
dio_device.d_bit_out(uldaq.DigitalPortType.AUXPORT, bit_number=portn, data=data)
def disconnect_dac(self):
self.daq_device.disconnect()
self.daq_device.release()
def clean_up():
pass
def run_repo(self) -> None:
pass
def stop_repo(self) -> None:
pass
def reload_repo(self) -> None:
pass

33
pyrelacs/repros/repros.py Normal file
View File

@@ -0,0 +1,33 @@
import ast
import pathlib
from IPython import embed
class Repro:
def __init__(self) -> None:
pass
def run_repro(self, name: str, *args, **kwargs) -> None:
pass
def names_of_repros(self):
file_path_cur = pathlib.Path(__file__).parent
python_files = list(file_path_cur.glob("**/*.py"))
exclude_files = ["repros.py", "__init__.py"]
python_files = [f for f in python_files if f.name not in exclude_files]
repro_names = []
file_names = []
for python_file in python_files:
with open(python_file, "r") as file:
file_content = file.read()
tree = ast.parse(file_content)
class_name = [
node.name
for node in ast.walk(tree)
if isinstance(node, ast.ClassDef)
]
repro_names.extend(class_name)
file_names.append(python_file)
file.close()
return repro_names, file_names

75
pyrelacs/worker.py Normal file
View File

@@ -0,0 +1,75 @@
import sys
import traceback
from PyQt6.QtCore import QRunnable, pyqtSlot, QObject, pyqtSignal
class WorkerSignals(QObject):
"""
Defines the signals available from a running worker thread.
Supported signals are:
finished
No data
error
tuple (exctype, value, traceback.format_exc() )
result
object data returned from processing, anything
progress
int indicating % progress
"""
finished = pyqtSignal()
error = pyqtSignal(tuple)
result = pyqtSignal(object)
progress = pyqtSignal(int)
class Worker(QRunnable):
"""
Worker thread
Inherits from QRunnable to handler worker thread setup, signals and wrap-up.
:param callback: The function callback to run on this worker thread. Supplied args and
kwargs will be passed through to the runner.
:type callback: function
:param args: Arguments to pass to the callback function
:param kwargs: Keywords to pass to the callback function
"""
def __init__(self, fn, *args, **kwargs):
super(Worker, self).__init__()
# Store constructor arguments (re-used for processing)
self.fn = fn
self.args = args
self.kwargs = kwargs
self.signals = WorkerSignals()
# Add the callback to our kwargs
self.kwargs["progress_callback"] = self.signals.progress
@pyqtSlot()
def run(self):
"""
Initialise the runner function with passed args, kwargs.
"""
# Retrieve args/kwargs here; and fire processing using them
try:
result = self.fn(*self.args, **self.kwargs)
except:
traceback.print_exc()
exctype, value = sys.exc_info()[:2]
self.signals.error.emit((exctype, value, traceback.format_exc()))
else:
self.signals.result.emit(result) # Return the result of the processing
finally:
self.signals.finished.emit() # Done