From 440243de5a35b94dc676d95193649cef6b4c4f4b Mon Sep 17 00:00:00 2001 From: Leon Braungardt Date: Wed, 14 May 2025 10:36:24 +0200 Subject: [PATCH 1/6] Change frame_buff from type list to type dict and add helper function to place received UDP payload in the correct position of the frame buffer --- mmwave/dataloader/adc.py | 62 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 60 insertions(+), 2 deletions(-) diff --git a/mmwave/dataloader/adc.py b/mmwave/dataloader/adc.py index f7d607dd..d8888c1b 100644 --- a/mmwave/dataloader/adc.py +++ b/mmwave/dataloader/adc.py @@ -16,6 +16,7 @@ from enum import Enum import numpy as np +import time class CMD(Enum): @@ -116,7 +117,7 @@ def __init__(self, static_ip='192.168.33.30', adc_ip='192.168.33.180', self.packet_count = [] self.byte_count = [] - self.frame_buff = [] + self.frame_buff = {} self.curr_buff = None self.last_frame = None @@ -157,7 +158,7 @@ def close(self): self.config_socket.close() def read(self, timeout=1): - """ Read in a single packet via UDP + """ Read in a single frame via UDP Args: timeout (float): Time to wait for packet before moving on @@ -236,6 +237,63 @@ def _read_data_packet(self): byte_count = struct.unpack('>Q', b'\x00\x00' + data[4:10][::-1])[0] packet_data = np.frombuffer(data[10:], dtype=np.uint16) return packet_num, byte_count, packet_data + + def _place_data_packet_in_frame_buffer(self, byte_count: int, payload: np.ndarray): + """Helper function to place one UDP packet at the correct position in the frame buffer + + Args: + byte_count (int): cumulative Bytes before this payload (from DCA1000 header) + payload (np.ndarray): uint16 from the UDP packet + + Returns: + (int, np.ndarray): Complete frame as a tuple of (frame_num, frame_data), + (None, None) if no frame is complete yet + """ + + offset = byte_count // 2 # Absolute position in UDP packet stream + idx = 0 # Read-index of payload + remaining = payload.size # Number of uint16 to process + completed = (None, None) # Tuple of (frame_id, frame_data) for complete captured frame + + while remaining > 0: + # Determine which frame_id this data chunk belongs to + frame_id = offset // UINT16_IN_FRAME + # Determine which packet number this is within the frame + packet_num_within_frame = offset % UINT16_IN_FRAME + n_uint16_to_frame_end = UINT16_IN_FRAME - packet_num_within_frame + + # Determine the size chunk of the data which is written to buffer + # (detect if the frame border is within this packet or not) + chunk_size = min(remaining, n_uint16_to_frame_end) + + # Create buffer within frame_buff obj for this frame if neccessary + buf = self.frame_buff.setdefault( + frame_id, + { + 'data': np.empty(UINT16_IN_FRAME, dtype=np.uint16), + 'filled': np.zeros(UINT16_IN_FRAME, dtype=bool), + 'first_seen': time.time() + } + ) + + # Write chunk to appropriate position in the frame's buffer + start = packet_num_within_frame + end = packet_num_within_frame + chunk_size + buf['data'][start:end] = payload[idx:idx+chunk_size] + buf['filled'][start:end] = True + + # If all packets for the frame have been read, add it to completed tuple + # (but do not return yet, as otherwise the rest of the packet data is lost) + if buf['filled'].all(): + completed = (frame_id, buf['data'].copy()) + del self.frame_buff[frame_id] + + # Persist in helper vars that chunk has been read + offset += chunk_size + idx += chunk_size + remaining -= chunk_size + + return completed def _listen_for_error(self): """Helper function to try and read in for an error message from the FPGA From d8f030dcee40bc1708e7b4ed50e1adee13bdcd30 Mon Sep 17 00:00:00 2001 From: Leon Braungardt Date: Wed, 14 May 2025 10:51:23 +0200 Subject: [PATCH 2/6] Add helper function to delete incomplete frames once a given timeout is exceeded, for example when one or more packets of a frame are missing --- mmwave/dataloader/adc.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/mmwave/dataloader/adc.py b/mmwave/dataloader/adc.py index d8888c1b..47d561db 100644 --- a/mmwave/dataloader/adc.py +++ b/mmwave/dataloader/adc.py @@ -294,6 +294,25 @@ def _place_data_packet_in_frame_buffer(self, byte_count: int, payload: np.ndarra remaining -= chunk_size return completed + + def _delete_incomplete_frames(self, timeout_seconds: float=0.2): + """Helper function to delete incomplete frames from frame buffer which exceed a given timeout + + Args: + timeout_seconds (float): Time after which incomplete frames are deleted + + Returns: + List[int]: List of frame numbers which were deleted (can be empty) + """ + now = time.time() + to_delete = [] + for frame_number, buf in self.frame_buff.items(): + if now - buf['first_seen'] > timeout_seconds: + to_delete.append(frame_number) + for frame_number in to_delete: + del self.frame_buff[frame_number] + + return to_delete def _listen_for_error(self): """Helper function to try and read in for an error message from the FPGA From c1de58c526bc40f2d94acc973fefac4f75ab3671 Mon Sep 17 00:00:00 2001 From: Leon Braungardt Date: Wed, 14 May 2025 11:01:04 +0200 Subject: [PATCH 3/6] Change read() function to use the added helper functions --- mmwave/dataloader/adc.py | 39 +++++++++++++++------------------------ 1 file changed, 15 insertions(+), 24 deletions(-) diff --git a/mmwave/dataloader/adc.py b/mmwave/dataloader/adc.py index 47d561db..84f8a4ff 100644 --- a/mmwave/dataloader/adc.py +++ b/mmwave/dataloader/adc.py @@ -170,34 +170,25 @@ def read(self, timeout=1): # Configure self.data_socket.settimeout(timeout) - # Frame buffer - ret_frame = np.zeros(UINT16_IN_FRAME, dtype=np.uint16) - - # Wait for start of next frame - while True: - packet_num, byte_count, packet_data = self._read_data_packet() - if byte_count % BYTES_IN_FRAME_CLIPPED == 0: - packets_read = 1 - ret_frame[0:UINT16_IN_PACKET] = packet_data - break - - # Read in the rest of the frame + # Read packets until a full frame is read while True: + # Remove incomplete frames from frame buffer which exceed a timeout + dropped_frames = self._delete_incomplete_frames(timeout_seconds=0.2) + if dropped_frames: + ids = ", ".join(str(f) for f in dropped_frames) + print(f"WARNING: Dropped Frame(s) {ids} since they weren't complete.") + + # Read UDP packet packet_num, byte_count, packet_data = self._read_data_packet() - packets_read += 1 - - if byte_count % BYTES_IN_FRAME_CLIPPED == 0: - self.lost_packets = PACKETS_IN_FRAME_CLIPPED - packets_read - return ret_frame - curr_idx = ((packet_num - 1) % PACKETS_IN_FRAME_CLIPPED) - try: - ret_frame[curr_idx * UINT16_IN_PACKET:(curr_idx + 1) * UINT16_IN_PACKET] = packet_data - except: - pass + # Place data from UDP packet in frame buffer + frame_num, frame_data = self._place_data_packet_in_frame_buffer( + byte_count=byte_count, + payload=packet_data + ) - if packets_read > PACKETS_IN_FRAME_CLIPPED: - packets_read = 0 + if frame_data is not None: + return frame_data def _send_command(self, cmd, length='0000', body='', timeout=1): """Helper function to send a single commmand to the FPGA From e78d7abedf11e855bd3a0d7a20afa89891758b21 Mon Sep 17 00:00:00 2001 From: Leon Braungardt Date: Wed, 14 May 2025 11:06:16 +0200 Subject: [PATCH 4/6] remove unneccessary "dynamic constants": BYTES_IN_FRAME_CLIPPED, PACKETS_IN_FRAME, PACKETS_IN_FRAME_CLIPPED, UINT16_IN_PACKET --- mmwave/dataloader/adc.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/mmwave/dataloader/adc.py b/mmwave/dataloader/adc.py index 84f8a4ff..ecf971e9 100644 --- a/mmwave/dataloader/adc.py +++ b/mmwave/dataloader/adc.py @@ -55,10 +55,6 @@ def __str__(self): # DYNAMIC BYTES_IN_FRAME = (ADC_PARAMS['chirps'] * ADC_PARAMS['rx'] * ADC_PARAMS['tx'] * ADC_PARAMS['IQ'] * ADC_PARAMS['samples'] * ADC_PARAMS['bytes']) -BYTES_IN_FRAME_CLIPPED = (BYTES_IN_FRAME // BYTES_IN_PACKET) * BYTES_IN_PACKET -PACKETS_IN_FRAME = BYTES_IN_FRAME / BYTES_IN_PACKET -PACKETS_IN_FRAME_CLIPPED = BYTES_IN_FRAME // BYTES_IN_PACKET -UINT16_IN_PACKET = BYTES_IN_PACKET // 2 UINT16_IN_FRAME = BYTES_IN_FRAME // 2 From f747a04b498ee27a4f3a39d54b44b942fdc6a5d2 Mon Sep 17 00:00:00 2001 From: Leon Braungardt Date: Sat, 17 May 2025 10:50:05 +0200 Subject: [PATCH 5/6] Only check for incomplete frames when a complete frame has been assembled and make it settable via "dynamic constant" --- mmwave/dataloader/adc.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/mmwave/dataloader/adc.py b/mmwave/dataloader/adc.py index ecf971e9..8971f3e2 100644 --- a/mmwave/dataloader/adc.py +++ b/mmwave/dataloader/adc.py @@ -56,6 +56,7 @@ def __str__(self): BYTES_IN_FRAME = (ADC_PARAMS['chirps'] * ADC_PARAMS['rx'] * ADC_PARAMS['tx'] * ADC_PARAMS['IQ'] * ADC_PARAMS['samples'] * ADC_PARAMS['bytes']) UINT16_IN_FRAME = BYTES_IN_FRAME // 2 +DELETE_INCOMPLETE_FRAMES_AFTER_SECONDS = 0.2 class DCA1000: @@ -168,12 +169,6 @@ def read(self, timeout=1): # Read packets until a full frame is read while True: - # Remove incomplete frames from frame buffer which exceed a timeout - dropped_frames = self._delete_incomplete_frames(timeout_seconds=0.2) - if dropped_frames: - ids = ", ".join(str(f) for f in dropped_frames) - print(f"WARNING: Dropped Frame(s) {ids} since they weren't complete.") - # Read UDP packet packet_num, byte_count, packet_data = self._read_data_packet() @@ -184,6 +179,12 @@ def read(self, timeout=1): ) if frame_data is not None: + # Remove incomplete frames from frame buffer which exceed a timeout + dropped_frames = self._delete_incomplete_frames(timeout_seconds=DELETE_INCOMPLETE_FRAMES_AFTER_SECONDS) + if dropped_frames: + ids = ", ".join(str(f) for f in dropped_frames) + print(f"WARNING: Dropped Frame(s) {ids} since they weren't complete.") + # Return the complete frame return frame_data def _send_command(self, cmd, length='0000', body='', timeout=1): From 22b47433eb754b473ccf6a97096cd52a3038b4ab Mon Sep 17 00:00:00 2001 From: Leon Braungardt Date: Sat, 17 May 2025 10:51:16 +0200 Subject: [PATCH 6/6] Increase timeout after which frame is deleted in order to cover framePeriods up to around 500ms --- mmwave/dataloader/adc.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mmwave/dataloader/adc.py b/mmwave/dataloader/adc.py index 8971f3e2..e7591a6d 100644 --- a/mmwave/dataloader/adc.py +++ b/mmwave/dataloader/adc.py @@ -56,7 +56,7 @@ def __str__(self): BYTES_IN_FRAME = (ADC_PARAMS['chirps'] * ADC_PARAMS['rx'] * ADC_PARAMS['tx'] * ADC_PARAMS['IQ'] * ADC_PARAMS['samples'] * ADC_PARAMS['bytes']) UINT16_IN_FRAME = BYTES_IN_FRAME // 2 -DELETE_INCOMPLETE_FRAMES_AFTER_SECONDS = 0.2 +DELETE_INCOMPLETE_FRAMES_AFTER_SECONDS = 1.0 class DCA1000: