import time

import numpy as np
import pyqtgraph as pg
from IPython import embed
from pyqtgraph.Qt.QtCore import QTimer

from pyrelacs.dataio.circbuffer import CircBuffer
from pyrelacs.util.logging import config_logging

log = config_logging()


class Continously:
    """Continuously plot two analog-input channels from a circular buffer.

    Channel 0 is drawn in row 0 and channel 1 in row 1 of the given layout
    widget. A ``QTimer`` polls the buffer; whenever a full chunk of new
    samples is available for a channel, it is appended to that channel's
    on-screen history and the curve is redrawn.

    The per-channel state is kept in attributes named ``*_analog_in_0`` and
    ``*_analog_in_1`` (plot item, time array, data array, curve and index of
    the next sample to read).
    """

    def __init__(self, figure: pg.GraphicsLayoutWidget, buffer: CircBuffer):
        """Store the plotting target and data source and wire up the timer.

        Parameters
        ----------
        figure:
            Layout widget the two plots are added to.
        buffer:
            Buffer the samples are read from.
        """
        self.figure = figure
        self.buffer = buffer
        self.last_plotted_index_analog_in_0 = 0
        self.last_plotted_index_analog_in_1 = 0
        self.timer = QTimer()
        # Connect exactly once. Connecting inside plot() would stack a further
        # connection on every call and run each update several times per tick.
        self.timer.timeout.connect(self.update_plot)
        self.timer.timeout.connect(self.update_plot_1)

    def plot(self, *args, **kwargs) -> None:
        """Create both plots and start the periodic updates.

        Positional and keyword arguments are accepted but ignored.
        """
        self.figure.setBackground("w")

        # Clear whatever a previous call left in the two cells used below.
        for row in (0, 1):
            prev_plot = self.figure.getItem(row=row, col=0)
            if prev_plot is not None:
                self.figure.removeItem(prev_plot)

        (
            self.analog_in_0,
            self.time_analog_in_0,
            self.data_analog_in_0,
            self.line_analog_in_0,
        ) = self._make_trace(row=0)
        (
            self.analog_in_1,
            self.time_analog_in_1,
            self.data_analog_in_1,
            self.line_analog_in_1,
        ) = self._make_trace(row=1)

        # Number of new samples that must be available before a redraw.
        self.CHUNK_PLOT = int(self.buffer.samplerate / 6)
        # The amount of data you want to keep on screen
        self.PLOT_HISTORY = 500_000

        self.timer.setInterval(150)
        self.timer.start()

    def _make_trace(self, row: int):
        """Add a plot in ``row`` and return ``(plot, times, data, curve)``.

        Both arrays start zero-filled so that no uninitialised memory is
        drawn before the first samples arrive.
        """
        plot_item = self.figure.addPlot(row=row, col=0)
        times = np.zeros(self.buffer.size)
        data = np.zeros(self.buffer.size)
        curve = plot_item.plot(times, data, pen=pg.mkPen("red"))
        return plot_item, times, data, curve

    def update_plot(self) -> None:
        """Timer slot: append the next chunk of channel 0, if available."""
        self._update_channel(0)

    def update_plot_1(self) -> None:
        """Timer slot: append the next chunk of channel 1, if available."""
        self._update_channel(1)

    def _update_channel(self, channel: int) -> None:
        """Append one ``CHUNK_PLOT``-sized chunk of ``channel`` when ready."""
        total_count = self.buffer.totalcount(channel=channel)
        last_index = getattr(self, f"last_plotted_index_analog_in_{channel}")
        if total_count - last_index < self.CHUNK_PLOT:
            return

        start_time = time.perf_counter()
        self._append_samples(channel, self.CHUNK_PLOT)
        log.debug(f"total time for plotting {time.perf_counter() - start_time}")

    def _append_samples(self, channel: int, count: int) -> None:
        """Read ``count`` samples of ``channel`` and redraw its curve.

        The samples are appended to the channel's OWN history, which is then
        trimmed to the last ``PLOT_HISTORY`` points. If the buffer raises
        ``IndexError`` the error is logged and the channel state is left
        untouched, so the same range is retried on the next call.
        """
        tag = f"analog_in_{channel}"
        start = getattr(self, f"last_plotted_index_{tag}")
        try:
            times, items = self.buffer.read(start, extend=count, channel=channel)
        except IndexError:
            log.error("Could not acces the data from the buffer for plotting")
            return

        history = self.PLOT_HISTORY
        time_axis = np.concatenate((getattr(self, f"time_{tag}"), times))[-history:]
        values = np.concatenate((getattr(self, f"data_{tag}"), items))[-history:]

        setattr(self, f"time_{tag}", time_axis)
        setattr(self, f"data_{tag}", values)
        getattr(self, f"line_{tag}").setData(time_axis, values)
        setattr(self, f"last_plotted_index_{tag}", start + count)

    def stop_plotting(self) -> None:
        """Stop the timer and draw the samples not yet shown for each channel.

        A channel that has never been plotted (index still 0) is skipped.
        """
        self.timer.stop()
        for channel in (0, 1):
            last_index = getattr(self, f"last_plotted_index_analog_in_{channel}")
            if last_index <= 0:
                continue
            remaining = self.buffer.totalcount(channel=channel) - last_index
            if remaining > 0:
                self._append_samples(channel, remaining)

    def refresh(self) -> None:
        """Re-enable auto-ranging on both plots."""
        self.analog_in_0.enableAutoRange()
        self.analog_in_1.enableAutoRange()