import pyqtgraph as pg
from IPython import embed
import numpy as np
from pyqtgraph.Qt.QtCore import QTimer

from pyrelacs.dataio.circbuffer import CircBuffer
from pyrelacs.util.logging import config_logging

log = config_logging()


class Continously:
    """Periodically redraw a chunk of samples read from a circular buffer.

    ``plot`` creates a single plot in cell (0, 0) of the given layout widget
    and starts a timer; each timer tick calls ``update_plot``, which reads
    ``CHUNK_PLOT`` samples from the buffer and pushes them to the curve.
    """

    # Number of samples requested from the buffer for each redraw.
    CHUNK_PLOT = 10_000

    def __init__(self, figure: pg.GraphicsLayoutWidget, buffer: CircBuffer):
        """Store the target layout widget and the buffer to read from.

        Args:
            figure: Layout widget that will host the plot in cell (0, 0).
            buffer: Buffer providing ``size``, ``totalcount()``,
                ``write_index()`` and ``read()``.
        """
        self.figure = figure
        self.buffer = buffer

        # Created by plot(); initialised here so that stop_plotting() and
        # refresh() are safe to call before plotting has started.
        self.continous_ax = None
        self.line = None
        self.timer = None

    def plot(self, *args, **kwargs) -> None:
        """Create the plot and start the periodic redraw timer.

        Any plot already occupying cell (0, 0) is removed first, and a timer
        left running by an earlier call is stopped so that only one timer
        drives ``update_plot`` at a time. Positional and keyword arguments
        are accepted but not used.
        """
        # Stop a timer from a previous call before replacing the plot;
        # otherwise every call would add another running timer.
        self.stop_plotting()

        self.figure.setBackground("w")

        prev_plot = self.figure.getItem(row=0, col=0)
        if prev_plot is not None:
            self.figure.removeItem(prev_plot)
        self.continous_ax = self.figure.addPlot(row=0, col=0)

        # Start with an all-zero curve sized like the buffer; it is replaced
        # on the first successful update.
        self.time = np.zeros(self.buffer.size)
        self.data = np.zeros(self.buffer.size)
        self.line = self.continous_ax.plot(
            self.time,
            self.data,
            pen=pg.mkPen("red"),
        )

        self.timer = QTimer()
        self.timer.setInterval(200)  # milliseconds between redraws
        self.timer.timeout.connect(self.update_plot)
        self.timer.start()

    def update_plot(self) -> None:
        """Read a chunk from the buffer and show it on the curve.

        Does nothing until the buffer's total count exceeds ``CHUNK_PLOT``.
        If the buffer raises ``IndexError`` for the requested range, a chunk
        of zeros is drawn instead and the error is logged at debug level.
        """
        total = self.buffer.totalcount()
        if total <= self.CHUNK_PLOT:
            return

        log.debug(total)
        # Read CHUNK_PLOT samples starting CHUNK_PLOT + 1000 behind the write
        # index. NOTE(review): the extra 1000 looks like a safety margin
        # behind the writer -- confirm against CircBuffer.read.
        start = self.buffer.write_index() - self.CHUNK_PLOT - 1_000
        try:
            times, items = self.buffer.read(start, extend=self.CHUNK_PLOT)
        except IndexError as e:
            items = np.zeros(self.CHUNK_PLOT)
            times = np.zeros(self.CHUNK_PLOT)
            log.debug("No Data Available")
            log.debug(f"Index Error {e}")

        self.line.setData(times, items)

    def stop_plotting(self) -> None:
        """Stop the redraw timer; a no-op if plotting was never started."""
        if self.timer is not None:
            self.timer.stop()

    def refresh(self) -> None:
        """Re-enable auto-ranging on the plot; a no-op before ``plot``."""
        if self.continous_ax is not None:
            self.continous_ax.enableAutoRange()