forked from awendt/pyrelacs
checking amplitude, checking beat
This commit is contained in:
parent
d3800ddfa2
commit
66ea22fb4a
@ -1,12 +1,14 @@
|
|||||||
import ctypes
|
import signal
|
||||||
|
import sys
|
||||||
import faulthandler
|
import faulthandler
|
||||||
|
|
||||||
import time
|
import time
|
||||||
|
|
||||||
import uldaq
|
import uldaq
|
||||||
from IPython import embed
|
from IPython import embed
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import matplotlib.pyplot as plt
|
import matplotlib.pyplot as plt
|
||||||
|
from scipy.signal import peak_widths, welch, csd
|
||||||
|
from scipy.signal import find_peaks
|
||||||
|
|
||||||
from pyrelacs.repros.mccdac import MccDac
|
from pyrelacs.repros.mccdac import MccDac
|
||||||
from pyrelacs.util.logging import config_logging
|
from pyrelacs.util.logging import config_logging
|
||||||
@ -19,13 +21,67 @@ class Calibration(MccDac):
|
|||||||
def __init__(self) -> None:
|
def __init__(self) -> None:
|
||||||
super().__init__()
|
super().__init__()
|
||||||
|
|
||||||
|
def segfault_handler(self, signum, frame):
|
||||||
|
print(f"Segmentation fault caught! Signal number: {signum}")
|
||||||
|
self.disconnect_dac()
|
||||||
|
sys.exit(1) # Gracefully exit the program
|
||||||
|
|
||||||
def check_amplitude(self):
|
def check_amplitude(self):
|
||||||
|
db_values = [0.0, -5.0, -10.0, -20.0, -50.0]
|
||||||
|
colors = ["red", "green", "blue", "black", "yellow"]
|
||||||
self.set_attenuation_level(db_channel1=0.0, db_channel2=0.0)
|
self.set_attenuation_level(db_channel1=0.0, db_channel2=0.0)
|
||||||
# write to ananlog 1
|
# write to ananlog 1
|
||||||
t = np.arange(0, DURATION, 1 / SAMPLERATE)
|
t = np.arange(0, DURATION, 1 / SAMPLERATE)
|
||||||
data = AMPLITUDE * np.sin(2 * np.pi * SINFREQ * t)
|
data = AMPLITUDE * np.sin(2 * np.pi * SINFREQ * t)
|
||||||
data_channels = np.zeros(2 * len(data))
|
fig, ax = plt.subplots()
|
||||||
# c = [(i,for i,j in zip(data, data)]
|
|
||||||
|
for i, db_value in enumerate(db_values):
|
||||||
|
self.set_attenuation_level(db_channel1=db_value, db_channel2=db_value)
|
||||||
|
log.debug(f"{db_value}")
|
||||||
|
|
||||||
|
stim = self.write_analog(
|
||||||
|
data,
|
||||||
|
[0, 0],
|
||||||
|
SAMPLERATE,
|
||||||
|
ScanOption=uldaq.ScanOption.EXTTRIGGER,
|
||||||
|
)
|
||||||
|
|
||||||
|
data_channel_one = self.read_analog(
|
||||||
|
[0, 0], DURATION, SAMPLERATE, ScanOption=uldaq.ScanOption.EXTTRIGGER
|
||||||
|
)
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
log.debug("Starting the Scan")
|
||||||
|
self.diggital_trigger()
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.ao_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, 15)
|
||||||
|
log.debug("Scan finished")
|
||||||
|
self.write_bit(channel=0, bit=0)
|
||||||
|
time.sleep(1)
|
||||||
|
self.set_analog_to_zero()
|
||||||
|
except uldaq.ul_exception.ULException:
|
||||||
|
log.debug("Operation timed out")
|
||||||
|
# reset the diggital trigger
|
||||||
|
self.write_bit(channel=0, bit=0)
|
||||||
|
time.sleep(1)
|
||||||
|
self.set_analog_to_zero()
|
||||||
|
self.disconnect_dac()
|
||||||
|
|
||||||
|
if i == 0:
|
||||||
|
ax.plot(t, stim, label=f"Input_{db_value}", color=colors[i])
|
||||||
|
ax.plot(t, data_channel_one, label=f"Reaout {db_value}", color=colors[i])
|
||||||
|
|
||||||
|
ax.legend()
|
||||||
|
plt.show()
|
||||||
|
|
||||||
|
self.disconnect_dac()
|
||||||
|
|
||||||
|
def check_beat(self):
|
||||||
|
self.set_attenuation_level(db_channel1=-10.0, db_channel2=0.0)
|
||||||
|
t = np.arange(0, DURATION, 1 / SAMPLERATE)
|
||||||
|
data = AMPLITUDE * np.sin(2 * np.pi * SINFREQ * t)
|
||||||
|
# data = np.concatenate((data, data))
|
||||||
|
|
||||||
stim = self.write_analog(
|
stim = self.write_analog(
|
||||||
data,
|
data,
|
||||||
@ -33,35 +89,119 @@ class Calibration(MccDac):
|
|||||||
SAMPLERATE,
|
SAMPLERATE,
|
||||||
ScanOption=uldaq.ScanOption.EXTTRIGGER,
|
ScanOption=uldaq.ScanOption.EXTTRIGGER,
|
||||||
)
|
)
|
||||||
data_channel_one = self.read_analog(
|
readout = self.read_analog(
|
||||||
[0, 0], DURATION, SAMPLERATE, ScanOption=uldaq.ScanOption.EXTTRIGGER
|
[0, 1],
|
||||||
|
DURATION,
|
||||||
|
SAMPLERATE,
|
||||||
|
ScanOption=uldaq.ScanOption.EXTTRIGGER,
|
||||||
)
|
)
|
||||||
time.sleep(1)
|
|
||||||
|
|
||||||
self.diggital_trigger()
|
self.diggital_trigger()
|
||||||
|
signal.signal(signal.SIGSEGV, self.segfault_handler)
|
||||||
|
log.info(self.ao_device)
|
||||||
|
ai_status = uldaq.ScanStatus.RUNNING
|
||||||
|
ao_status = uldaq.ScanStatus.RUNNING
|
||||||
|
|
||||||
try:
|
log.debug(
|
||||||
self.ai_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, 15)
|
f"Status Analog_output {ao_status}\n, Status Analog_input {ai_status}"
|
||||||
self.write_bit(channel=0, bit=0)
|
)
|
||||||
time.sleep(1)
|
while (ai_status != uldaq.ScanStatus.IDLE) and (
|
||||||
self.set_analog_to_zero()
|
ao_status != uldaq.ScanStatus.IDLE
|
||||||
except uldaq.ul_exception.ULException:
|
):
|
||||||
log.debug("Operation timed out")
|
log.debug("Scanning")
|
||||||
# reset the diggital trigger
|
time.sleep(0.5)
|
||||||
self.write_bit(channel=0, bit=0)
|
ai_status = self.ai_device.get_scan_status()[0]
|
||||||
time.sleep(1)
|
ao_status = self.ao_device.get_scan_status()[0]
|
||||||
self.set_analog_to_zero()
|
|
||||||
self.disconnect_dac()
|
log.debug(
|
||||||
|
f"Status Analog_output {ao_status}\n, Status Analog_input {ai_status}"
|
||||||
|
)
|
||||||
|
fig, axes = plt.subplots(2, 2, sharex="col")
|
||||||
|
channel1 = np.array(readout[::2])
|
||||||
|
channel2 = np.array(readout[1::2])
|
||||||
|
beat = channel1 + channel2
|
||||||
|
beat_square = beat**2
|
||||||
|
|
||||||
|
f, powerspec = welch(beat, fs=SAMPLERATE)
|
||||||
|
powerspec = decibel(powerspec)
|
||||||
|
|
||||||
|
f_sq, powerspec_sq = welch(beat_square, fs=SAMPLERATE)
|
||||||
|
powerspec_sq = decibel(powerspec_sq)
|
||||||
|
peaks = find_peaks(powerspec_sq, prominence=20)[0]
|
||||||
|
|
||||||
|
f_stim, powerspec_stim = welch(channel1, fs=SAMPLERATE)
|
||||||
|
powerspec_stim = decibel(powerspec_stim)
|
||||||
|
|
||||||
|
f_in, powerspec_in = welch(channel2, fs=SAMPLERATE)
|
||||||
|
powerspec_in = decibel(powerspec_in)
|
||||||
|
|
||||||
|
axes[0, 0].plot(t, channel1, label="Readout Channel0")
|
||||||
|
axes[0, 0].plot(t, channel2, label="Readout Channel1")
|
||||||
|
|
||||||
|
axes[0, 1].plot(f_stim, powerspec_stim, label="powerspec Channel0")
|
||||||
|
axes[0, 1].plot(f_in, powerspec_in, label="powerspec Channel2")
|
||||||
|
axes[0, 1].set_xlabel("Freq [HZ]")
|
||||||
|
axes[0, 1].set_ylabel("dB")
|
||||||
|
|
||||||
|
axes[1, 0].plot(t, beat, label="Beat")
|
||||||
|
axes[1, 0].plot(t, beat**2, label="Beat squared")
|
||||||
|
axes[1, 0].legend()
|
||||||
|
|
||||||
|
axes[1, 1].plot(f, powerspec)
|
||||||
|
axes[1, 1].plot(f_sq, powerspec_sq)
|
||||||
|
axes[1, 1].scatter(f_sq[peaks], powerspec_sq[peaks])
|
||||||
|
axes[1, 1].set_xlabel("Freq [HZ]")
|
||||||
|
axes[1, 1].set_ylabel("dB")
|
||||||
|
axes[0, 0].legend()
|
||||||
embed()
|
embed()
|
||||||
exit()
|
exit()
|
||||||
|
|
||||||
|
|
||||||
|
def decibel(power, ref_power=1.0, min_power=1e-20):
|
||||||
|
"""Transform power to decibel relative to ref_power.
|
||||||
|
|
||||||
|
\\[ decibel = 10 \\cdot \\log_{10}(power/ref\\_power) \\]
|
||||||
|
Power values smaller than `min_power` are set to `-np.inf`.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
power: float or array
|
||||||
|
Power values, for example from a power spectrum or spectrogram.
|
||||||
|
ref_power: float or None or 'peak'
|
||||||
|
Reference power for computing decibel.
|
||||||
|
If set to `None` or 'peak', the maximum power is used.
|
||||||
|
min_power: float
|
||||||
|
Power values smaller than `min_power` are set to `-np.inf`.
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
decibel_psd: array
|
||||||
|
Power values in decibel relative to `ref_power`.
|
||||||
|
"""
|
||||||
|
if np.isscalar(power):
|
||||||
|
tmp_power = np.array([power])
|
||||||
|
decibel_psd = np.array([power])
|
||||||
|
else:
|
||||||
|
tmp_power = power
|
||||||
|
decibel_psd = power.copy()
|
||||||
|
if ref_power is None or ref_power == "peak":
|
||||||
|
ref_power = np.max(decibel_psd)
|
||||||
|
decibel_psd[tmp_power <= min_power] = float("-inf")
|
||||||
|
decibel_psd[tmp_power > min_power] = 10.0 * np.log10(
|
||||||
|
decibel_psd[tmp_power > min_power] / ref_power
|
||||||
|
)
|
||||||
|
if np.isscalar(power):
|
||||||
|
return decibel_psd[0]
|
||||||
|
else:
|
||||||
|
return decibel_psd
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
SAMPLERATE = 40_000.0
|
SAMPLERATE = 40_000.0
|
||||||
DURATION = 5
|
DURATION = 5
|
||||||
AMPLITUDE = 0.5
|
AMPLITUDE = 1
|
||||||
SINFREQ = 10
|
SINFREQ = 1000
|
||||||
|
|
||||||
cal = Calibration()
|
cal = Calibration()
|
||||||
# cal.ccheck_attenuator()
|
# cal.check_attenuator()
|
||||||
cal.check_amplitude()
|
# cal.check_amplitude()
|
||||||
|
cal.check_beat()
|
||||||
|
Loading…
Reference in New Issue
Block a user