# Rasp_grid/rasp_grid.py
# (197 lines, 6.5 KiB, Python)

from __future__ import print_function
from time import time, sleep
import RPi.GPIO as GPIO
import subprocess
import numpy as np
import matplotlib.pyplot as plt
from time import sleep, time
from os import system
from sys import stdout
from IPython import embed
from uldaq import (get_daq_device_inventory, DaqDevice, AInScanFlag, ScanStatus,
ScanOption, create_float_buffer, InterfaceType, AiInputMode)
def GPIO_setup():
    """Configure the GPIO pins used by the recorder.

    Using BOARD pin numbering, pins 11 and 13 are set up as outputs for the
    two LEDs and driven low; pins 16 and 18 are set up as switch-controlled
    inputs.

    Returns:
        list: one boolean per LED describing whether it is lit; both start
        out as False.
    """
    GPIO.setmode(GPIO.BOARD)

    # LED output pins (LED 1 -> pin 11, LED 2 -> pin 13), initially off.
    led_pins = (11, 13)
    for pin in led_pins:
        GPIO.setup(pin, GPIO.OUT)
        GPIO.output(pin, GPIO.LOW)

    # Switch-controlled input pins.
    for pin in (16, 18):
        GPIO.setup(pin, GPIO.IN)

    return [False for _ in led_pins]
def _set_display_power(enabled):
    """Power the display on or off with the Raspberry Pi command-line tools.

    Args:
        enabled: True to switch the display on, False to switch it off.
    """
    if enabled:
        subprocess.run(['tvservice', '-p'])
        subprocess.run(['vcgencmd', 'display_power', '1'])
        # Hop to another virtual terminal and back; presumably this forces
        # the screen to be redrawn after power-up -- TODO confirm.
        subprocess.run(['sudo', '/bin/chvt', '6'])
        subprocess.run(['sudo', '/bin/chvt', '7'])
    else:
        subprocess.run(['tvservice', '-o'])
        subprocess.run(['vcgencmd', 'display_power', '0'])


def _write_new_samples(data, last_idx, index, out_file):
    """Append the not-yet-written part of the circular buffer to out_file.

    Args:
        data: the circular sample buffer filled by the background scan.
        last_idx: buffer position up to which samples were already written.
        index: current buffer position reported by the scan status, or -1
            when no position is available.
        out_file: binary file object the samples are written to as float32.

    Returns:
        int: the value to pass as last_idx on the next call.
    """
    if index == -1:
        # Nothing to write; park the marker at the end of the buffer so the
        # next valid index is handled as a wrap-around starting at 0.
        return len(data)

    if last_idx > index:
        # The buffer wrapped around: write the tail first, then the head.
        np.array(data[last_idx:], dtype=np.float32).tofile(out_file)
        np.array(data[:index], dtype=np.float32).tofile(out_file)
    else:
        np.array(data[last_idx:index], dtype=np.float32).tofile(out_file)
    return index


def _release_daq(daq_device, ai_device):
    """Stop a running scan, disconnect and release the DAQ device.

    Args:
        daq_device: the DaqDevice object, or None if it was never created.
        ai_device: the analog input sub-device, or None if unavailable.
    """
    if not daq_device:
        return
    try:
        if daq_device.is_connected():
            if ai_device is not None:
                # Ask the device for its current state instead of relying on
                # a status value polled earlier (or never polled at all).
                status, _ = ai_device.get_scan_status()
                if status == ScanStatus.RUNNING:
                    ai_device.scan_stop()
            daq_device.disconnect()
    finally:
        daq_device.release()


