From 85fd70f8cad14be13cb1c0459958a37043ab19f3 Mon Sep 17 00:00:00 2001
From: wendtalexander <wendtalexander@protonmail.com>
Date: Fri, 4 Oct 2024 11:49:33 +0200
Subject: [PATCH] [dataio] add chunked reading of daq input

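Instead of copying one sample per status poll, let the daq device
stream into an intermediate uldaq buffer that holds five seconds of
data and copy one tenth of that buffer into the circular buffer at a
time, handling the wraparound at the end of the daq buffer. Stop the
scan and log an error when a buffer overrun is detected.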
---
 pyrelacs/dataio/daq_producer.py | 115 +++++++++++++++++++++++++++-----
 1 file changed, 98 insertions(+), 17 deletions(-)

diff --git a/pyrelacs/dataio/daq_producer.py b/pyrelacs/dataio/daq_producer.py
index b24f7f5..67cff41 100644
--- a/pyrelacs/dataio/daq_producer.py
+++ b/pyrelacs/dataio/daq_producer.py
@@ -14,42 +14,59 @@ faulthandler.enable()
 
 
 class DaqProducer:
-    def __init__(self, buffer: CircBuffer, device: uldaq.DaqDevice):
+    def __init__(
+        self, buffer: CircBuffer, device: uldaq.DaqDevice, channels: list[int]
+    ):
         self.buffer = buffer
         self.device = device
         self.ai_device = self.device.get_ai_device()
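+        # first and last analog input channel to scan (inclusive range)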
+        self.channels = channels
 
     def read_analog_continously(
         self,
-        samplerate: float = 20,
         *args,
         **kwargs,
     ):
         log.debug("starting acquisition")
-        channel = self.buffer.channel_count
-        size = self.buffer.size
-        data_in = uldaq.create_float_buffer(channel, size)
+        # number of channels in the scan (the channel range is inclusive)
+        n_channels = self.channels[1] - self.channels[0] + 1
+
+        if n_channels != self.buffer.channel_count:
+            raise ValueError(
+                f"Mismatch in channel count,\n daq channels: {n_channels}\n"
+                f" buffer channels: {self.buffer.channel_count}"
+            )
+
+        # let the buffer for the daq device hold 5 seconds of data
+        daq_buffer_size = int(self.buffer.samplerate * 5)
+
+        data_in = uldaq.create_float_buffer(n_channels, daq_buffer_size)
+        log.debug(f"Buffer size for daq: {len(data_in)}")
+        log.debug(f"Buffer size: {self.buffer.size}")
 
-        # BUG: Check for multiple channels
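+        # start a continuous scan; the daq cycles through data_in as a ring buffer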
         er = self.ai_device.a_in_scan(
-            1,
-            1,
+            self.channels[0],
+            self.channels[1],
             uldaq.AiInputMode.SINGLE_ENDED,
             uldaq.Range.BIP10VOLTS,
-            self.buffer.size,
-            samplerate,
+            daq_buffer_size,
+            self.buffer.samplerate,
             uldaq.ScanOption.CONTINUOUS,
             uldaq.AInScanFlag.DEFAULT,
             data=data_in,
         )
 
+        # copy data into the circular buffer one tenth of the daq buffer at a time
+        chunk_size = daq_buffer_size // 10
+
         start_time = time.time()
         daq_status = uldaq.ScanStatus.IDLE
         while daq_status == uldaq.ScanStatus.IDLE:
             daq_status = self.ai_device.get_scan_status()[0]
-            prev_count = 0
-            prev_index = 0
             while daq_status != uldaq.ScanStatus.IDLE:
+                prev_count = 0
+                prev_index = 0
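+                # poll the scan status for ten seconds, then drain the last chunk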
                 while time.time() - start_time < 10:
                     daq_status, transfer_status = self.ai_device.get_scan_status()
                     # The index into the data buffer immediately following the last sample transferred.
@@ -59,19 +76,83 @@ class DaqProducer:
                     # The number of samples per channel transferred since the scan started
                     channel_samples = transfer_status.current_scan_count
 
-                    if current_index > prev_index:
-                        self.buffer.append(data_in[current_index])
-                    prev_index = current_index
+                    new_data_count = total_samples - prev_count
+                    # if more new samples arrived than the daq buffer holds,
+                    # unread data was overwritten: stop the acquisition
+                    if new_data_count > len(data_in):
+                        self.ai_device.scan_stop()
+                        log.error("A buffer overrun occurred")
+                        break
+
+                    if new_data_count > chunk_size:
+                        # the chunk wraps around the end of the daq buffer:
+                        # copy the tail, then continue from the front
+                        if prev_index + chunk_size > len(data_in) - 1:
+                            log.debug("Chunk wraps around the daq buffer")
+                            first_chunk = len(data_in) - prev_index
+                            for i in range(first_chunk):
+                                self.buffer.append(data_in[prev_index + i])
+                            second_chunk = chunk_size - first_chunk
+                            for i in range(second_chunk):
+                                self.buffer.append(data_in[i])
+                        else:
+                            log.debug("Writing chunk to buffer")
+                            for i in range(chunk_size):
+                                self.buffer.append(data_in[prev_index + i])
+
+                        # advance the read position by one full chunk
+                        prev_count += chunk_size
+                        prev_index = (prev_index + chunk_size) % daq_buffer_size
 
                 self.ai_device.scan_stop()
                 daq_status, transfer_status = self.ai_device.get_scan_status()
                 log.debug(daq_status)
                 log.debug(transfer_status.current_index)
                 log.debug(transfer_status.current_total_count)
                 log.debug(transfer_status.current_scan_count)
+                log.debug(self.buffer.totocount())
+                log.debug("Appending last chunk")
+
+                # flush the samples that arrived after the last full chunk
+                remaining = transfer_status.current_total_count - prev_count
+                if prev_index + remaining > len(data_in) - 1:
+                    log.debug("Last chunk wraps around the daq buffer")
+                    first_chunk = len(data_in) - prev_index
+                    for i in range(first_chunk):
+                        self.buffer.append(data_in[prev_index + i])
+                    second_chunk = remaining - first_chunk
+                    for i in range(second_chunk):
+                        self.buffer.append(data_in[i])
+                else:
+                    log.debug("Writing last chunk to buffer")
+                    for i in range(remaining):
+                        self.buffer.append(data_in[prev_index + i])
                 log.info("stopping")
+
                 break
             break
+        return "Done"
 
 
 if __name__ == "__main__":
@@ -84,6 +165,6 @@ if __name__ == "__main__":
         raise e
     daq_device.connect()
 
-    buf = CircBuffer(size=300)
-    producer = DaqProducer(buf, daq_device)
+    buf = CircBuffer(size=1_000_000, samplerate=100)
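+    # channels [1, 1] scans a single channel; e.g. [0, 3] scans channels 0-3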
+    producer = DaqProducer(buf, daq_device, [1, 1])
     producer.read_analog_continously()