import numpy as np class CircBuffer: def __init__(self, size: int, channels: int = 1): self._size = size self._channels = channels self._buffer = np.zeros((channels, size), dtype=np.double) # or dtype of your choice self._index = [0 for i in range(channels)] self._is_full = [False for i in range(channels)] self._totalcount = [0 for i in range(channels)] self._overflows = [0 for i in range(channels)] @property def size(self): return self._size @property def channel_count(self): return self._channels def is_full(self, channel: int = 0): return self._is_full[channel] def write_index(self, channel: int = 0): return self._index[channel] def append(self, item, channel: int = 0): self._buffer[channel, self.write_index(channel)] = item self._index[channel] = (self.write_index(channel) + 1) % self._size self._totalcount[channel] += 1 if self._index[channel] == 0: self._is_full[channel] = True self._overflows[channel] += 1 def get_all(self, channel: int = 0): """ Return all valid values from the specified channel """ if self._is_full[channel]: return np.concatenate((self._buffer[channel, self._index[channel]:], self._buffer[channel, :self._index[channel]])) else: return self._buffer[channel, :self._index[channel]] def has_value(self, index, channel): if index < 0 and self.is_full(channel): return True elif index < 0 and not self.is_full(channel): return False if index >= self.size: return False if index > self.write_index(channel) and self.is_full(channel): return True if index == self.write_index(channel) and self._totalcount[channel] == 0: return False return True def valid_range(self, channel: int = 0): """ Return the start index that are valid within the buffer """ start = 0 count = 0 if self._totalcount[channel] == 0: return start, count if not self.is_full(channel): count = self._totalcount[channel] else: count = self.size return start, count def get(self, index: int = -1, channel: int = 0): # easy case first, we can spare the effort of further checking if index >= 0 and index < self.write_index(channel): return self._buffer[channel, index] if index < 0: index = self.write_index() - 1 if self.has_value(index, channel): return self._buffer[channel, index] else: raise IndexError(f"Invalid index {index} on ring buffer for channel{channel}") def read(self, start, count=1, channel=0): """ Reads a numpy array from buffer""" if start < 0 or count < 0: raise IndexError(f"Invalid start ({start}) or count ({count}) for channel{channel}") if count == 1: return np.array(self.get(start, channel)) vs, vc = self.valid_range(channel) if start > self._totalcount[channel]: raise IndexError(f"Invalid start index {start} is invalid with totalcount {self._totalcount[channel]} for channel{channel}") if start > self.size: raise IndexError(f"Invalid start index {start} for buffer with size {self.size}") if count > self.size: count = self.size if count > vc: count = vc if (start + count) < self.size: return self._buffer[channel, start:start + count] else: return np.concatenate((self._buffer[channel, start:], self._buffer[channel, :count - self.size + start]))