def main():
    """Record analog input from the first USB DAQ device to a raw file.

    Sets up the GPIO pins, starts a continuous hardware-paced scan on up to
    16 channels, waits until the switch on pin 16 goes high and then streams
    the samples as float32 to '/media/pi/data1/test_file.raw' until the
    switch goes low again or the user presses Ctrl-C. While recording, the
    LED on pin 11 blinks and the switch on pin 18 turns the display and eth0
    off or on.

    The output file, the DAQ device, the display and the GPIO pins are
    released on every exit path, including errors.

    Raises:
        Exception: if no DAQ device is found, or the device supports neither
            analog input nor hardware-paced analog input.
    """
    LED_status = GPIO_setup()

    # DAQ setup
    channels = 16
    descriptor_index = 0  # ToDo: ????
    range_index = 0  # ToDo: ????
    interface_type = InterfaceType.USB
    low_channel = 0
    # high_channel is inclusive, so 16 channels are numbered 0..15.
    high_channel = channels - 1
    samples_per_channel = 20000  # ToDo: ???? Buffer size ?!
    rate = 20000
    scan_options = ScanOption.CONTINUOUS
    flags = AInScanFlag.DEFAULT

    daq_device = None
    ai_device = None
    disp_eth_power = True

    try:
        # Get descriptors for all of the available DAQ devices.
        devices = get_daq_device_inventory(interface_type)
        if not devices:
            raise Exception('Error: No DAQ devices found')

        print('Found', len(devices), 'DAQ device(s):')
        for device in devices:
            print(' ', device.product_name, ' (', device.unique_id, ')', sep='')

        # Create the DAQ device object associated with the specified descriptor index.
        daq_device = DaqDevice(devices[descriptor_index])

        # Get the AiDevice object and verify that it is valid.
        ai_device = daq_device.get_ai_device()
        if ai_device is None:
            raise Exception('Error: The DAQ device does not support analog input')

        # Verify that the specified device supports hardware pacing for analog input.
        ai_info = ai_device.get_info()
        if not ai_info.has_pacer():
            raise Exception('\nError: The specified DAQ device does not support hardware paced analog input')

        # Establish a connection to the DAQ device.
        descriptor = daq_device.get_descriptor()
        print('\nConnecting to', descriptor.dev_string, '- please wait...')
        daq_device.connect()

        # Prefer SINGLE_ENDED input mode; fall back to DIFFERENTIAL if the
        # device has no single-ended channels.
        input_mode = AiInputMode.SINGLE_ENDED
        if ai_info.get_num_chans_by_mode(AiInputMode.SINGLE_ENDED) <= 0:
            input_mode = AiInputMode.DIFFERENTIAL

        # Get the number of channels and validate the high channel number.
        number_of_channels = ai_info.get_num_chans_by_mode(input_mode)
        high_channel = min(high_channel, number_of_channels - 1)
        channel_count = high_channel - low_channel + 1

        # Get a list of supported ranges and validate the range index.
        ranges = ai_info.get_ranges(input_mode)
        range_index = min(range_index, len(ranges) - 1)

        # Allocate a buffer to receive the data.
        data = create_float_buffer(channel_count, samples_per_channel)

        system('clear')

        # Start the acquisition.
        # NOTE(review): the scan already runs while waiting for the switch
        # below, so the first chunk written to the file may contain samples
        # acquired before the switch was thrown.
        rate = ai_device.a_in_scan(low_channel, high_channel, input_mode, ranges[range_index],
                                   samples_per_channel, rate, scan_options, flags, data)
        last_idx = 0

        with open('/media/pi/data1/test_file.raw', 'wb') as f:
            try:
                # LED on while waiting for the switch to start data acquisition.
                GPIO.output(11, GPIO.HIGH)
                LED_status[0] = True
                while GPIO.input(16) == GPIO.LOW:
                    sleep(.1)
                GPIO.output(11, GPIO.LOW)
                LED_status[0] = False

                LED_t = time()
                LED_t_interval = 2

                while GPIO.input(16) == GPIO.HIGH:
                    # Blinking LED: on for the first 0.1 s of every interval.
                    elapsed = time() - LED_t
                    if elapsed < .1 and not LED_status[0]:
                        LED_status[0] = True
                        GPIO.output(11, GPIO.HIGH)
                    if elapsed >= .1 and LED_status[0]:
                        LED_status[0] = False
                        GPIO.output(11, GPIO.LOW)
                    if elapsed >= LED_t_interval:
                        LED_t = time()

                    # Display & eth0 control: only act when the switch state changes.
                    if GPIO.input(18) == GPIO.HIGH:
                        if disp_eth_power:
                            _set_display_power(False)
                            subprocess.run(['sudo', 'ip', 'link', 'set', 'eth0', 'down'])
                            disp_eth_power = False
                    elif not disp_eth_power:
                        _set_display_power(True)
                        subprocess.run(['sudo', 'ip', 'link', 'set', 'eth0', 'up'])
                        disp_eth_power = True

                    # Get the status of the background operation and write
                    # whatever has been acquired since the last pass.
                    _, transfer_status = ai_device.get_scan_status()
                    last_idx = _write_new_samples(data, last_idx, transfer_status.current_index, f)
            except KeyboardInterrupt:
                # Ctrl-C is the regular way to end a recording by hand.
                pass
    finally:
        try:
            _release_daq(daq_device, ai_device)
        finally:
            # NOTE: eth0 is not brought back up here; if it was taken down
            # via the switch it stays down after exit.
            if not disp_eth_power:
                _set_display_power(True)
            GPIO.cleanup()
# Script entry point: start the recorder only when this file is executed
# directly, not when it is imported as a module.
if __name__ == '__main__':
    main()