From 2eb545fa886c51aa4acb6ceb14d6eabe65e68d53 Mon Sep 17 00:00:00 2001 From: tillraab Date: Tue, 31 May 2022 11:25:09 +0200 Subject: [PATCH] created new electrode check file --- new_electrode_check.py | 214 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 214 insertions(+) create mode 100644 new_electrode_check.py diff --git a/new_electrode_check.py b/new_electrode_check.py new file mode 100644 index 0000000..6536ea5 --- /dev/null +++ b/new_electrode_check.py @@ -0,0 +1,214 @@ +from __future__ import print_function +import os +import datetime +import subprocess +import numpy as np +import matplotlib.pyplot as plt +import matplotlib.gridspec as gridspec + +from uldaq import (get_daq_device_inventory, DaqDevice, AInScanFlag, ScanStatus, + ScanOption, create_float_buffer, InterfaceType, AiInputMode) + +class Live_plot(): + def __init__(self): + self.base_path = '/media/pi/data1' + self.n_rows = None + self.n_cols = None + self.max_v = None + self.channel_handle = [] + + self.fig = plt.figure(figsize=(20 / 2.54, 12 / 2.54), facecolor='white') + self.axs = [] + plt.show(block=False) + + def create_axis(self): + gs = gridspec.GridSpec(4, 4, left=0.1, bottom=0.05, right=1, top=0, hspace=0, wspace=0) + for x in range(4): + for y in range(4): + # for x in range(self.n_cols): + ax = plt.subplot(gs[y, x]) + + if not y == 3: + ax.tick_params(axis='x', which='both', bottom=False, top=False, labelbottom=False) + + if not x == 0: + ax.tick_params(axis='y', which='both', left=False, right=False, labelleft=False) + + ax.set_ylim(-self.max_v, self.max_v) + + self.axs.append(ax) + + def DAQ_setup(self): + status = ScanStatus.IDLE + + descriptor_index = 0 + self.range_index = 0 + + interface_type = InterfaceType.USB + self.low_channel = 0 + self.high_channel = self.channels - 1 + + self.buffer_sec = 20 + self.samples_per_channel = self.samplerate * self.buffer_sec # * channels = Buffer size + self.buffer_size = self.samples_per_channel * self.channels + print('\nChannels: %.0f' % self.channels) + # rate = 20000 + + self.scan_options = ScanOption.CONTINUOUS + self.flags = AInScanFlag.DEFAULT + + # Get descriptors for all of the available DAQ devices. + devices = get_daq_device_inventory(interface_type) + number_of_devices = len(devices) + if number_of_devices == 0: + raise Exception('Error: No DAQ devices found') + + print('Found', number_of_devices, 'DAQ device(s):') + for i in range(number_of_devices): + print(' ', devices[i].product_name, ' (', devices[i].unique_id, ')', sep='') + + # Create the DAQ device object associated with the specified descriptor index. + self.daq_device = None + self.daq_device = DaqDevice(devices[descriptor_index]) + + # Get the AiDevice object and verify that it is valid. + self.ai_device = None + self.ai_device = self.daq_device.get_ai_device() + if self.ai_device is None: + raise Exception('Error: The DAQ device does not support analog input') + + # Verify that the specified device supports hardware pacing for analog input. + ai_info = self.ai_device.get_info() + if not ai_info.has_pacer(): + raise Exception('\nError: The specified DAQ device does not support hardware paced analog input') + + # Establish a connection to the DAQ device. + descriptor = self.daq_device.get_descriptor() + print('\nConnecting to', descriptor.dev_string, '- please wait...') + self.daq_device.connect() + + # The default input mode is SINGLE_ENDED. + self.input_mode = AiInputMode.SINGLE_ENDED + # If SINGLE_ENDED input mode is not supported, set to DIFFERENTIAL. + if ai_info.get_num_chans_by_mode(AiInputMode.SINGLE_ENDED) <= 0: + self.input_mode = AiInputMode.DIFFERENTIAL + + # Get the number of channels and validate the high channel number. + number_of_channels = ai_info.get_num_chans_by_mode(self.input_mode) + if self.high_channel >= number_of_channels: + self.high_channel = number_of_channels - 1 + self.channel_count = self.high_channel - self.low_channel + 1 + + # Get a list of supported ranges and validate the range index. + self.ranges = ai_info.get_ranges(self.input_mode) + int_ranges = [] + for r in self.ranges: + int_ranges.append(int(r.name.replace('BIP', '').replace('VOLTS', ''))) + + for idx in np.argsort(int_ranges): + if self.max_v * self.gain / 1000 <= int_ranges[idx]: + self.range_index = idx + break + print(self.ranges[self.range_index]) + + def read_cfg(self): + cfg_file = os.path.join(self.base_path, 'fishgrid.cfg') + cfg_f = open(cfg_file, 'r+') + cfg = cfg_f.readlines() + + for line in cfg: + if 'Columns1' in line: + self.n_cols = int(line.split(':')[1].strip()) + elif 'Rows1' in line: + self.n_rows = int(line.split(':')[1].strip()) + elif 'Extra1' in line: + self.n_extra = int(line.split(':')[1].strip()) + elif "AISampleRate" in line: + self.samplerate = int(float(line.split(':')[-1].strip().replace('kHz', '')) * 1000) + elif "AIMaxVolt" in line: + self.max_v = float(line.split(':')[1].strip().replace('mV', '')) + elif 'Gain' in line: + self.gain = int(line.split(':')[1].strip()) + # ToDo: add option to start now !!! + elif 'StartTime' in line: + self.start_clock = np.array(line.strip().replace(' ', '').split(':')[1:], dtype=int) + elif 'EndTime' in line: + self.end_clock = np.array(line.strip().replace(' ', '').split(':')[1:], dtype=int) + elif 'Gain' in line: + self.gain = int(line.split(':')[1].strip()) + self.channels = self.n_rows * self.n_cols + self.n_extra + + def run(self): + init_fig = True + last_idx = 0 + + self.data = create_float_buffer(self.channel_count, self.samples_per_channel) + self.samplerate = self.ai_device.a_in_scan(self.low_channel, self.high_channel, self.input_mode, self.ranges[self.range_index], self.samples_per_channel, + self.samplerate, self.scan_options, self.flags, self.data) + + status, transfer_status = self.ai_device.get_scan_status() + try: + while True: + status, transfer_status = self.ai_device.get_scan_status() + + index = transfer_status.current_index + + if (last_idx > index) and (index != -1): + channel_array = np.arange(self.channels) + channel_data = list(map(lambda x: self.data[x::self.channels][:250], channel_array)) + channel_std = list(map(lambda x: np.std(self.data[x::self.channels][:250]), channel_array)) + power_channel = int(np.argmax(channel_std)) + + if init_fig == True: + yspan = (np.min(channel_data[power_channel]) / self.gain, np.max(channel_data[power_channel]) / self.gain) + ylim = (yspan[0] - np.abs(np.diff(yspan)) * 0.2, yspan[1] + np.abs(np.diff(yspan)) * 0.2) + + for ch in channel_array: + h, = self.axs[ch].plot(np.arange(250)[:len(channel_data[ch])] / self.samplerate, + np.array(channel_data[ch]) / self.gain, color='k') + self.axs[ch].set_ylim(ylim) + self.channel_handle.append(h) + + self.fig.canvas.draw() + + init_fig = False + else: + yspan = [np.min(channel_data[power_channel]) / self.gain, np.max(channel_data[power_channel]) / self.gain] + ylim = [yspan[0] - np.abs(np.diff(yspan)) * 0.2, yspan[1] + np.abs(np.diff(yspan)) * 0.2] + for ch in channel_array: + self.channel_handle[ch].set_data(np.arange(250)[:len(channel_data[ch])] / self.samplerate, + np.array(channel_data[ch]) / self.gain) + self.axs[ch].set_ylim(ylim) + self.fig.canvas.draw() + + if index == -1: + last_idx = len(self.data) + else: + last_idx = index + + except KeyboardInterrupt: + plt.close() + pass + + # f.close() + if self.daq_device: + # Stop the acquisition if it is still running. + if status == ScanStatus.RUNNING: + self.ai_device.scan_stop() + if self.daq_device.is_connected(): + self.daq_device.disconnect() + self.daq_device.release() + +def main(): + now = datetime.datetime.now() + + Plot = Live_plot() + + Plot.read_cfg() + + Plot.DAQ_setup() + + Plot.create_axis() + +if __name__ == '__main__': + main() \ No newline at end of file