diff --git a/pyrelacs/repros/calibration/calibration.py b/pyrelacs/repros/calibration/calibration.py new file mode 100644 index 0000000..c38241c --- /dev/null +++ b/pyrelacs/repros/calibration/calibration.py @@ -0,0 +1,275 @@ +import faulthandler +import time + +import nixio as nix +import uldaq +from IPython import embed +import numpy as np +import matplotlib.pyplot as plt +from scipy.signal import welch, find_peaks +import pyqtgraph as pg + +from pyrelacs.devices.mccdaq import MccDaq +from pyrelacs.util.logging import config_logging + +log = config_logging() +# for more information on seg faults +faulthandler.enable() + + +class Calibration: + def __init__(self, config, mccdaq: MccDaq) -> None: + self.config = config + self.mccdaq = mccdaq + self.SAMPLERATE = 40_000.0 + self.DURATION = 5 + self.AMPLITUDE = 1 + self.SINFREQ = 750 + + @staticmethod + def run(*args, **kwargs): + nix_block = args[0] + figure = args[1] + mccdaq = args[2] + config = args[3] + calb = Calibration(config, mccdaq) + calb.check_beat(nix_block) + calb.plot(figure, nix_block) + + return "finished" + + def check_amplitude(self): + db_values = [0.0, -5.0, -10.0, -20.0, -50.0] + colors = ["red", "green", "blue", "black", "yellow"] + self.mccdaq.set_attenuation_level(db_channel1=0.0, db_channel2=0.0) + # write to ananlog 1 + t = np.arange(0, self.DURATION, 1 / self.SAMPLERATE) + data = self.AMPLITUDE * np.sin(2 * np.pi * self.SINFREQ * t) + fig, ax = plt.subplots() + + for i, db_value in enumerate(db_values): + self.mccdaq.set_attenuation_level( + db_channel1=db_value, db_channel2=db_value + ) + log.debug(f"{db_value}") + + stim = self.mccdaq.write_analog( + data, + [0, 0], + self.SAMPLERATE, + ScanOption=uldaq.ScanOption.EXTTRIGGER, + ) + + data_channel_one = self.mccdaq.read_analog( + [0, 0], + self.DURATION, + self.SAMPLERATE, + ScanOption=uldaq.ScanOption.EXTTRIGGER, + ) + time.sleep(1) + + log.debug("Starting the Scan") + self.mccdaq.digital_trigger() + + try: + self.mccdaq.ao_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, 15) + log.debug("Scan finished") + self.mccdaq.write_bit(channel=0, bit=0) + time.sleep(1) + self.mccdaq.set_analog_to_zero() + except uldaq.ul_exception.ULException: + log.debug("Operation timed out") + # reset the diggital trigger + self.mccdaq.write_bit(channel=0, bit=0) + time.sleep(1) + self.mccdaq.set_analog_to_zero() + # self.mccdaq.disconnect_daq() + + if i == 0: + ax.plot(t, stim, label=f"Input_{db_value}", color=colors[i]) + ax.plot(t, data_channel_one, label=f"Reaout {db_value}", color=colors[i]) + + ax.legend() + plt.show() + + # self.mccdaq.disconnect_daq() + + def check_beat(self, nix_block: nix.Block): + self.mccdaq.set_attenuation_level(db_channel1=-10.0, db_channel2=0.0) + t = np.arange(0, self.DURATION, 1 / self.SAMPLERATE) + data = self.AMPLITUDE * np.sin(2 * np.pi * self.SINFREQ * t) + # data = np.concatenate((data, data)) + db_values = [0.0, -5.0, -8.5, -10.0] + colors = ["red", "blue", "black", "green"] + colors_in = ["lightcoral", "lightblue", "grey", "lightgreen"] + # fig, axes = plt.subplots(2, 2, sharex="col") + for i, db_value in enumerate(db_values): + self.mccdaq.set_attenuation_level(db_channel1=db_value) + stim = self.mccdaq.write_analog( + data, + [0, 0], + self.SAMPLERATE, + ScanOption=uldaq.ScanOption.EXTTRIGGER, + ) + readout = self.mccdaq.read_analog( + [0, 1], + self.DURATION, + self.SAMPLERATE, + ScanOption=uldaq.ScanOption.EXTTRIGGER, + ) + self.mccdaq.digital_trigger() + log.info(self.mccdaq.ao_device) + ai_status = uldaq.ScanStatus.RUNNING + ao_status = uldaq.ScanStatus.RUNNING + + log.debug( + f"Status Analog_output {ao_status}\n, Status Analog_input {ai_status}" + ) + while (ai_status != uldaq.ScanStatus.IDLE) and ( + ao_status != uldaq.ScanStatus.IDLE + ): + # log.debug("Scanning") + time.time_ns() + ai_status = self.mccdaq.ai_device.get_scan_status()[0] + ao_status = self.mccdaq.ao_device.get_scan_status()[0] + + self.mccdaq.write_bit(channel=0, bit=0) + log.debug( + f"Status Analog_output {ao_status}\n, Status Analog_input {ai_status}" + ) + + channel1 = np.array(readout[::2]) + channel2 = np.array(readout[1::2]) + + stim_data = nix_block.create_data_array( + f"stimulus_{db_value}", + "nix.regular_sampled", + shape=data.shape, + data=channel1, + label="Voltage", + unit="V", + ) + stim_data.append_sampled_dimension( + self.SAMPLERATE, + label="time", + unit="s", + ) + fish_data = nix_block.create_data_array( + f"fish_{db_value}", + "Array", + shape=data.shape, + data=channel2, + label="Voltage", + unit="V", + ) + fish_data.append_sampled_dimension( + self.SAMPLERATE, + label="time", + unit="s", + ) + + time.time_ns() + self.mccdaq.set_analog_to_zero() + + def plot(self, figure, block): + self.figure = figure + self.figure.setBackground("w") + self.beat_plot = self.figure.addPlot(row=0, col=0) + self.power_plot = self.figure.addPlot(row=1, col=0) + self.beat_plot.addLegend() + self.power_plot.addLegend() + # self.power_plot.setLogMode(x=False, y=True) + + colors = ["red", "green", "blue", "black", "yellow"] + for i, (stim, fish) in enumerate( + zip(list(block.data_arrays)[::2], list(block.data_arrays)[1::2]) + ): + f_stim, stim_power = welch( + stim[:], + fs=40_000.0, + window="flattop", + nperseg=100_000, + ) + stim_power = decibel(stim_power) + stim_max_power_index = np.argmax(stim_power) + freq_stim = f_stim[stim_max_power_index] + + f_fish, fish_power = welch( + fish[:], + fs=40_000.0, + window="flattop", + nperseg=100_000, + ) + fish_power = decibel(fish_power) + fish_max_power_index = np.argmax(fish_power) + freq_fish = f_fish[fish_max_power_index] + + beat_frequency = np.abs(freq_fish - freq_stim) + + beat = stim[:] + fish[:] + beat_squared = beat**2 + + f, powerspec = welch( + beat_squared, + window="flattop", + fs=40_000.0, + nperseg=100_000, + ) + powerspec = decibel(powerspec) + + padding = 20 + integration_window = powerspec[ + (f > beat_frequency - padding) & (f < beat_frequency + padding) + ] + + peaks = find_peaks(powerspec, prominence=40)[0] + + pen = pg.mkPen(colors[i]) + + self.beat_plot.plot( + np.arange(0, len(beat)) / 40_000.0, + beat, + pen=pen, + name=stim.name, + ) + self.power_plot.plot(f, powerspec, pen=pen, name=stim.name) + self.power_plot.plot(f[peaks], powerspec[peaks], pen=None, symbol="x") + + +def decibel(power, ref_power=1.0, min_power=1e-20): + """Transform power to decibel relative to ref_power. + + \\[ decibel = 10 \\cdot \\log_{10}(power/ref\\_power) \\] + Power values smaller than `min_power` are set to `-np.inf`. + + Parameters + ---------- + power: float or array + Power values, for example from a power spectrum or spectrogram. + ref_power: float or None or 'peak' + Reference power for computing decibel. + If set to `None` or 'peak', the maximum power is used. + min_power: float + Power values smaller than `min_power` are set to `-np.inf`. + + Returns + ------- + decibel_psd: array + Power values in decibel relative to `ref_power`. + """ + if np.isscalar(power): + tmp_power = np.array([power]) + decibel_psd = np.array([power]) + else: + tmp_power = power + decibel_psd = power.copy() + if ref_power is None or ref_power == "peak": + ref_power = np.max(decibel_psd) + decibel_psd[tmp_power <= min_power] = float("-inf") + decibel_psd[tmp_power > min_power] = 10.0 * np.log10( + decibel_psd[tmp_power > min_power] / ref_power + ) + if np.isscalar(power): + return decibel_psd[0] + else: + return decibel_psd diff --git a/pyrelacs/repros/calibration/config.yaml b/pyrelacs/repros/calibration/config.yaml new file mode 100644 index 0000000..e69de29