diff --git a/src/caliscope/core/intrinsic_calibrator.py b/src/caliscope/core/intrinsic_calibrator.py index 7f0ac9db..2b47e1a3 100644 --- a/src/caliscope/core/intrinsic_calibrator.py +++ b/src/caliscope/core/intrinsic_calibrator.py @@ -8,24 +8,24 @@ from caliscope.cameras.camera_array import CameraData from caliscope.packets import FramePacket -from caliscope.recording import FramePacketPublisher +from caliscope.recording import FramePacketStreamer logger = logging.getLogger(__name__) class IntrinsicCalibrator: """ - Takes a FramePacketPublisher and determines a CameraData object from it. - Publisher needs to have a charuco tracker assigned to it. + Takes a FramePacketStreamer and determines a CameraData object from it. + Streamer needs to have a charuco tracker assigned to it. """ - def __init__(self, camera_data: CameraData, publisher: FramePacketPublisher): + def __init__(self, camera_data: CameraData, streamer: FramePacketStreamer): self.camera = camera_data # pointer needed to update params - self.publisher = publisher + self.streamer = streamer self.initialize_point_history() self.frame_packet_q = Queue() - self.publisher.subscribe(self.frame_packet_q) + self.streamer.subscribe(self.frame_packet_q) # The following group of parameters relate to the autopopulation of the calibrator self.grid_history_q = Queue() # for passing ids, img_loc used in calibration @@ -50,7 +50,7 @@ def harvest_worker(): self.add_frame_packet(frame_packet) - logger.info(f"Harvest frames successfully ended in calibrator for port {self.publisher.port}") + logger.info(f"Harvest frames successfully ended in calibrator for port {self.streamer.port}") self.harvest_thread = Thread(target=harvest_worker, args=[], daemon=True) self.harvest_thread.start() @@ -58,7 +58,7 @@ def harvest_worker(): def stop(self): logger.info("Beginning to stop intrinsic calibrator") self.stop_event.set() - self.publisher.unsubscribe(self.frame_packet_q) + self.streamer.unsubscribe(self.frame_packet_q) self.frame_packet_q.put(-1) @property @@ -115,7 +115,7 @@ def add_frame_packet(self, frame_packet: FramePacket): self.auto_pop_frame_wait = max(self.auto_pop_frame_wait - 1, 0) logger.debug(f"Current index is {index}") - if index == self.publisher.last_frame_index: + if index == self.streamer.last_frame_index: # end of stream, so stop auto pop and backfill to hit grid target logger.info("End of autopop detected...") self.auto_store_data.clear() @@ -212,8 +212,8 @@ def calibrate_camera(self): logger.info(f"Calibrating camera {self.camera.port}....") - width = self.publisher.size[0] - height = self.publisher.size[1] + width = self.streamer.size[0] + height = self.streamer.size[1] if self.camera.fisheye: # Convert to float32 and RESHAPE to add the required channel dimension diff --git a/src/caliscope/gui/presenters/intrinsic_calibration_presenter.py b/src/caliscope/gui/presenters/intrinsic_calibration_presenter.py index c3e078aa..b574dec5 100644 --- a/src/caliscope/gui/presenters/intrinsic_calibration_presenter.py +++ b/src/caliscope/gui/presenters/intrinsic_calibration_presenter.py @@ -12,7 +12,7 @@ from enum import Enum, auto from pathlib import Path from queue import Empty, Queue -from threading import Event, Thread +from threading import Event, Lock, Thread import cv2 import numpy as np @@ -23,10 +23,10 @@ IntrinsicCalibrationResult, calibrate_intrinsics, ) -from caliscope.core.frame_selector import select_calibration_frames +from caliscope.core.frame_selector import FrameSelectionResult, select_calibration_frames from caliscope.core.point_data import ImagePoints from caliscope.packets import FramePacket, PointPacket -from caliscope.recording import create_publisher +from caliscope.recording import create_streamer from caliscope.task_manager.cancellation import CancellationToken from caliscope.task_manager.task_handle import TaskHandle from caliscope.task_manager.task_manager import TaskManager @@ -56,7 +56,7 @@ class IntrinsicCalibrationPresenter(QObject): submission of calibration to TaskManager. Exposes a display_queue for the View's processing thread to consume directly (avoids GUI thread hop). - A single FramePacketPublisher lives across all states, enabling scrubbing + A single FramePacketStreamer lives across all states, enabling scrubbing in READY/CALIBRATED states and collection during COLLECTING state. Signals: @@ -109,6 +109,10 @@ def __init__( self._collected_points: list[tuple[int, PointPacket]] = [] self._calibrated_camera: CameraData | None = None self._calibration_task: TaskHandle | None = None + self._selection_result: FrameSelectionResult | None = None + + # Lock for thread-safe access to collected_points from View + self._overlay_lock = Lock() # Display queue for View consumption self._display_queue: Queue[FramePacket | None] = Queue() @@ -116,23 +120,23 @@ def __init__( # Collection state self._is_collecting = False - # Single publisher for all states (scrubbing + collection) - self._publisher = create_publisher( + # Single streamer for all states (scrubbing + collection) + self._streamer = create_streamer( video_directory=self._video_path.parent, port=self._camera.port, rotation_count=self._camera.rotation_count, tracker=self._tracker, - break_on_last=False, # Pause at end instead of exit + end_behavior="pause", # Pause at end for interactive scrubbing ) self._frame_queue: Queue[FramePacket] = Queue() - self._publisher.subscribe(self._frame_queue) + self._streamer.subscribe(self._frame_queue) - # Start publisher worker (will read first frame, then we pause) + # Start streamer worker (will read first frame, then we pause) self._stream_handle = self._task_manager.submit( - self._publisher.play_worker, - name=f"Publisher port {self._port}", + self._streamer.play_worker, + name=f"Streamer port {self._port}", ) - self._publisher.pause() # Immediately pause for scrubbing mode + self._streamer.pause() # Immediately pause for scrubbing mode # Guaranteed initial frame display (don't rely on thread timing) self._load_initial_frame() @@ -180,13 +184,31 @@ def camera(self) -> CameraData: @property def frame_count(self) -> int: """Total frames in video.""" - return self._publisher.last_frame_index + 1 + return self._streamer.last_frame_index + 1 @property def current_frame_index(self) -> int: """Current frame position.""" return self._current_frame_index + @property + def collected_points(self) -> list[tuple[int, PointPacket]]: + """Thread-safe access to accumulated points for overlay rendering.""" + with self._overlay_lock: + return list(self._collected_points) + + @property + def selected_frame_indices(self) -> list[int] | None: + """Frame indices used in calibration, or None if not yet calibrated.""" + if self._selection_result is None: + return None + return self._selection_result.selected_frames + + @property + def board_connectivity(self) -> set[tuple[int, int]]: + """Point ID pairs that should be connected to form grid.""" + return self._tracker.get_connected_points() + def refresh_display(self) -> None: """Put a fresh frame on the display queue. @@ -196,14 +218,14 @@ def refresh_display(self) -> None: self._load_initial_frame() def seek_to(self, frame_index: int) -> None: - """Seek to frame. Works in READY/CALIBRATED states via publisher's jump_to.""" + """Seek to frame. Works in READY/CALIBRATED states via streamer's seek_to.""" if self.state not in (PresenterState.READY, PresenterState.CALIBRATED): return frame_index = max(0, min(frame_index, self.frame_count - 1)) - self._publisher.jump_to( - frame_index, exact=True - ) # exact=False would cause Fast seek, skipping between keyframes + self._streamer.seek_to( + frame_index, precise=True + ) # precise=False would cause Fast seek, skipping between keyframes def _load_initial_frame(self) -> None: """Read first frame from video and put on display queue.""" @@ -228,7 +250,7 @@ def _load_initial_frame(self) -> None: def start_calibration(self) -> None: """Start collecting calibration frames. - Resets to beginning and unpauses the publisher. + Resets to beginning and unpauses the streamer. """ if self.state not in (PresenterState.READY, PresenterState.CALIBRATED): logger.warning(f"Cannot start calibration in state {self.state}") @@ -236,18 +258,21 @@ def start_calibration(self) -> None: logger.info(f"Starting calibration collection for port {self._port}") - # Set collecting FIRST to block concurrent seeks - self._is_collecting = True - self._emit_state_changed() - - # Clear previous attempt's data - self._collected_points.clear() + # Clear previous calibration data BEFORE setting collecting flag + # (state is computed: CALIBRATED check comes before COLLECTING check) + with self._overlay_lock: + self._collected_points.clear() self._calibrated_camera = None self._calibration_task = None + self._selection_result = None + + # Now set collecting and emit state change + self._is_collecting = True + self._emit_state_changed() # Reset to beginning and start playback - self._publisher.jump_to(0, exact=True) - self._publisher.unpause() + self._streamer.seek_to(0, precise=True) + self._streamer.unpause() def stop_calibration(self) -> None: """Stop collection and return to READY state. @@ -260,8 +285,9 @@ def stop_calibration(self) -> None: logger.info(f"Stopping calibration collection for port {self._port}") - self._publisher.pause() - self._collected_points.clear() + self._streamer.pause() + with self._overlay_lock: + self._collected_points.clear() self._is_collecting = False self._emit_state_changed() @@ -269,12 +295,12 @@ def _consume_frames(self) -> None: """Pull frames from queue, accumulate points when collecting, emit for display. Runs continuously across all states. Exits when stop_event is set or - publisher is cancelled externally. + streamer is cancelled externally. """ logger.debug(f"Consumer thread started for port {self._port}") while not self._stop_event.is_set(): - # Exit if publisher was cancelled externally + # Exit if streamer was cancelled externally if self._stream_handle is not None and self._stream_handle.state == TaskState.CANCELLED: break @@ -289,7 +315,8 @@ def _consume_frames(self) -> None: # Accumulate points only during collection if self._is_collecting and packet.points is not None and len(packet.points.point_id) > 0: - self._collected_points.append((packet.frame_index, packet.points)) + with self._overlay_lock: + self._collected_points.append((packet.frame_index, packet.points)) # Always emit for display self._display_queue.put(packet) @@ -297,14 +324,14 @@ def _consume_frames(self) -> None: self.frame_position_changed.emit(packet.frame_index) # Detect end of video during collection - if self._is_collecting and packet.frame_index >= self._publisher.last_frame_index: + if self._is_collecting and packet.frame_index >= self._streamer.last_frame_index: self._on_collection_complete() logger.debug(f"Consumer thread exiting for port {self._port}") def _on_collection_complete(self) -> None: """Called when video playback finishes. Submits calibration task.""" - self._publisher.pause() + self._streamer.pause() self._is_collecting = False if len(self._collected_points) == 0: @@ -335,6 +362,9 @@ def _on_collection_complete(self) -> None: logger.info(f"Selected {len(selection_result.selected_frames)} frames for calibration") + # Store selection result for overlay rendering + self._selection_result = selection_result + # Submit calibration to TaskManager def calibration_worker(token: CancellationToken, handle: TaskHandle) -> IntrinsicCalibrationResult: return calibrate_intrinsics( @@ -402,8 +432,8 @@ def _on_calibration_complete(self, result: IntrinsicCalibrationResult) -> None: self.calibration_complete.emit(self._calibrated_camera) self._emit_state_changed() - # Jump to first frame so View can display with undistortion - self._publisher.jump_to(0, exact=True) + # Seek to first frame so View can display with undistortion + self._streamer.seek_to(0, precise=True) def _on_calibration_failed(self, exc_type: str, message: str) -> None: """Handle calibration failure.""" @@ -424,10 +454,10 @@ def cleanup(self) -> None: if self._consumer_thread is not None: self._consumer_thread.join(timeout=2.0) - # Cancel publisher worker + # Cancel streamer worker if self._stream_handle is not None: self._stream_handle.cancel() - # Clean up publisher - self._publisher.unsubscribe(self._frame_queue) - self._publisher.close() + # Clean up streamer + self._streamer.unsubscribe(self._frame_queue) + self._streamer.close() diff --git a/src/caliscope/gui/views/intrinsic_calibration_dev_view.py b/src/caliscope/gui/views/intrinsic_calibration_dev_view.py index d5643863..e32c6fcb 100644 --- a/src/caliscope/gui/views/intrinsic_calibration_dev_view.py +++ b/src/caliscope/gui/views/intrinsic_calibration_dev_view.py @@ -6,9 +6,13 @@ """ import logging +from dataclasses import dataclass from queue import Empty, Queue from threading import Event +from typing import Any +import cv2 +from numpy.typing import NDArray from PySide6.QtCore import Qt, QThread, Signal from PySide6.QtGui import QPixmap from PySide6.QtWidgets import ( @@ -32,34 +36,62 @@ IntrinsicCalibrationPresenter, PresenterState, ) -from caliscope.packets import FramePacket +from caliscope.packets import FramePacket, PointPacket logger = logging.getLogger(__name__) -class FrameProcessingThread(QThread): +@dataclass +class OverlaySettings: + """User-toggleable overlay visibility.""" + + show_current_points: bool = True + show_accumulated: bool = True + show_selected_grids: bool = True + + +class FrameRenderThread(QThread): """Processes raw frames for display - runs off GUI thread. Reads directly from Presenter's display_queue (no intermediate signal). Applies display transforms and emits QPixmaps for the GUI thread. + Handles overlay rendering for point visualization layers. """ pixmap_ready = Signal(QPixmap) + # Overlay colors (BGR format for OpenCV) + CURRENT_POINTS_COLOR = (0, 0, 220) # Red + ACCUMULATED_COLOR = (128, 128, 0) # Teal + SELECTED_GRIDS_COLOR = (255, 200, 0) # Bright cyan + def __init__( self, display_queue: Queue[FramePacket | None], camera: CameraData, + presenter: IntrinsicCalibrationPresenter, pixmap_edge_length: int = 500, parent: QThread | None = None, ): super().__init__(parent) self._display_queue = display_queue self._camera = camera + self._presenter = presenter self._pixmap_edge_length = pixmap_edge_length self._undistort_enabled = False self._visualizer: LensModelVisualizer | None = None self._keep_running = Event() + self._overlay_settings = OverlaySettings() + + # Cache last packet for re-rendering when overlay settings change + self._last_packet: FramePacket | None = None + + # Compute overlay sizes based on image dimensions + width = camera.size[0] + self._accumulated_radius = max(4, width // 400) + self._grid_line_thickness = max(2, width // 600) + self._current_point_radius = max(5, width // 300) + self._current_point_thickness = max(2, width // 500) def set_undistort(self, enabled: bool, calibrated_camera: CameraData | None) -> None: """Enable/disable undistortion.""" @@ -70,6 +102,17 @@ def set_undistort(self, enabled: bool, calibrated_camera: CameraData | None) -> self._visualizer = LensModelVisualizer(calibrated_camera) logger.info(f"Created LensModelVisualizer for port {calibrated_camera.port}") + def set_overlay_visibility( + self, + current_points: bool, + accumulated: bool, + selected_grids: bool, + ) -> None: + """Configure which overlay layers to show.""" + self._overlay_settings.show_current_points = current_points + self._overlay_settings.show_accumulated = accumulated + self._overlay_settings.show_selected_grids = selected_grids + @property def shows_boundary(self) -> bool: """True if the visualizer draws the original frame boundary.""" @@ -81,10 +124,115 @@ def stop(self) -> None: """Signal thread to stop.""" self._keep_running.clear() + def rerender_cached(self) -> None: + """Re-render the last packet with current overlay settings. + + Call this when overlay visibility changes instead of requesting + a new frame from the presenter. + """ + if self._last_packet is not None: + self._render_packet(self._last_packet) + + def _draw_current_points(self, frame: NDArray[Any], points: PointPacket) -> NDArray[Any]: + """Draw current frame's detected points as red circles.""" + for x, y in points.img_loc: + cv2.circle( + frame, + (int(x), int(y)), + self._current_point_radius, + self.CURRENT_POINTS_COLOR, + self._current_point_thickness, + ) + return frame + + def _draw_accumulated(self, frame: NDArray[Any]) -> NDArray[Any]: + """Draw accumulated points as semi-transparent teal circles.""" + collected = self._presenter.collected_points + if not collected: + return frame + + overlay = frame.copy() + for _, points in collected: + for x, y in points.img_loc: + cv2.circle( + overlay, + (int(x), int(y)), + self._accumulated_radius, + self.ACCUMULATED_COLOR, + -1, + ) + + return cv2.addWeighted(overlay, 0.5, frame, 0.5, 0) + + def _draw_selected_grids(self, frame: NDArray[Any]) -> NDArray[Any]: + """Draw grids for ALL selected calibration frames at once (coverage map).""" + selected = self._presenter.selected_frame_indices + if selected is None: + return frame + + selected_set = set(selected) + connectivity = self._presenter.board_connectivity + + # Draw grids for all selected frames simultaneously + for frame_idx, points in self._presenter.collected_points: + if frame_idx not in selected_set: + continue + + id_to_loc = {int(pid): (int(x), int(y)) for pid, (x, y) in zip(points.point_id, points.img_loc)} + + for id_a, id_b in connectivity: + if id_a in id_to_loc and id_b in id_to_loc: + cv2.line( + frame, + id_to_loc[id_a], + id_to_loc[id_b], + self.SELECTED_GRIDS_COLOR, + self._grid_line_thickness, + ) + + return frame + + def _render_packet(self, packet: FramePacket) -> None: + """Render a packet with current overlay settings and emit pixmap.""" + if packet.frame is None: + return + + # Start with raw frame (View owns all rendering) + frame = packet.frame.copy() + + # Layer 1: Accumulated points (behind current) + if self._overlay_settings.show_accumulated: + frame = self._draw_accumulated(frame) + + # Layer 2: Selected board grids (coverage map) + if self._overlay_settings.show_selected_grids: + frame = self._draw_selected_grids(frame) + + # Layer 3: Current frame points (on top) + if self._overlay_settings.show_current_points and packet.points is not None: + frame = self._draw_current_points(frame, packet.points) + + # Undistortion + if self._undistort_enabled and self._visualizer is not None: + frame = self._visualizer.undistort(frame) + + frame = resize_to_square(frame) + frame = apply_rotation(frame, self._camera.rotation_count) + + image = cv2_to_qlabel(frame) + pixmap = QPixmap.fromImage(image) + pixmap = pixmap.scaled( + self._pixmap_edge_length, + self._pixmap_edge_length, + Qt.AspectRatioMode.KeepAspectRatio, + ) + + self.pixmap_ready.emit(pixmap) + def run(self) -> None: - """Main processing loop - reads directly from Presenter's queue.""" + """Main render loop - reads directly from Presenter's queue.""" self._keep_running.set() - logger.debug(f"Frame processing thread started for port {self._camera.port}") + logger.debug(f"Frame render thread started for port {self._camera.port}") while self._keep_running.is_set(): try: @@ -100,28 +248,12 @@ def run(self) -> None: if packet.frame is None: continue - # Processing pipeline (mirrors PlaybackFrameEmitter) - frame = packet.frame_with_points - if frame is None: - continue - - if self._undistort_enabled and self._visualizer is not None: - frame = self._visualizer.undistort(frame) + # Cache for re-rendering on overlay toggle + self._last_packet = packet - frame = resize_to_square(frame) - frame = apply_rotation(frame, self._camera.rotation_count) + self._render_packet(packet) - image = cv2_to_qlabel(frame) - pixmap = QPixmap.fromImage(image) - pixmap = pixmap.scaled( - self._pixmap_edge_length, - self._pixmap_edge_length, - Qt.AspectRatioMode.KeepAspectRatio, - ) - - self.pixmap_ready.emit(pixmap) - - logger.debug(f"Frame processing thread exiting for port {self._camera.port}") + logger.debug(f"Frame render thread exiting for port {self._camera.port}") class IntrinsicCalibrationDevView(QWidget): @@ -144,7 +276,7 @@ def __init__( self._user_dragging = False self._setup_ui() - self._setup_processing_thread() + self._setup_render_thread() self._connect_signals() # Initial UI state @@ -201,14 +333,36 @@ def _setup_ui(self) -> None: layout.addLayout(controls) - def _setup_processing_thread(self) -> None: - """Create and start the frame processing thread.""" - self._processing_thread = FrameProcessingThread( + # Overlay controls row + overlay_row = QHBoxLayout() + + self._current_points_cb = QCheckBox("Current Points") + self._current_points_cb.setChecked(True) + self._current_points_cb.toggled.connect(self._on_overlay_toggled) + overlay_row.addWidget(self._current_points_cb) + + self._accumulated_cb = QCheckBox("All Points") + self._accumulated_cb.setChecked(True) + self._accumulated_cb.toggled.connect(self._on_overlay_toggled) + overlay_row.addWidget(self._accumulated_cb) + + self._grids_cb = QCheckBox("Selected Grids") + self._grids_cb.setChecked(True) + self._grids_cb.setEnabled(False) # Enable after calibration + self._grids_cb.toggled.connect(self._on_overlay_toggled) + overlay_row.addWidget(self._grids_cb) + + layout.addLayout(overlay_row) + + def _setup_render_thread(self) -> None: + """Create and start the frame render thread.""" + self._render_thread = FrameRenderThread( display_queue=self._presenter.display_queue, camera=self._presenter.camera, + presenter=self._presenter, ) - self._processing_thread.pixmap_ready.connect(self._on_pixmap_ready) - self._processing_thread.start() + self._render_thread.pixmap_ready.connect(self._on_pixmap_ready) + self._render_thread.start() def _connect_signals(self) -> None: """Connect presenter signals to view slots.""" @@ -283,22 +437,34 @@ def _on_calibrate_clicked(self) -> None: if state == PresenterState.COLLECTING: self._presenter.stop_calibration() elif state in (PresenterState.READY, PresenterState.CALIBRATED): - # Reset undistort when starting new calibration + # Reset display state when starting new calibration self._undistort_checkbox.setChecked(False) + self._grids_cb.setChecked(True) + self._grids_cb.setEnabled(False) self._presenter.start_calibration() def _on_undistort_toggled(self, checked: bool) -> None: """Handle undistort checkbox toggle.""" - self._processing_thread.set_undistort(checked, self._presenter.calibrated_camera) + self._render_thread.set_undistort(checked, self._presenter.calibrated_camera) # Show/hide boundary legend based on whether boundary is drawn - if self._processing_thread.shows_boundary: + if self._render_thread.shows_boundary: self._boundary_legend.show() else: self._boundary_legend.hide() - # Request fresh frame to show undistort effect - self._presenter.refresh_display() + # Re-render cached frame with new settings (don't jump to frame 0) + self._render_thread.rerender_cached() + + def _on_overlay_toggled(self) -> None: + """Handle overlay checkbox toggles.""" + self._render_thread.set_overlay_visibility( + current_points=self._current_points_cb.isChecked(), + accumulated=self._accumulated_cb.isChecked(), + selected_grids=self._grids_cb.isChecked(), + ) + # Re-render cached frame with new settings (don't jump to frame 0) + self._render_thread.rerender_cached() def _on_calibration_complete(self, calibrated_camera: CameraData) -> None: """Handle successful calibration.""" @@ -309,12 +475,15 @@ def _on_calibration_complete(self, calibrated_camera: CameraData) -> None: # Auto-enable undistortion to show calibration effect self._undistort_checkbox.setChecked(True) + # Enable selected grids overlay now that selection is available + self._grids_cb.setEnabled(True) + def _on_calibration_failed(self, error_msg: str) -> None: """Handle calibration failure.""" self._status_label.setText(f"Status: FAILED - {error_msg}") def closeEvent(self, event) -> None: """Clean up on close.""" - self._processing_thread.stop() - self._processing_thread.wait(2000) + self._render_thread.stop() + self._render_thread.wait(2000) super().closeEvent(event) diff --git a/src/caliscope/managers/synchronized_stream_manager.py b/src/caliscope/managers/synchronized_stream_manager.py index a2678253..7ee95ee3 100644 --- a/src/caliscope/managers/synchronized_stream_manager.py +++ b/src/caliscope/managers/synchronized_stream_manager.py @@ -7,7 +7,7 @@ from caliscope.cameras.camera_array import CameraData from caliscope.cameras.synchronizer import Synchronizer from caliscope.tracker import Tracker -from caliscope.recording import FramePacketPublisher, create_publisher +from caliscope.recording import FramePacketStreamer, create_streamer from caliscope.recording.video_recorder import VideoRecorder from caliscope.trackers.charuco_tracker import CharucoTracker @@ -27,7 +27,7 @@ class SynchronizedStreamManager: See CLAUDE.md "Planned Refactor: SynchronizedStreamManager" for architecture. Current responsibilities: - - Create FramePacketPublisher per camera + - Create FramePacketStreamer per camera - Create Synchronizer for frame alignment - Create VideoRecorder for output """ @@ -48,7 +48,7 @@ def __init__( self.load_video_properties() # Initialized lazily in process_streams() - self.publishers: dict[int, FramePacketPublisher] = {} + self.streamers: dict[int, FramePacketStreamer] = {} self.synchronizer: Synchronizer | None = None self.recorder: VideoRecorder | None = None @@ -62,21 +62,21 @@ def process_streams(self, fps_target: int | None = None, include_video: bool = T if fps_target is None: fps_target = int(self.mean_fps) - # Create publishers with fps_target - self.publishers = {} + # Create streamers with fps_target + self.streamers = {} for camera in self.all_camera_data.values(): - publisher = create_publisher( + streamer = create_streamer( video_directory=self.recording_dir, port=camera.port, rotation_count=camera.rotation_count, tracker=self.tracker, fps_target=fps_target, - break_on_last=True, + end_behavior="stop", # Stop at end for batch processing ) - self.publishers[camera.port] = publisher + self.streamers[camera.port] = streamer - logger.info(f"Creating synchronizer based off of publishers: {self.publishers}") - self.synchronizer = Synchronizer(self.publishers) + logger.info(f"Creating synchronizer based off of streamers: {self.streamers}") + self.synchronizer = Synchronizer(self.streamers) self.recorder = VideoRecorder(self.synchronizer, suffix=self.subfolder_name) logger.info(f"beginning to create recording for files saved to {self.output_dir}") @@ -87,9 +87,9 @@ def process_streams(self, fps_target: int | None = None, include_video: bool = T store_point_history=True, ) - logger.info(f"About to start playing video publishers. Publishers: {self.publishers}") - for port, publisher in self.publishers.items(): - publisher.start() + logger.info(f"About to start playing video streamers: {self.streamers}") + for port, streamer in self.streamers.items(): + streamer.start() def load_video_properties(self): fps = [] diff --git a/src/caliscope/recording/__init__.py b/src/caliscope/recording/__init__.py index bbfa342b..04958734 100644 --- a/src/caliscope/recording/__init__.py +++ b/src/caliscope/recording/__init__.py @@ -1,15 +1,15 @@ -"""Recording module - video I/O, timing, and publishing.""" +"""Recording module - video I/O, timing, and streaming.""" -from caliscope.recording.frame_packet_publisher import ( - FramePacketPublisher, - create_publisher, +from caliscope.recording.frame_packet_streamer import ( + FramePacketStreamer, + create_streamer, ) from caliscope.recording.frame_source import FrameSource from caliscope.recording.frame_timestamps import FrameTimestamps __all__ = [ - "FramePacketPublisher", + "FramePacketStreamer", "FrameSource", "FrameTimestamps", - "create_publisher", + "create_streamer", ] diff --git a/src/caliscope/recording/frame_packet_publisher.py b/src/caliscope/recording/frame_packet_streamer.py similarity index 72% rename from src/caliscope/recording/frame_packet_publisher.py rename to src/caliscope/recording/frame_packet_streamer.py index 3e281dfe..55f0e930 100644 --- a/src/caliscope/recording/frame_packet_publisher.py +++ b/src/caliscope/recording/frame_packet_streamer.py @@ -1,8 +1,25 @@ -"""Publisher for FramePackets from recorded video. +"""Streamer for FramePackets from recorded video. -FramePacketPublisher wraps a FrameSource and FrameTimestamps, adding threading, +FramePacketStreamer wraps a FrameSource and FrameTimestamps, adding threading, pub/sub broadcasting, and optional tracking. It's the streaming layer on top of the raw video I/O. + +Seeking and Pause Handling (lessons learned): +--------------------------------------------- +1. **Seek failure must not kill the streamer**: When get_frame() returns None + (e.g., PyAV can't decode near EOF), we must set skip_read=True to stay at + the current position. Otherwise, the next read_frame() call has undefined + behavior after a seek and will likely return None, causing premature exit. + +2. **Condition variable for responsive pause loop**: The pause loop uses + _seek_condition.wait() instead of plain sleep. This allows seek_to() calls + to wake the loop immediately via notify(), making scrubbing responsive. + Without this, there's up to 0.1s latency on each seek while paused. + +3. **last_frame_index uses minimum of timestamps and source**: FrameTimestamps + may have entries for frames that aren't actually accessible in the video + file. We use min(timestamps, source) to ensure we don't try to seek beyond + what's actually readable. """ import logging @@ -10,6 +27,7 @@ from queue import Queue from threading import Condition, Event, Lock, Thread from time import perf_counter, sleep +from typing import Literal import numpy as np @@ -23,8 +41,8 @@ logger = logging.getLogger(__name__) -class FramePacketPublisher: - """Publishes FramePackets from recorded video to subscriber queues. +class FramePacketStreamer: + """Streams FramePackets from recorded video to subscriber queues. Wraps FrameSource (video I/O) and FrameTimestamps (timing), adding: - Pub/sub broadcasting to multiple queues @@ -47,9 +65,9 @@ def __init__( rotation_count: int = 0, tracker: Tracker | None = None, fps_target: float | None = None, - break_on_last: bool = True, + end_behavior: Literal["stop", "pause"] = "stop", ) -> None: - """Initialize the publisher. + """Initialize the streamer. Args: frame_source: Video I/O wrapper (provides frames). @@ -57,13 +75,14 @@ def __init__( rotation_count: Camera rotation (0, 1, 2, 3 for 0/90/180/270 degrees). tracker: Optional tracker for landmark detection. fps_target: Target playback FPS. None = unlimited (as fast as possible). - break_on_last: If True, stop at last frame. If False, pause at last frame. + end_behavior: What to do at last frame. "stop" broadcasts EOF and exits, + "pause" auto-pauses for interactive scrubbing. """ self._frame_source = frame_source self._frame_timestamps = frame_timestamps self._rotation_count = rotation_count self._tracker = tracker - self._break_on_last = break_on_last + self._end_behavior = end_behavior # FPS targeting self._fps_target = fps_target @@ -80,10 +99,10 @@ def __init__( self._pause_event = Event() self._pause_event.clear() - # Jump request: (frame_index, exact) - latest wins - self._pending_jump: tuple[int, bool] | None = None - self._jump_lock = Lock() - self._jump_condition = Condition(self._jump_lock) + # Seek request: (frame_index, exact) - latest wins + self._pending_seek: tuple[int, bool] | None = None + self._seek_lock = Lock() + self._seek_condition = Condition(self._seek_lock) # Current position self._frame_index = frame_timestamps.start_frame_index @@ -119,8 +138,13 @@ def start_frame_index(self) -> int: @property def last_frame_index(self) -> int: - """Last valid frame index.""" - return self._frame_timestamps.last_frame_index + """Last valid frame index (minimum of timestamps and source). + + FrameTimestamps may have entries for frames that aren't actually + accessible in the video file. We use min() to ensure we don't try + to seek beyond what's actually readable. + """ + return min(self._frame_timestamps.last_frame_index, self._frame_source.last_frame_index) @property def frame_index(self) -> int: @@ -139,15 +163,15 @@ def frame_time(self) -> float: def subscribe(self, queue: Queue) -> None: """Add a queue to receive FramePackets. - Thread-safe. Notifies waiting publisher if this is the first subscriber. + Thread-safe. Notifies waiting streamer if this is the first subscriber. """ with self._subscriber_condition: if queue not in self._subscribers: - logger.info(f"Adding subscriber to publisher at port {self.port}") + logger.info(f"Adding subscriber to streamer at port {self.port}") self._subscribers.append(queue) self._subscriber_condition.notify() else: - logger.warning(f"Attempted duplicate subscription at port {self.port}") + logger.warning(f"Attempted duplicate subscription to streamer at port {self.port}") def unsubscribe(self, queue: Queue) -> None: """Remove a queue from receiving FramePackets. @@ -156,7 +180,7 @@ def unsubscribe(self, queue: Queue) -> None: """ with self._subscriber_lock: if queue in self._subscribers: - logger.info(f"Removing subscriber from publisher at port {self.port}") + logger.info(f"Removing subscriber from streamer at port {self.port}") self._subscribers.remove(queue) else: logger.warning(f"Attempted to unsubscribe non-existent queue at port {self.port}") @@ -196,41 +220,41 @@ def _wait_for_subscribers(self, token: CancellationToken) -> bool: def pause(self) -> None: """Pause playback.""" - logger.info(f"Pausing publisher at port {self.port}") + logger.info(f"Pausing streamer at port {self.port}") self._pause_event.set() def unpause(self) -> None: """Resume playback.""" - logger.info(f"Unpausing publisher at port {self.port}") + logger.info(f"Unpausing streamer at port {self.port}") self._pause_event.clear() - def jump_to(self, frame_index: int, exact: bool = True) -> None: + def seek_to(self, frame_index: int, precise: bool = True) -> None: """Request a seek to the specified frame. Args: frame_index: Target frame to seek to. - exact: If True, decode to exact frame. If False, seek to nearest - keyframe only (faster for scrubbing). + precise: If True, decode to exact frame (slower). If False, seek to + nearest keyframe only (faster for scrubbing). - Note: If multiple jump requests arrive before processing, only the - latest is honored (previous pending jumps are dropped). + Note: If multiple seek requests arrive before processing, only the + latest is honored (previous pending seeks are dropped). """ - with self._jump_condition: - logger.info(f"Setting pending jump to frame {frame_index} (exact={exact})") - self._pending_jump = (frame_index, exact) - self._jump_condition.notify() - - def _has_pending_jump(self) -> bool: - """Check for pending jump request.""" - with self._jump_lock: - return self._pending_jump is not None - - def _take_pending_jump(self) -> tuple[int, bool] | None: - """Take and clear the pending jump request.""" - with self._jump_lock: - jump = self._pending_jump - self._pending_jump = None - return jump + with self._seek_condition: + logger.info(f"Setting pending seek to frame {frame_index} (precise={precise})") + self._pending_seek = (frame_index, precise) + self._seek_condition.notify() + + def _has_pending_seek(self) -> bool: + """Check for pending seek request.""" + with self._seek_lock: + return self._pending_seek is not None + + def _take_pending_seek(self) -> tuple[int, bool] | None: + """Take and clear the pending seek request.""" + with self._seek_lock: + seek = self._pending_seek + self._pending_seek = None + return seek # ------------------------------------------------------------------------- # FPS Timing @@ -256,7 +280,7 @@ def _wait_to_next_frame(self) -> float: def start(self) -> None: """Start playback in a new thread. Call stop() to terminate.""" - logger.info(f"Starting publisher for port {self.port}") + logger.info(f"Starting streamer for port {self.port}") self._internal_token = CancellationToken() self._thread = Thread( target=self.play_worker, @@ -272,7 +296,7 @@ def stop(self) -> None: if self._thread is not None: self._thread.join(timeout=2.0) self._thread = None - logger.info(f"Stopped publisher for port {self.port}") + logger.info(f"Stopped streamer for port {self.port}") def close(self) -> None: """Release all resources.""" @@ -357,7 +381,7 @@ def play_worker(self, token: CancellationToken, handle: TaskHandle | None = None # Handle last frame if self._frame_index == self.last_frame_index: - if self._break_on_last: + if self._end_behavior == "stop": logger.info(f"Reached last frame at port {self.port}") eof_packet = FramePacket( port=self.port, @@ -372,47 +396,57 @@ def play_worker(self, token: CancellationToken, handle: TaskHandle | None = None # Auto-pause at end for interactive mode self._pause_event.set() - # Handle pause (check for jumps while paused) + # Handle pause (check for seeks while paused) while self._pause_event.is_set() and not token.is_cancelled: - if self._has_pending_jump(): + if self._has_pending_seek(): break - token.sleep_unless_cancelled(0.1) + # Wait on condition variable - wakes immediately when seek_to() calls notify() + with self._seek_condition: + self._seek_condition.wait(timeout=0.1) - # Process pending jump - pending = self._take_pending_jump() + # Process pending seek + pending = self._take_pending_seek() if pending is not None: - target_index, exact = pending - logger.info(f"Processing jump to frame {target_index} (exact={exact}) at port {self.port}") + target_index, precise = pending + logger.info(f"Processing seek to frame {target_index} (precise={precise}) at port {self.port}") - if exact: + if precise: frame = self._frame_source.get_frame(target_index) if frame is not None: current_frame = frame self._frame_index = target_index skip_read = True + else: + # Seek failed - stay at current position, don't call read_frame() + logger.warning(f"Seek to frame {target_index} failed at port {self.port}") + skip_read = True else: - frame, actual_index = self._frame_source.get_frame_fast(target_index) + frame, actual_index = self._frame_source.get_nearest_keyframe(target_index) if frame is not None: current_frame = frame self._frame_index = actual_index skip_read = True + else: + # Fast seek failed - stay at current position + logger.warning(f"Fast seek to frame {target_index} failed at port {self.port}") + skip_read = True else: - # Increment for next iteration (only if not jumping) + # Increment for next iteration (only if not seeking) self._frame_index += 1 finally: - logger.info(f"Publisher worker exiting for port {self.port}") + logger.info(f"Streamer worker exiting for port {self.port}") -def create_publisher( +def create_streamer( video_directory: Path, port: int, rotation_count: int = 0, tracker: Tracker | None = None, fps_target: float | None = None, - break_on_last: bool = True, -) -> FramePacketPublisher: - """Factory function to create a FramePacketPublisher. + end_behavior: Literal["stop", "pause"] = "stop", +) -> FramePacketStreamer: + """Factory function to create a FramePacketStreamer. Convenience function that handles FrameSource and FrameTimestamps creation. @@ -422,10 +456,10 @@ def create_publisher( rotation_count: Camera rotation (0, 1, 2, 3). tracker: Optional tracker for landmark detection. fps_target: Target FPS. None = unlimited. - break_on_last: If True, stop at last frame. If False, pause. + end_behavior: What to do at last frame. "stop" or "pause". Returns: - Configured FramePacketPublisher ready to start(). + Configured FramePacketStreamer ready to start(). """ frame_source = FrameSource(video_directory, port) @@ -435,11 +469,11 @@ def create_publisher( else: frame_timestamps = FrameTimestamps.inferred(frame_source.fps, frame_source.frame_count) - return FramePacketPublisher( + return FramePacketStreamer( frame_source=frame_source, frame_timestamps=frame_timestamps, rotation_count=rotation_count, tracker=tracker, fps_target=fps_target, - break_on_last=break_on_last, + end_behavior=end_behavior, ) diff --git a/src/caliscope/recording/frame_source.py b/src/caliscope/recording/frame_source.py index 03394c6c..d4695ccc 100644 --- a/src/caliscope/recording/frame_source.py +++ b/src/caliscope/recording/frame_source.py @@ -5,6 +5,26 @@ This is infrastructure (I/O), not domain logic - hence placement in recording/ rather than core/. + +PyAV/FFmpeg Quirks (lessons learned): +------------------------------------- +1. **PTS-to-frame-index calculation**: Must use `round()` not `int()`. + The formula `frame.pts * time_base * fps` has floating point precision + issues that cause `int()` to truncate incorrectly near frame boundaries. + Example: PTS 285696 * time_base * fps = 557.8125 → int() gives 557, but + round() correctly gives 558. + +2. **Metadata frame count vs actual frames**: The container's frame count + (from duration or stream.frames) may not match actually accessible frames. + We scan keyframes at init to find the true last accessible frame index. + +3. **skip_frame="NONKEY" corrupts decoder state**: After using skip_frame + to scan keyframes, non-keyframe decoding fails silently. The container + must be closed and reopened to restore normal behavior. + +4. **Seeking near EOF**: PyAV's seek can fail silently near end of video, + especially with B-frame encoded content. The keyframe index helps us + know which frames are actually reachable. """ import logging @@ -29,7 +49,7 @@ class FrameSource: Typical usage: one owner thread, or explicit external synchronization. - Note: get_frame() and get_frame_fast() invalidate the sequential read + Note: get_frame() and get_nearest_keyframe() invalidate the sequential read position. After calling either, read_frame() behavior is undefined until the internal iterator is naturally exhausted or a new FrameSource is created. """ @@ -66,14 +86,26 @@ def __init__(self, video_directory: Path, port: int) -> None: self.fps = float(self._video_stream.average_rate) self.size = (self._video_stream.width, self._video_stream.height) - # Compute frame count - stream.frames may be 0 if unknown + # Build keyframe index and find actual last frame + # This solves PyAV/FFmpeg issues where metadata frame count doesn't match + # accessible frames via seeking (especially near EOF with B-frames) + self._keyframe_pts: list[int] = [] + self._keyframe_indices: list[int] = [] + self._actual_last_frame_index = self._build_frame_index() + + # frame_count from metadata (may differ from actual accessible frames) frame_count = self._video_stream.frames if frame_count == 0 and self._container.duration is not None: - # duration is in microseconds duration_seconds = self._container.duration / 1_000_000 frame_count = int(duration_seconds * self.fps) self.frame_count = frame_count + logger.debug( + f"FrameSource for port {port}: metadata says {frame_count} frames, " + f"actual last accessible frame is {self._actual_last_frame_index}, " + f"found {len(self._keyframe_pts)} keyframes" + ) + # Mark as successfully initialized - must be last line of __init__ # If init fails, _closed won't exist and __del__ won't warn self._closed = False @@ -85,8 +117,65 @@ def start_frame_index(self) -> int: @property def last_frame_index(self) -> int: - """Last valid frame index (frame_count - 1).""" - return self.frame_count - 1 + """Last valid frame index (actual accessible frame, not metadata estimate).""" + return self._actual_last_frame_index + + def _build_frame_index(self) -> int: + """Scan video to build keyframe index and find actual last frame. + + Uses skip_frame="NONKEY" to quickly iterate keyframes only. + Records keyframe positions for potential future use in smarter seeking. + + Returns: + Actual last accessible frame index. + + Note: + Called during __init__ before _container could be set to None. + Reopens container after scan because skip_frame corrupts decoder state. + """ + # Container is guaranteed valid here - called from __init__ after open + assert self._container is not None + + # Set decoder to skip non-keyframes for fast scanning + self._video_stream.codec_context.skip_frame = "NONKEY" + + max_frame_idx = 0 + try: + for frame in self._container.decode(self._video_stream): + if frame.pts is None: + continue + frame_idx = round(frame.pts * self._time_base * self.fps) + self._keyframe_pts.append(frame.pts) + self._keyframe_indices.append(frame_idx) + max_frame_idx = max(max_frame_idx, frame_idx) + except Exception as e: + logger.warning(f"Error during keyframe scan: {e}") + + # Close and reopen container - skip_frame corrupts decoder state + # such that non-keyframes become inaccessible after scanning + self._container.close() + self._container = av.open(str(self.video_path)) + self._video_stream = self._container.streams.video[0] + self._frame_iterator = None + + # Find actual last accessible frame by seeking from last keyframe + # The last keyframe may not be the last accessible frame + if self._keyframe_indices: + last_keyframe = self._keyframe_indices[-1] + target_pts = int(last_keyframe / self.fps / self._time_base) + self._container.seek(target_pts, stream=self._video_stream) + + # Decode forward from last keyframe to find true last frame + for frame in self._container.decode(self._video_stream): + if frame.pts is None: + continue + frame_idx = round(frame.pts * self._time_base * self.fps) + max_frame_idx = max(max_frame_idx, frame_idx) + + # Reset to beginning for normal use + self._container.seek(0, stream=self._video_stream) + + return max_frame_idx def get_frame(self, frame_index: int) -> np.ndarray | None: """Seek to exact frame and return it as BGR numpy array. @@ -121,14 +210,14 @@ def get_frame(self, frame_index: int) -> np.ndarray | None: for frame in self._container.decode(self._video_stream): if frame.pts is None: continue # Skip frames without PTS - frame_idx = int(frame.pts * self._time_base * self.fps) + frame_idx = round(frame.pts * self._time_base * self.fps) if frame_idx >= frame_index: return frame.to_ndarray(format="bgr24") return None - def get_frame_fast(self, frame_index: int) -> tuple[np.ndarray | None, int]: - """Seek to nearest keyframe and return it with actual index. + def get_nearest_keyframe(self, frame_index: int) -> tuple[np.ndarray | None, int]: + """Seek to nearest keyframe at or before target. Fast seeking for scrubbing - returns the keyframe at or before target, without decoding forward to the exact frame. O(1) complexity for @@ -163,7 +252,7 @@ def get_frame_fast(self, frame_index: int) -> tuple[np.ndarray | None, int]: for frame in self._container.decode(self._video_stream): if frame.pts is None: continue - actual_idx = int(frame.pts * self._time_base * self.fps) + actual_idx = round(frame.pts * self._time_base * self.fps) return frame.to_ndarray(format="bgr24"), actual_idx return None, -1 @@ -178,7 +267,7 @@ def read_frame(self) -> np.ndarray | None: Frame as BGR numpy array, or None at end of file. Note: - Position is undefined after get_frame() or get_frame_fast() calls. + Position is undefined after get_frame() or get_nearest_keyframe() calls. For predictable sequential reading, use a fresh FrameSource or read all frames without seeking. """ diff --git a/tests/test_frame_source.py b/tests/test_frame_source.py index 2e31a057..b42f4dbf 100644 --- a/tests/test_frame_source.py +++ b/tests/test_frame_source.py @@ -36,9 +36,13 @@ def test_start_frame_index_is_zero(self, frame_source: FrameSource) -> None: """start_frame_index is always 0 for raw video.""" assert frame_source.start_frame_index == 0 - def test_last_frame_index(self, frame_source: FrameSource) -> None: - """last_frame_index is frame_count - 1.""" - assert frame_source.last_frame_index == frame_source.frame_count - 1 + def test_last_frame_index_is_accessible(self, frame_source: FrameSource) -> None: + """last_frame_index returns an accessible frame, not metadata estimate.""" + # The actual last frame should be accessible + frame = frame_source.get_frame(frame_source.last_frame_index) + assert frame is not None + # Should not exceed metadata estimate + assert frame_source.last_frame_index <= frame_source.frame_count - 1 def test_port_stored(self, frame_source: FrameSource) -> None: """port is stored from constructor.""" @@ -137,47 +141,47 @@ def test_get_frame_different_positions_return_different_frames(self, frame_sourc assert not np.array_equal(frame_start, frame_middle) -class TestFastFrameAccess: - """Test get_frame_fast() keyframe seeking.""" +class TestKeyframeAccess: + """Test get_nearest_keyframe() keyframe seeking.""" - def test_get_frame_fast_returns_tuple(self, frame_source: FrameSource) -> None: - """get_frame_fast() returns (frame, actual_index) tuple.""" - result = frame_source.get_frame_fast(10) + def test_get_nearest_keyframe_returns_tuple(self, frame_source: FrameSource) -> None: + """get_nearest_keyframe() returns (frame, actual_index) tuple.""" + result = frame_source.get_nearest_keyframe(10) assert isinstance(result, tuple) assert len(result) == 2 frame, actual_idx = result assert frame is not None assert isinstance(actual_idx, int) - def test_get_frame_fast_actual_index_at_or_before_target(self, frame_source: FrameSource) -> None: - """get_frame_fast() returns keyframe at or before target.""" + def test_get_nearest_keyframe_actual_index_at_or_before_target(self, frame_source: FrameSource) -> None: + """get_nearest_keyframe() returns keyframe at or before target.""" target = frame_source.frame_count // 2 - frame, actual_idx = frame_source.get_frame_fast(target) + frame, actual_idx = frame_source.get_nearest_keyframe(target) assert frame is not None # Actual index should be at or before target (keyframe) assert actual_idx <= target - def test_get_frame_fast_returns_valid_frame(self, frame_source: FrameSource) -> None: - """get_frame_fast() returns valid BGR array.""" - frame, _ = frame_source.get_frame_fast(10) + def test_get_nearest_keyframe_returns_valid_frame(self, frame_source: FrameSource) -> None: + """get_nearest_keyframe() returns valid BGR array.""" + frame, _ = frame_source.get_nearest_keyframe(10) assert frame is not None assert isinstance(frame, np.ndarray) assert frame.ndim == 3 assert frame.shape[2] == 3 - def test_get_frame_fast_beyond_end_returns_none_and_minus_one(self, frame_source: FrameSource) -> None: - """get_frame_fast() beyond video length returns (None, -1). + def test_get_nearest_keyframe_beyond_end_returns_none_and_minus_one(self, frame_source: FrameSource) -> None: + """get_nearest_keyframe() beyond video length returns (None, -1). Bounds checking prevents PyAV's wrap-around behavior. """ beyond = frame_source.frame_count + 100 - frame, actual_idx = frame_source.get_frame_fast(beyond) + frame, actual_idx = frame_source.get_nearest_keyframe(beyond) assert frame is None assert actual_idx == -1 - def test_get_frame_fast_negative_index_returns_none_and_minus_one(self, frame_source: FrameSource) -> None: - """get_frame_fast() with negative index returns (None, -1).""" - frame, actual_idx = frame_source.get_frame_fast(-1) + def test_get_nearest_keyframe_negative_index_returns_none_and_minus_one(self, frame_source: FrameSource) -> None: + """get_nearest_keyframe() with negative index returns (None, -1).""" + frame, actual_idx = frame_source.get_nearest_keyframe(-1) assert frame is None assert actual_idx == -1 @@ -222,7 +226,7 @@ def test_read_after_close_returns_none(self, frame_source: FrameSource) -> None: frame_source.close() assert frame_source.read_frame() is None assert frame_source.get_frame(0) is None - frame, idx = frame_source.get_frame_fast(0) + frame, idx = frame_source.get_nearest_keyframe(0) assert frame is None assert idx == -1 @@ -274,6 +278,6 @@ def test_negative_frame_index_returns_none(self, frame_source: FrameSource) -> N frame = source.get_frame(50) print(f"Frame 50 shape: {frame.shape if frame is not None else None}") - # Test fast seek - frame, idx = source.get_frame_fast(50) - print(f"Fast seek to 50: got frame at index {idx}") + # Test keyframe seek + frame, idx = source.get_nearest_keyframe(50) + print(f"Keyframe seek to 50: got frame at index {idx}") diff --git a/tests/test_intrinsic_calibrator.py b/tests/test_intrinsic_calibrator.py index 52c17b0f..801d0f79 100644 --- a/tests/test_intrinsic_calibrator.py +++ b/tests/test_intrinsic_calibrator.py @@ -10,7 +10,7 @@ from caliscope.core.intrinsic_calibrator import IntrinsicCalibrator from caliscope.cameras.camera_array import CameraData from caliscope.helper import copy_contents_to_clean_dest -from caliscope.recording import create_publisher +from caliscope.recording import create_streamer from caliscope.trackers.charuco_tracker import CharucoTracker logger = logging.getLogger(__name__) @@ -27,26 +27,26 @@ def test_intrinsic_calibrator(tmp_path: Path): charuco_tracker = CharucoTracker(charuco) - publisher = create_publisher( + streamer = create_streamer( video_directory=recording_directory, port=1, tracker=charuco_tracker, ) - camera = CameraData(port=0, size=publisher.size) # fresh camera for calibration + camera = CameraData(port=0, size=streamer.size) # fresh camera for calibration assert camera.rotation is None assert camera.translation is None assert camera.matrix is None assert camera.distortions is None - intrinsic_calibrator = IntrinsicCalibrator(camera, publisher) + intrinsic_calibrator = IntrinsicCalibrator(camera, streamer) frame_q = Queue() - publisher.subscribe(frame_q) + streamer.subscribe(frame_q) - publisher.start() - publisher.pause() + streamer.start() + streamer.pause() packet = frame_q.get() # pull off frame 0 to clear queue @@ -56,15 +56,15 @@ def test_intrinsic_calibrator(tmp_path: Path): test_frames = [3, 5, 7, 9, 20, 25] for i in test_frames: - publisher.jump_to(i) + streamer.seek_to(i) packet = frame_q.get() assert i == packet.frame_index logger.info(packet.frame_index) intrinsic_calibrator.add_frame_packet(packet) intrinsic_calibrator.add_calibration_frame_index(packet.frame_index) - publisher.unpause() - publisher.stop() + streamer.unpause() + streamer.stop() intrinsic_calibrator.stop_event.set() logger.info(camera.get_display_data()) @@ -92,27 +92,27 @@ def test_autopopulate_data(tmp_path: Path): charuco_tracker = CharucoTracker(charuco) - publisher = create_publisher( + streamer = create_streamer( video_directory=recording_directory, port=1, tracker=charuco_tracker, fps_target=100, # Fast playback for autopopulation ) - camera = CameraData(port=0, size=publisher.size) # fresh camera for calibration + camera = CameraData(port=0, size=streamer.size) # fresh camera for calibration assert camera.rotation is None assert camera.translation is None assert camera.matrix is None assert camera.distortions is None - intrinsic_calibrator = IntrinsicCalibrator(camera, publisher) + intrinsic_calibrator = IntrinsicCalibrator(camera, streamer) # handy way to peek into what is going on frame_q = Queue() - publisher.subscribe(frame_q) - publisher.start() - publisher.pause() + streamer.subscribe(frame_q) + streamer.start() + streamer.pause() _ = frame_q.get() # pull off frame 0 to clear queue @@ -125,8 +125,8 @@ def test_autopopulate_data(tmp_path: Path): target_grid_count=target_grid_count, ) - publisher.jump_to(0) - publisher.unpause() + streamer.seek_to(0) + streamer.unpause() while intrinsic_calibrator.auto_store_data.is_set(): actual_grid_count = len(intrinsic_calibrator.calibration_frame_indices) diff --git a/tests/test_stream.py b/tests/test_stream.py index 75e5e9cb..f47c41be 100644 --- a/tests/test_stream.py +++ b/tests/test_stream.py @@ -5,36 +5,36 @@ from caliscope import __root__ from caliscope.core.charuco import Charuco -from caliscope.recording import create_publisher +from caliscope.recording import create_streamer from caliscope.trackers.charuco_tracker import CharucoTracker logger = logging.getLogger(__name__) -def test_publisher(): +def test_streamer(): recording_directory = Path(__root__, "tests", "sessions", "post_monocal", "calibration", "extrinsic") charuco = Charuco(4, 5, 11, 8.5, aruco_scale=0.75, square_size_overide_cm=5.25, inverted=True) charuco_tracker = CharucoTracker(charuco) - publisher = create_publisher( + streamer = create_streamer( video_directory=recording_directory, port=1, tracker=charuco_tracker, fps_target=6, ) frame_q = Queue() - publisher.subscribe(frame_q) - publisher.start() - publisher.pause() + streamer.subscribe(frame_q) + streamer.start() + streamer.pause() sleep(1) - logger.info(f"Publisher frame index is {publisher.frame_index}") + logger.info(f"Publisher frame index is {streamer.frame_index}") sleep(1) - logger.info(f"Publisher frame index is {publisher.frame_index}") + logger.info(f"Publisher frame index is {streamer.frame_index}") - publisher.unpause() + streamer.unpause() while True: frame_packet = frame_q.get() @@ -42,29 +42,29 @@ def test_publisher(): if frame_packet.frame is None: break - if publisher.frame_index == 10: + if streamer.frame_index == 10: logger.info("Testing pause/unpause functionality") - publisher.pause() + streamer.pause() sleep(0.5) - test_index = publisher.frame_index + test_index = streamer.frame_index sleep(0.5) - # make sure that publisher doesn't advance with pause - assert test_index == publisher.frame_index - publisher.unpause() + # make sure that streamer doesn't advance with pause + assert test_index == streamer.frame_index + streamer.unpause() - if publisher.frame_index == 15: + if streamer.frame_index == 15: logger.info("Testing ability to jump forward") target_frame = 20 - publisher.pause() + streamer.pause() sleep(1) # need to make sure fps_target wait plays out - publisher.jump_to(target_frame) + streamer.seek_to(target_frame) sleep(1) # need to make sure fps_target wait plays out # frame_index should match the jump target (the frame we just displayed) - assert publisher.frame_index == 20 + assert streamer.frame_index == 20 logger.info(f"After attempting to jump to target frame {target_frame}") - publisher.unpause() + streamer.unpause() if __name__ == "__main__": - test_publisher() + test_streamer() diff --git a/tests/test_triangulator.py b/tests/test_triangulator.py index 5c53e350..cea4f562 100644 --- a/tests/test_triangulator.py +++ b/tests/test_triangulator.py @@ -18,7 +18,7 @@ from caliscope import __root__ from caliscope.cameras.synchronizer import Synchronizer from caliscope.helper import copy_contents_to_clean_dest -from caliscope.recording import create_publisher +from caliscope.recording import create_streamer from caliscope.trackers.charuco_tracker import CharucoTracker from caliscope.triangulate.sync_packet_triangulator import SyncPacketTriangulator from caliscope import persistence @@ -41,11 +41,11 @@ def test_triangulator(tmp_path: Path): charuco = persistence.load_charuco(tmp_path / "charuco.toml") charuco_tracker = CharucoTracker(charuco) - logger.info("Creating publishers based on calibration recordings") + logger.info("Creating streamers based on calibration recordings") - publishers = {} + streamers = {} for port, camera in camera_array.cameras.items(): - publishers[port] = create_publisher( + streamers[port] = create_streamer( video_directory=recording_directory, port=port, rotation_count=camera.rotation_count, @@ -54,7 +54,7 @@ def test_triangulator(tmp_path: Path): ) logger.info("Creating Synchronizer") - syncr = Synchronizer(publishers) + syncr = Synchronizer(streamers) #### Basic code for interfacing with in-progress RealTimeTriangulator #### Just run off of saved point_data.csv for development/testing @@ -65,7 +65,7 @@ def test_triangulator(tmp_path: Path): tracker_name=charuco_tracker.name, ) - for port, publisher in publishers.items(): + for port, publisher in streamers.items(): publisher.start() while real_time_triangulator.running: