From a7b62c5b3a7d905957675aeadf9624dc0985b394 Mon Sep 17 00:00:00 2001 From: Jan Grewe Date: Wed, 2 Oct 2024 10:28:36 +0200 Subject: [PATCH] [ringbuffer] first implementation --- pyrelacs/dataio/circbuffer.py | 110 ++++++++++++++++++++++++++++++++++ 1 file changed, 110 insertions(+) create mode 100644 pyrelacs/dataio/circbuffer.py diff --git a/pyrelacs/dataio/circbuffer.py b/pyrelacs/dataio/circbuffer.py new file mode 100644 index 0000000..b7b478f --- /dev/null +++ b/pyrelacs/dataio/circbuffer.py @@ -0,0 +1,110 @@ +import numpy as np + + +class CircBuffer: + def __init__(self, size: int, channels: int = 1): + self._size = size + self._channels = channels + self._buffer = np.zeros((channels, size), dtype=np.double) # or dtype of your choice + self._index = [0 for i in range(channels)] + self._is_full = [False for i in range(channels)] + self._totalcount = [0 for i in range(channels)] + self._overflows = [0 for i in range(channels)] + + @property + def size(self): + return self._size + + @property + def channel_count(self): + return self._channels + + def is_full(self, channel: int = 0): + return self._is_full[channel] + + def write_index(self, channel: int = 0): + return self._index[channel] + + def append(self, item, channel: int = 0): + self._buffer[channel, self.write_index(channel)] = item + self._index[channel] = (self.write_index(channel) + 1) % self._size + self._totalcount[channel] += 1 + if self._index[channel] == 0: + self._is_full[channel] = True + self._overflows[channel] += 1 + + def get_all(self, channel: int = 0): + """ + Return all valid values from the specified channel + """ + if self._is_full[channel]: + return np.concatenate((self._buffer[channel, self._index[channel]:], + self._buffer[channel, :self._index[channel]])) + else: + return self._buffer[channel, :self._index[channel]] + + def has_value(self, index, channel): + if index < 0 and self.is_full(channel): + return True + elif index < 0 and not self.is_full(channel): + return False + + if index >= self.size: + return False + + if index > self.write_index(channel) and self.is_full(channel): + return True + if index == self.write_index(channel) and self._totalcount[channel] == 0: + return False + + return True + + def valid_range(self, channel: int = 0): + """ Return the start index that are valid within the buffer + """ + start = 0 + count = 0 + if self._totalcount[channel] == 0: + return start, count + + if not self.is_full(channel): + count = self._totalcount[channel] + else: + count = self.size + return start, count + + def get(self, index: int = -1, channel: int = 0): + # easy case first, we can spare the effort of further checking + if index >= 0 and index < self.write_index(channel): + return self._buffer[channel, index] + + if index < 0: + index = self.write_index() - 1 + if self.has_value(index, channel): + return self._buffer[channel, index] + else: + raise IndexError(f"Invalid index {index} on ring buffer for channel{channel}") + + def read(self, start, count=1, channel=0): + """ Reads a numpy array from buffer""" + if start < 0 or count < 0: + raise IndexError(f"Invalid start ({start}) or count ({count}) for channel{channel}") + + if count == 1: + return np.array(self.get(start, channel)) + + vs, vc = self.valid_range(channel) + if start > self._totalcount[channel]: + raise IndexError(f"Invalid start index {start} is invalid with totalcount {self._totalcount[channel]} for channel{channel}") + if start > self.size: + raise IndexError(f"Invalid start index {start} for buffer with size {self.size}") + if count > self.size: + count = self.size + if count > vc: + count = vc + + if (start + count) < self.size: + return self._buffer[channel, start:start + count] + else: + return np.concatenate((self._buffer[channel, start:], + self._buffer[channel, :count - self.size + start]))