Rasp_grid/grid_recorder.py
2023-02-10 09:50:20 +01:00

469 lines
18 KiB
Python

from __future__ import print_function
import os
import sys
import glob
import datetime
from shutil import copyfile
import subprocess
import numpy as np
import threading
import multiprocessing as mp
from time import sleep, time
from IPython import embed
from uldaq import (get_daq_device_inventory, DaqDevice, AInScanFlag, ScanStatus,
ScanOption, create_float_buffer, InterfaceType, AiInputMode)
try:
import RPi.GPIO as GPIO
except:
pass
class Configuration():
def __init__(self, recording_duration = None):
# recording setup
self.channels = None
self.samplerate = None
self.max_v = None
self.gain = None
self.record_temp = False
# electrodes
self.n_cols = 0
self.n_rows = 0
self.n_extra = 0
# timestamps
self.now = datetime.datetime.now()
self.start_clock = [datetime.datetime.now().hour, datetime.datetime.now().minute]
if not recording_duration == None:
self.end_clock = [datetime.datetime.now().hour + recording_duration[0], datetime.datetime.now().minute + recording_duration[1]]
if self.end_clock[1] >= 60:
self.end_clock[0] += 1
self.end_clock[1] -= 60
if self.end_clock[0] >= 24:
self.end_clock[0] -= 24
else:
self.end_clock = None
# paths
self.path = './'
self.path_format = '%04Y-%02m-%02d-%02H_%02M'
self._base_path = '/media/pi/data1'
# files (paths)
self.file = os.path.join('.', 'traces-grid1.raw')
self.temp_file = os.path.join('.', 'temperatures.csv')
self.led_file = os.path.join('.', 'led_times.csv')
self.led_file2 = os.path.join('.', 'led_idxs.csv')
# init calls
self.read_cfg()
def GPIO_setup(self, LED1_pin, LED2_pin, LED_out_pin, Button1_pin, Button2_pin, power_controll_pin):
GPIO.setmode(GPIO.BOARD)
GPIO.setup(LED1_pin, GPIO.OUT) # 1
GPIO.output(LED1_pin, GPIO.LOW)
GPIO.setup(LED2_pin, GPIO.OUT) # 2
GPIO.output(LED2_pin, GPIO.LOW)
GPIO.setup(LED_out_pin, GPIO.OUT)
GPIO.output(LED_out_pin, GPIO.LOW)
LED_status = [False, False, False]
GPIO.setup(power_controll_pin, GPIO.IN)
GPIO.setup(Button1_pin, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
GPIO.setup(Button2_pin, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
return LED_status
def read_cfg(self):
cfg_file = os.path.join(self._base_path, 'fishgrid.cfg')
cfg_f = open(cfg_file, 'r+')
cfg = cfg_f.readlines()
for line in cfg:
if 'Columns1' in line:
self.n_cols = int(line.split(':')[1].strip())
elif 'Rows1' in line:
self.n_rows = int(line.split(':')[1].strip())
elif 'Extra1' in line:
self.n_extra = int(line.split(':')[1].strip())
elif "AISampleRate" in line:
self.samplerate = int(float(line.split(':')[-1].strip().replace('kHz', '')) * 1000)
elif "AIMaxVolt" in line:
self.max_v = float(line.split(':')[1].strip().replace('mV', ''))
elif 'Gain' in line:
self.gain = int(line.split(':')[1].strip())
# ToDo: add option to start now !!!
elif 'StartTime' in line:
if len(np.array(line.strip().replace(' ', '').split(':')[1:])) > 1:
self.start_clock = np.array(line.strip().replace(' ', '').split(':')[1:], dtype=int)
elif 'EndTime' in line:
if len(np.array(line.strip().replace(' ', '').split(':')[1:])) > 1:
self.end_clock = np.array(line.strip().replace(' ', '').split(':')[1:], dtype=int)
elif 'Gain' in line:
self.gain = int(line.split(':')[1].strip())
self.channels = self.n_rows * self.n_cols + self.n_extra
def create_file_structure(self):
start_str = ('-%2.f_%2.f' % (self.start_clock[0], self.start_clock[1])).replace(' ', '0')
self.path = os.path.join(self._base_path, self.now.strftime('-'.join(self.path_format.split('-')[:3])) + start_str) # ToDo: Edit here
if not os.path.exists(self.path):
os.makedirs(self.path)
copyfile(os.path.join(self._base_path, 'fishgrid.cfg'), os.path.join(self.path, 'fishgrid.cfg'))
cfg_file = os.path.join(self.path, 'fishgrid.cfg')
cfg_f = open(cfg_file, 'r+')
cfg = cfg_f.readlines()
for enu, line in enumerate(cfg):
if "StartDate" in line:
cfg[enu] = (' StartDate : %s\n' % self.now.strftime('-'.join(self.path_format.split('-')[:3])))
cfg_f.close()
cfg_f = open(cfg_file, 'w+')
for line in cfg:
cfg_f.write(line)
cfg_f.close()
self.file = os.path.join(self.path, 'traces-grid1.raw')
self.temp_file = os.path.join(self.path, 'temperatures.csv')
self.led_file = os.path.join(self.path, 'led_times.csv')
self.led_file2 = os.path.join(self.path, 'led_idxs.csv')
class Recorder():
def __init__(self, config):
self.config = config
self.config.create_file_structure()
# GPIO configuration
self.LED1_pin = 11
self.Button1_pin = 16
self.LED2_pin = 13
self.Button2_pin = 18
self.LED_out_pin = 35
self.power_controll_pin = 37
self.last_button_2_t = time()
self.LED_status = self.config.GPIO_setup(LED1_pin = self.LED1_pin, Button1_pin = self.Button1_pin,
LED2_pin = self.LED2_pin, Button2_pin = self.Button2_pin,
LED_out_pin = self.LED_out_pin, power_controll_pin = self.power_controll_pin)
def DAQ_setup(self):
status = ScanStatus.IDLE
descriptor_index = 0
self.range_index = 0
interface_type = InterfaceType.USB
self.low_channel = 0
self.high_channel = self.config.channels - 1
self.buffer_sec = 20
self.samples_per_channel = self.config.samplerate * self.buffer_sec # * channels = Buffer size
self.buffer_size = self.samples_per_channel * self.config.channels
print('\nChannels: %.0f' % self.config.channels)
# rate = 20000
self.scan_options = ScanOption.CONTINUOUS
self.flags = AInScanFlag.DEFAULT
# Get descriptors for all of the available DAQ devices.
devices = get_daq_device_inventory(interface_type)
number_of_devices = len(devices)
if number_of_devices == 0:
raise Exception('Error: No DAQ devices found')
print('Found', number_of_devices, 'DAQ device(s):')
for i in range(number_of_devices):
print(' ', devices[i].product_name, ' (', devices[i].unique_id, ')', sep='')
# Create the DAQ device object associated with the specified descriptor index.
self.daq_device = None
self.daq_device = DaqDevice(devices[descriptor_index])
# Get the AiDevice object and verify that it is valid.
self.ai_device = None
self.ai_device = self.daq_device.get_ai_device()
if self.ai_device is None:
raise Exception('Error: The DAQ device does not support analog input')
# Verify that the specified device supports hardware pacing for analog input.
ai_info = self.ai_device.get_info()
if not ai_info.has_pacer():
raise Exception('\nError: The specified DAQ device does not support hardware paced analog input')
# Establish a connection to the DAQ device.
descriptor = self.daq_device.get_descriptor()
print('\nConnecting to', descriptor.dev_string, '- please wait...')
self.daq_device.connect()
# The default input mode is SINGLE_ENDED.
self.input_mode = AiInputMode.SINGLE_ENDED
# If SINGLE_ENDED input mode is not supported, set to DIFFERENTIAL.
if ai_info.get_num_chans_by_mode(AiInputMode.SINGLE_ENDED) <= 0:
self.input_mode = AiInputMode.DIFFERENTIAL
# Get the number of channels and validate the high channel number.
number_of_channels = ai_info.get_num_chans_by_mode(self.input_mode)
if self.high_channel >= number_of_channels:
self.high_channel = number_of_channels - 1
self.channel_count = self.high_channel - self.low_channel + 1
# Get a list of supported ranges and validate the range index.
self.ranges = ai_info.get_ranges(self.input_mode)
int_ranges = []
for r in self.ranges:
int_ranges.append(int(r.name.replace('BIP', '').replace('VOLTS', '')))
for idx in np.argsort(int_ranges):
if self.config.max_v * self.config.gain / 1000 <= int_ranges[idx]:
self.range_index = idx
break
print(self.ranges[self.range_index])
def open_recording_files(self, file, temp_file, led_file, led_file2):
self.temp_f = None
w1_bus_path = glob.glob('/sys/bus/w1/devices/28*/w1_slave')
if len(w1_bus_path) > 0:
self.w1_bus_path = w1_bus_path[0]
self.record_temp = True
self.temp_f = open(temp_file, 'w')
self.temp_f.write('%-6s; %-7s\n' % ('time/s', 'T/C'))
self.f = open(file, 'wb')
self.led_f = open(led_file, 'w')
self.led_f2 = open(led_file2, 'w')
def run(self):
self.open_recording_files(self.config.file, self.config.temp_file, self.config.led_file, self.config.led_file2)
self.DAQ_setup()
GPIO.output(self.LED1_pin, GPIO.HIGH)
self.LED_status[0] = True
GPIO.output(self.LED2_pin, GPIO.LOW)
self.LED_status[1] = False
sleep(.5)
GPIO.output(self.LED1_pin, GPIO.LOW)
self.LED_status[0] = False
GPIO.output(self.LED2_pin, GPIO.HIGH)
self.LED_status[1] = True
sleep(.5)
GPIO.output(self.LED1_pin, GPIO.HIGH)
self.LED_status[0] = True
GPIO.output(self.LED2_pin, GPIO.LOW)
self.LED_status[1] = False
sleep(.5)
GPIO.output(self.LED1_pin, GPIO.LOW)
self.LED_status[0] = False
GPIO.output(self.LED2_pin, GPIO.HIGH)
self.LED_status[1] = True
sleep(.5)
GPIO.output(self.LED2_pin, GPIO.LOW)
self.LED_status[1] = False
self.record()
def record(self):
while True:
if datetime.datetime.now().hour == self.config.start_clock[0] and datetime.datetime.now().minute == self.config.start_clock[1]:
break
elif datetime.datetime.now().hour * 60 + datetime.datetime.now().minute > \
self.config.start_clock[0] * 60 + self.config.start_clock[1]:
h = datetime.datetime.now().hour
m = datetime.datetime.now().minute
self.config.start_clock = [h, m]
GPIO.output(self.LED2_pin, GPIO.HIGH)
self.LED_status[1] = True
print('\nRecording started.')
data = create_float_buffer(self.channel_count, self.samples_per_channel)
print('----')
print('Buffer size: %.0fn; %.0fsec' % (len(data), self.buffer_sec))
print('Channels: %.0f' % self.channel_count)
print('Samples per channel: %.0f' % self.samples_per_channel)
print('----')
LED_t0 = time()
LED_t_interval = 5
sync_LED_t_interval = 1
temp_t0 = time()
next_temp_t = 0
temp_interval = 300 # sec --> 5 min
disp_eth_power = True
self.config.samplerate = self.ai_device.a_in_scan(self.low_channel, self.high_channel, self.input_mode, self.ranges[self.range_index], self.samples_per_channel,
self.config.samplerate, self.scan_options, self.flags, data)
status, transfer_status = self.ai_device.get_scan_status()
last_idx = 0
buffer_no = 0
self.terminate = False
if True: # close access
subprocess.run(['tvservice', '-o'])
subprocess.run(['vcgencmd', 'display_power', '0']) #
subprocess.run(['sudo', 'ip', 'link', 'set', 'eth0', 'down'])
GPIO.output(self.LED2_pin, GPIO.LOW)
disp_eth_power = False
self.last_button_2_t = time()
while GPIO.input(self.Button1_pin) == GPIO.LOW:
if GPIO.input(self.Button2_pin) == GPIO.HIGH:
if (time() - self.last_button_2_t) > 5:
if disp_eth_power == True:
subprocess.run(['tvservice', '-o'])
subprocess.run(['vcgencmd', 'display_power', '0']) #
subprocess.run(['sudo', 'ip', 'link', 'set', 'eth0', 'down'])
GPIO.output(self.LED2_pin, GPIO.LOW)
disp_eth_power = False
self.last_button_2_t = time()
elif disp_eth_power == False:
subprocess.run(['tvservice', '-p'])
subprocess.run(['vcgencmd', 'display_power', '1'])
subprocess.run(['sudo', '/bin/chvt', '6'])
subprocess.run(['sudo', '/bin/chvt', '7']) #
subprocess.run(['sudo', 'ip', 'link', 'set', 'eth0', 'up'])
GPIO.output(self.LED2_pin, GPIO.HIGH)
disp_eth_power = True
self.last_button_2_t = time()
if self.config.record_temp == True:
if time() - temp_t0 > next_temp_t:
w1_f = open(self.w1_bus_path, 'r')
w1_file = w1_f.readlines()
for line in w1_file:
if 't=' in line:
temp = float((line.split('=')[-1].strip())) / 1000
self.temp_f.write('%6.0f; %7.3f\n' % (next_temp_t, temp))
self.temp_f.flush()
break
w1_f.close()
next_temp_t += temp_interval
# blinking LED (run)
if (time() - LED_t0) % LED_t_interval < .5 and self.LED_status[0] == False:
self.LED_status[0] = True
GPIO.output(self.LED1_pin, GPIO.HIGH)
elif (time() - LED_t0) % LED_t_interval >= .5 and self.LED_status[0] == True:
self.LED_status[0] = False
GPIO.output(self.LED1_pin, GPIO.LOW)
else:
pass
# sync LED
if (time() - LED_t0) % sync_LED_t_interval < .1 and self.LED_status[2] == False:
if self.led_f != None:
Cidx = int((self.buffer_size * buffer_no + last_idx) / self.config.channels)
self.led_f.write('%.4f\n' % (time() - LED_t0))
self.led_f.flush()
self.led_f2.write('%.0f\n' % Cidx)
self.led_f2.flush()
self.LED_status[2] = True
GPIO.output(self.LED_out_pin, GPIO.HIGH)
elif (time() - LED_t0) % sync_LED_t_interval >= .1 and self.LED_status[2] == True:
self.LED_status[2] = False
GPIO.output(self.LED_out_pin, GPIO.LOW)
else:
pass
##########################################
# Get the status of the background operation
status, transfer_status = self.ai_device.get_scan_status()
index = transfer_status.current_index
# print(index)
if index < 0 or index == last_idx:
continue
if index > last_idx:
(np.array(data[last_idx:index], dtype=np.float32) / self.config.gain).tofile(self.f)
else:
(np.array(data[last_idx:], dtype=np.float32) / self.config.gain).tofile(self.f)
if hasattr(self.config.end_clock, '__len__'):
if datetime.datetime.now().hour * 60 + datetime.datetime.now().minute == self.config.end_clock[0] * 60 + self.config.end_clock[1]:
self.f.flush()
GPIO.output(self.LED1_pin, GPIO.LOW)
GPIO.output(self.LED_out_pin, GPIO.LOW)
break
(np.array(data[:index], dtype=np.float32) / self.config.gain).tofile(self.f)
self.f.flush()
buffer_no += 1
last_idx = index
else:
self.terminate = True
print('\nDone!')
self.f.close()
if self.config.record_temp == True:
self.temp_f.close()
self.led_f.close()
GPIO.output(self.LED1_pin, GPIO.HIGH)
GPIO.output(self.LED2_pin, GPIO.HIGH)
sleep(2)
GPIO.output(self.LED1_pin, GPIO.LOW)
GPIO.output(self.LED2_pin, GPIO.LOW)
if self.daq_device:
# Stop the acquisition if it is still running.
if status == ScanStatus.RUNNING:
self.ai_device.scan_stop()
if self.daq_device.is_connected():
self.daq_device.disconnect()
self.daq_device.release()
if disp_eth_power == False:
subprocess.run(['tvservice', '-p'])
subprocess.run(['vcgencmd', 'display_power', '1'])
subprocess.run(['sudo', '/bin/chvt', '6'])
subprocess.run(['sudo', '/bin/chvt', '7'])
GPIO.cleanup()
def main():
while True:
config = Configuration(recording_duration=[12, 0])
if len(sys.argv) > 1:
rec_duration = int(sys.argv[1])
config.start_clock = [config.now.hour, config.now.minute]
config.end_clock = [config.now.hour, config.now.minute + rec_duration]
if config.end_clock[1] >= 60:
config.end_clock[0] += 1
config.end_clock[1] -= 60
rc = Recorder(config)
rc.run()
if rc.terminate == True:
break
if __name__ == '__main__':
main()