Compare commits

..

18 Commits

6 changed files with 235 additions and 74 deletions

5
.gitignore vendored
View File

@ -162,4 +162,7 @@ cython_debug/
#.idea/
# ignore created data files
*.nix
*.nix
# ignore reource.py as it is created by pyside6-rcc resources.qrc -o resources.py
resources.py

View File

@ -5,9 +5,13 @@ Relaxed ELectrophysiology Acquisition, Control, and Stimulation in python
Implementing [relacs](https://github.com/relacs/relacs) with MCC USB 1608GX-2AO / 1808X devices ([multifunction-usb-daq-devices](https://digilent.com/shop/mcc-daq/data-acquisition/low-cost-daq/))
# Installation
You have to install the MCC library (follow the installing instructions for [linux](https://github.com/mccdaq/uldaq) or [windows](https://github.com/mccdaq/mcculw)).
You have to install the MCC library (follow the installing instructions for [linux/macOS](https://github.com/mccdaq/uldaq) or [windows](https://github.com/mccdaq/mcculw)).
After successful installing, you can use clone the reposity and install it with
For MacOs if you run into problems with the libusb library if installed with homebrew, there is an issue thread on the uldaq repository.
[https://github.com/mccdaq/uldaq/issues/44](https://github.com/mccdaq/uldaq/issues/44)
After successful installing, you can use clone the repository and install it with
```sh
pip install -e .

View File

@ -1,7 +1,6 @@
import sys
import pathlib
from PyQt6.QtCore import QSettings, Qt
from PyQt6.QtCore import QSettings
from PyQt6.QtWidgets import QApplication
from . import info

View File

@ -12,13 +12,32 @@ log = config_logging()
class MccDac:
"""
Represents the Digital/Analog Converter from Meassuring Computing.
provides methods for writing and reading the Analog / Digital input and output.
Connects to the DAC device.
Attributes
----------
daq_device : uldaq.DaqDevice
DaqDevice for handling connecting, releasing and disconnecting
ai_device : uldaq.AiDevice
The Analog input Device
ao_device :
Analog output Device
dio_device :
Digital Input Output
"""
def __init__(self) -> None:
devices = uldaq.get_daq_device_inventory(uldaq.InterfaceType.USB)
log.debug(f"Found daq devices {len(devices)}, connecting to the first one")
if len(devices) == 0:
try:
self.daq_device = uldaq.DaqDevice(devices[0])
except uldaq.ul_exception.ULException as e:
log.error("Did not found daq devices, please connect one")
exit(1)
self.daq_device = uldaq.DaqDevice(devices[0])
raise e
try:
self.daq_device.connect()
except uldaq.ul_exception.ULException:
@ -30,6 +49,10 @@ class MccDac:
log.debug("Connected")
def connect_dac(self):
"""
Connecting to the DAQ device
"""
devices = uldaq.get_daq_device_inventory(uldaq.InterfaceType.USB)
log.debug(f"Found daq devices {len(devices)}, connecting to the first one")
if len(devices) == 0:
@ -52,6 +75,40 @@ class MccDac:
ScanOption: uldaq.ScanOption = uldaq.ScanOption.DEFAULTIO,
AInScanFlag: uldaq.AInScanFlag = uldaq.AInScanFlag.DEFAULT,
) -> Array[c_double]:
"""
Reading the analog input of the DAC device
Creates a c_double Array for storing the acquired data
Parameters
----------
channels : list[int]
channels to read from, provide only two int's in a list (ex [0, 1] or [0, 4])
for sampling from the range(channel0, channel4)
duration : int
duration of sampling period
samplerate : float
samplerate for the duration of sampling
AiInputMode : uldaq.AiInputMode = uldaq.AiInputMode.SINGLE_ENDED
Contains attributes indicating A/D channel input modes.
Compares to Ground
Range : uldaq.Range = uldaq.Range.BIP10VOLTS
Range of the output
ScanOption : uldaq.ScanOption = uldaq.ScanOption.DEFAULTIO
Specific Flags for acuiring the input
AInScanFlag : uldaq.AInScanFlag = uldaq.AInScanFlag.DEFAULT
Scaling of the data
Returns
-------
Array[c_double]
"""
assert len(channels) == 2, log.error("You can only provide two channels [0, 1]")
if channels[0] != channels[1]:
@ -85,6 +142,37 @@ class MccDac:
ScanOption: uldaq.ScanOption = uldaq.ScanOption.DEFAULTIO,
AOutScanFlag: uldaq.AOutScanFlag = uldaq.AOutScanFlag.DEFAULT,
) -> Array[c_double]:
"""
Writes data to the DAC device.
Creates a c_double Array for writing the data
Parameters
----------
data : Union[list, npt.NDArray]
data which should be written to the DAC
channels : list[int]
channels to read from, provide only two int's in a list (ex [0, 1])
for sampling from the range(channel0, channel1)
DAC USB 1608GX-2AO has only 2 output channels
samplerate : float
samplerate for the duration of sampling
Range : uldaq.Range = uldaq.Range.BIP10VOLTS
Range of the output
ScanOption : uldaq.ScanOption = uldaq.ScanOption.DEFAULTIO
Specific Flags for acuiring the input
AOutScanFlag : uldaq.AOutScanFlag = uldaq.AOutScanFlag.DEFAULT
For Scaling the data
Returns
-------
Array[c_double]
"""
assert len(channels) == 2, log.error("You can only provide two channels [0, 1]")
buffer = c_double * len(data)
@ -109,7 +197,18 @@ class MccDac:
return data_analog_output
def set_analog_to_zero(self, channels: list[int] = [0, 1]):
def set_analog_to_zero(self, channels: list[int] = [0, 1]) -> None:
"""
Sets all analog outputs to zero
Parameters
----------
channels : list[int]
channels to read from, provide only two int's in a list (ex [0, 1])
for sampling from the range(channel0, channel1)
DAC USB 1608GX-2AO has only 2 output channels
"""
try:
err = self.ao_device.a_out_list(
channels[0],
@ -126,16 +225,37 @@ class MccDac:
log.error("disconnection dac")
self.disconnect_dac()
def diggital_trigger(self) -> None:
data = self.read_bit(channel=0)
def digital_trigger(self, ch: int = 0) -> None:
"""
Writes a 1 to a specified digital channel, if the channel is already on 1 switches it to
0 and after Nano second it writes a 1 to the specified digital channel
Parameters
----------
ch : int
Channel to trigger
"""
data = self.read_bit(channel=ch)
if data:
self.write_bit(channel=0, bit=0)
self.write_bit(channel=ch, bit=0)
time.time_ns()
self.write_bit(channel=0, bit=1)
self.write_bit(channel=ch, bit=1)
else:
self.write_bit(channel=0, bit=1)
self.write_bit(channel=ch, bit=1)
def write_bit(self, channel: int = 0, bit: int = 1) -> None:
"""
Writes a 0 / 1 to a specified digitial channel
Parameters
----------
channel : int
Digital channel to write
bit : int
0 / 1 for writing to the digital channel
"""
self.dio_device.d_config_bit(
uldaq.DigitalPortType.AUXPORT, channel, uldaq.DigitalDirection.OUTPUT
)
@ -143,55 +263,36 @@ class MccDac:
uldaq.DigitalPortType.AUXPORT, bit_number=channel, data=bit
)
def read_bit(self, channel: int = 0):
bit = self.dio_device.d_bit_in(uldaq.DigitalPortType.AUXPORT, channel)
return bit
def read_digitalio(
self,
channels: list[int],
duration,
samplerate,
ScanOptions: uldaq.ScanOption = uldaq.ScanOption.DEFAULTIO,
DInScanFlag: uldaq.DInScanFlag = uldaq.DInScanFlag.DEFAULT,
):
if channels[0] == channels[1]:
channel_len = 1
else:
channel_len = len(channels)
def read_bit(self, channel: int = 0) -> int:
"""
Reads a 0 / 1 from the specified digital channel
buffer_len = np.shape(np.arange(0, duration, 1 / samplerate))[0]
data_digital_input = uldaq.create_int_buffer(channel_len, buffer_len)
Parameters
----------
channel : int
Digital channel to read from
self.dio_device.d_config_port(
uldaq.DigitalPortType.AUXPORT, uldaq.DigitalDirection.INPUT
)
scan_rate = self.dio_device.d_in_scan(
uldaq.DigitalPortType.AUXPORT0,
uldaq.DigitalPortType.AUXPORT0,
len(data_digital_input),
samplerate,
ScanOptions,
DInScanFlag,
data_digital_input,
)
return data_digital_input
Returns
-------
bit : int
0 or 1 from the digital channel
"""
bit = self.dio_device.d_bit_in(uldaq.DigitalPortType.AUXPORT, channel)
return bit
def disconnect_dac(self):
self.daq_device.disconnect()
self.daq_device.release()
def check_attenuator(self):
"""
ident : attdev-1
strobepin : 6
datainpin : 5
dataoutpin: -1
cspin : 4
mutepin : 7
zcenpin : -1
def check_attenuator(self) -> None:
"""
For checking the attenuator in the DAC device that was implemented to attenuate the
analog signal to mV.
Writes to Channel 0 of the analog output with different attenuation levels
0, 0, -2, -5, -10, -20, -50 dB and the second 0 has a software mute
"""
SAMPLERATE = 40_000.0
DURATION = 5
AMPLITUDE = 1
@ -201,7 +302,6 @@ class MccDac:
# data_channels = np.concatenate((data, data))
db_values = [0, 0, -2, -5, -10, -20, -50]
db_values = [0, -10, -20]
for i, db_value in enumerate(db_values):
log.info(f"Attenuating the Channels, with {db_value}")
if i == 1:
@ -219,7 +319,7 @@ class MccDac:
ScanOption=uldaq.ScanOption.EXTTRIGGER,
Range=uldaq.Range.BIP10VOLTS,
)
self.diggital_trigger()
self.digital_trigger()
try:
self.ao_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, 15)
@ -248,6 +348,16 @@ class MccDac:
mute_channel2: bool = False,
):
"""
Setting the attenuation level of the chip that is connected to the DAQ
The attenuation level is set by writing to the connected digital output pin 5
where the strobepin 6 is signaling the when the bit was send.
The cspin is set from 1 to 0 for the start and 0 to 1 for signaling the end
of the data write process.
The mute pin should be set to 1 for the device to be working.
More information in the AttCS3310.pdf in the doc
ident : attdev-1
strobepin : 6
datainpin : 5
@ -255,8 +365,23 @@ class MccDac:
cspin : 4
mutepin : 7
zcenpin : -1
"""
Parameters
----------
db_channel1 : float
dB Attenuation level for the first channel
db_channel2 : float
dB Attenuation level for the second channel
mute_channel1 : bool
Software mute for the first channel
mute_channel2 : bool
Software mute for the second channel
"""
self.activate_attenuator()
hardware_possible_db = np.arange(-95.5, 32.0, 0.5)
byte_number = np.arange(1, 256)
@ -283,9 +408,18 @@ class MccDac:
self.write_bit(channel=4, bit=1)
def activate_attenuator(self):
"""
Activation of the attenuator, where the cspin and mute pin is set to 1,
and the datapin and strobpin to 0
"""
for ch, b in zip([4, 5, 6, 7], [1, 0, 0, 1]):
self.write_bit(channel=ch, bit=b)
def deactivate_attenuator(self):
"""
Writes a 0 to the mute pin, which is deactivating the attenuator
"""
# mute should be enabled for starting calibration
self.write_bit(channel=7, bit=0)

View File

@ -1,5 +1,3 @@
import signal
import sys
import faulthandler
import time
@ -8,13 +6,14 @@ import uldaq
from IPython import embed
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import welch, csd
from scipy.signal import welch
from scipy.signal import find_peaks
from pyrelacs.devices.mccdac import MccDac
from pyrelacs.util.logging import config_logging
log = config_logging()
# for more information on seg faults
faulthandler.enable()
@ -60,7 +59,7 @@ class Calibration(MccDac):
time.sleep(1)
log.debug("Starting the Scan")
self.diggital_trigger()
self.digital_trigger()
try:
self.ao_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, 15)
@ -109,7 +108,7 @@ class Calibration(MccDac):
self.SAMPLERATE,
ScanOption=uldaq.ScanOption.EXTTRIGGER,
)
self.diggital_trigger()
self.digital_trigger()
log.info(self.ao_device)
ai_status = uldaq.ScanStatus.RUNNING
ao_status = uldaq.ScanStatus.RUNNING
@ -133,15 +132,20 @@ class Calibration(MccDac):
channel1 = np.array(readout[::2])
channel2 = np.array(readout[1::2])
block.create_data_array(
stim_data = block.create_data_array(
f"stimulus_{db_value}",
"Array",
"nix.regular_sampled",
shape=data.shape,
data=channel1,
label="Voltage",
unit="V",
)
block.create_data_array(
stim_data.append_sampled_dimension(
self.SAMPLERATE,
label="time",
unit="s",
)
fish_data = block.create_data_array(
f"fish_{db_value}",
"Array",
shape=data.shape,
@ -149,6 +153,11 @@ class Calibration(MccDac):
label="Voltage",
unit="V",
)
fish_data.append_sampled_dimension(
self.SAMPLERATE,
label="time",
unit="s",
)
beat = channel1 + channel2
beat_square = beat**2

View File

@ -32,8 +32,8 @@ class PyRelacs(QMainWindow):
super().__init__()
# self.setToolButtonStyle(Qt.ToolButtonStyle.ToolButtonTextBesideIcon) # Ensure icons are displayed with text
self.setWindowTitle("PyRelacs")
self.setMinimumSize(1000, 1000)
self.plot_graph = pg.PlotWidget()
self.beat_plot = pg.PlotWidget()
self.power_plot = pg.PlotWidget()
self.threadpool = QThreadPool()
self.repros = Repro()
@ -50,8 +50,14 @@ class PyRelacs(QMainWindow):
layout = QGridLayout()
layout.addWidget(self.plot_calibration_button, 0, 0)
layout.addWidget(self.daq_disconnect_button, 0, 1)
layout.addWidget(self.text, 3, 0, 1, 2)
layout.addWidget(self.plot_graph, 2, 0, 1, 2)
layout.addWidget(self.beat_plot, 2, 0, 1, 2)
layout.addWidget(self.power_plot, 3, 0, 1, 2)
self.toolbar = QToolBar("Repros")
self.addToolBar(self.toolbar)
self.repros_to_toolbar()
# self.setFixedSize(QSize(400, 300))
widget = QWidget()
widget.setLayout(layout)
self.setCentralWidget(widget)
@ -181,8 +187,9 @@ class PyRelacs(QMainWindow):
return decibel_psd
block = self.nix_file.blocks[0]
for stim, fish in zip(
list(block.data_arrays)[::2], list(block.data_arrays)[1::2]
colors = ["red", "green", "blue", "black", "yellow"]
for i, (stim, fish) in enumerate(
zip(list(block.data_arrays)[::2], list(block.data_arrays)[1::2])
):
beat = stim[:] + fish[:]
beat_squared = beat**2
@ -193,10 +200,15 @@ class PyRelacs(QMainWindow):
f_sq, powerspec_sq = welch(beat_squared, fs=40_000.0)
powerspec_sq = decibel(powerspec_sq)
peaks = find_peaks(powerspec_sq, prominence=20)[0]
pen = pg.mkPen()
self.plot_graph.plot(
np.arange(0, len(beat)) / 40_000.0, beat_squared, pen=pen
pen = pg.mkPen(colors[i])
self.beat_plot.plot(
np.arange(0, len(beat)) / 40_000.0,
beat_squared,
pen=pen,
# name=stim.name,
)
self.power_plot.plot(f_sq, powerspec_sq, pen=pen)
self.power_plot.plot(f[peaks], powerspec_sq[peaks], pen=None, symbol="x")
def connect_dac(self):
devices = uldaq.get_daq_device_inventory(uldaq.InterfaceType.USB)