Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 98 additions & 80 deletions src/caliscope/gui/presenters/intrinsic_calibration_presenter.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from caliscope.core.frame_selector import select_calibration_frames
from caliscope.core.point_data import ImagePoints
from caliscope.packets import FramePacket, PointPacket
from caliscope.recording import FramePacketPublisher, create_publisher
from caliscope.recording import create_publisher
from caliscope.task_manager.cancellation import CancellationToken
from caliscope.task_manager.task_handle import TaskHandle
from caliscope.task_manager.task_manager import TaskManager
Expand Down Expand Up @@ -56,11 +56,16 @@ class IntrinsicCalibrationPresenter(QObject):
submission of calibration to TaskManager. Exposes a display_queue for
the View's processing thread to consume directly (avoids GUI thread hop).

A single FramePacketPublisher lives across all states, enabling scrubbing
in READY/CALIBRATED states and collection during COLLECTING state.

Signals:
state_changed: Emitted when computed state changes. View updates UI.
calibration_complete: Emitted when calibration succeeds. Contains
a new CameraData with calibration results applied.
calibration_failed: Emitted when calibration fails. Contains error message.
frame_position_changed: Emitted when current frame changes (background thread,
Qt.AutoConnection queues to main thread).

Queue:
display_queue: View's processing thread reads FramePackets from here.
Expand All @@ -70,6 +75,7 @@ class IntrinsicCalibrationPresenter(QObject):
state_changed = Signal(PresenterState)
calibration_complete = Signal(CameraData) # Calibrated camera, ready to use
calibration_failed = Signal(str)
frame_position_changed = Signal(int) # Current frame index

def __init__(
self,
Expand Down Expand Up @@ -104,18 +110,41 @@ def __init__(
self._calibrated_camera: CameraData | None = None
self._calibration_task: TaskHandle | None = None

# Display queue for View consumption
self._display_queue: Queue[FramePacket | None] = Queue()

# Collection state
self._publisher: FramePacketPublisher | None = None
self._stream_handle: TaskHandle | None = None
self._frame_queue: Queue[FramePacket] = Queue() # From FramePacketPublisher
self._display_queue: Queue[FramePacket | None] = Queue() # For View consumption
self._consumer_thread: Thread | None = None
self._stop_event = Event()
self._is_collecting = False

# Load initial frame so View has something to display
# Single publisher for all states (scrubbing + collection)
self._publisher = create_publisher(
video_directory=self._video_path.parent,
port=self._camera.port,
rotation_count=self._camera.rotation_count,
tracker=self._tracker,
break_on_last=False, # Pause at end instead of exit
)
self._frame_queue: Queue[FramePacket] = Queue()
self._publisher.subscribe(self._frame_queue)

# Start publisher worker (will read first frame, then we pause)
self._stream_handle = self._task_manager.submit(
self._publisher.play_worker,
name=f"Publisher port {self._port}",
)
self._publisher.pause() # Immediately pause for scrubbing mode

# Guaranteed initial frame display (don't rely on thread timing)
self._load_initial_frame()

# Consumer thread runs continuously
self._stop_event = Event()
self._consumer_thread = Thread(target=self._consume_frames, daemon=True)
self._consumer_thread.start()

# Position tracking
self._current_frame_index: int = 0

@property
def state(self) -> PresenterState:
"""Compute current state from internal reality - never stale."""
Expand Down Expand Up @@ -148,6 +177,16 @@ def camera(self) -> CameraData:
"""Access original camera data for View's display setup."""
return self._camera

@property
def frame_count(self) -> int:
"""Total frames in video."""
return self._publisher.last_frame_index + 1

@property
def current_frame_index(self) -> int:
"""Current frame position."""
return self._current_frame_index

def refresh_display(self) -> None:
"""Put a fresh frame on the display queue.

Expand All @@ -156,6 +195,16 @@ def refresh_display(self) -> None:
"""
self._load_initial_frame()

def seek_to(self, frame_index: int) -> None:
"""Seek to frame. Works in READY/CALIBRATED states via publisher's jump_to."""
if self.state not in (PresenterState.READY, PresenterState.CALIBRATED):
return

frame_index = max(0, min(frame_index, self.frame_count - 1))
self._publisher.jump_to(
frame_index, exact=True
) # exact=False would cause Fast seek, skipping between keyframes

def _load_initial_frame(self) -> None:
"""Read first frame from video and put on display queue."""
cap = cv2.VideoCapture(str(self._video_path))
Expand All @@ -179,122 +228,83 @@ def _load_initial_frame(self) -> None:
def start_calibration(self) -> None:
"""Start collecting calibration frames.

Creates FramePacketPublisher with tracker, subscribes to queue,
and starts playback. Transitions to COLLECTING state.
Resets to beginning and unpauses the publisher.
"""
if self.state not in (PresenterState.READY, PresenterState.CALIBRATED):
logger.warning(f"Cannot start calibration in state {self.state}")
return

logger.info(f"Starting calibration collection for port {self._port}")

# Set collecting FIRST to block concurrent seeks
self._is_collecting = True
self._emit_state_changed()

# Clear previous attempt's data
self._collected_points.clear()
self._calibrated_camera = None
self._calibration_task = None
self._stream_handle = None

# Create publisher with tracker
self._publisher = create_publisher(
video_directory=self._video_path.parent,
port=self._camera.port,
rotation_count=self._camera.rotation_count,
tracker=self._tracker,
break_on_last=True,
)
self._publisher.subscribe(self._frame_queue)

# Start consumer thread
self._stop_event.clear()
self._consumer_thread = Thread(target=self._consume_frames, daemon=True)
self._consumer_thread.start()

# Mark as collecting before starting playback
self._is_collecting = True
self._emit_state_changed()

# Start playback via TaskManager
self._stream_handle = self._task_manager.submit(
self._publisher.play_worker,
name=f"Publisher port {self._port}",
)
# Reset to beginning and start playback
self._publisher.jump_to(0, exact=True)
self._publisher.unpause()

def stop_calibration(self) -> None:
"""Stop collection and return to READY state.

Stops the stream, clears accumulated data, and resets state.
Pauses playback and clears accumulated data.
"""
if self.state != PresenterState.COLLECTING:
logger.warning(f"Cannot stop calibration in state {self.state}")
return

logger.info(f"Stopping calibration collection for port {self._port}")

# Signal consumer to stop
self._stop_event.set()

# Stop stream playback via TaskHandle
if self._stream_handle is not None:
self._stream_handle.cancel()

# Wait for consumer thread
if self._consumer_thread is not None:
self._consumer_thread.join(timeout=2.0)
self._consumer_thread = None

# Clean up publisher
if self._publisher is not None:
self._publisher.unsubscribe(self._frame_queue)
self._publisher.close()
self._publisher = None

# Drain queue
while not self._frame_queue.empty():
try:
self._frame_queue.get_nowait()
except Empty:
break

# Clear state
self._publisher.pause()
self._collected_points.clear()
self._is_collecting = False
self._emit_state_changed()

# Reload initial frame for display
self._load_initial_frame()

def _consume_frames(self) -> None:
"""Pull frames from queue, accumulate points, emit for display.
"""Pull frames from queue, accumulate points when collecting, emit for display.

Runs in a separate thread. Exits when stop_event is set or
end-of-stream packet is received.
Runs continuously across all states. Exits when stop_event is set or
publisher is cancelled externally.
"""
logger.debug(f"Consumer thread started for port {self._port}")

while not self._stop_event.is_set():
# Exit if publisher was cancelled externally
if self._stream_handle is not None and self._stream_handle.state == TaskState.CANCELLED:
break

try:
packet: FramePacket = self._frame_queue.get(timeout=0.1)
except Empty:
continue

# End of stream signal
# Skip end-of-stream markers
if packet.frame_index == -1:
logger.info(f"End of stream reached for port {self._port}")
self._on_collection_complete()
break
continue

# Accumulate points (store frame_index with PointPacket)
# Skip empty detections - PointPacket exists but has no points
if packet.points is not None and len(packet.points.point_id) > 0:
# Accumulate points only during collection
if self._is_collecting and packet.points is not None and len(packet.points.point_id) > 0:
self._collected_points.append((packet.frame_index, packet.points))

# Put on display queue for View's processing thread
# Always emit for display
self._display_queue.put(packet)
self._current_frame_index = packet.frame_index
self.frame_position_changed.emit(packet.frame_index)

# Detect end of video during collection
if self._is_collecting and packet.frame_index >= self._publisher.last_frame_index:
self._on_collection_complete()

logger.debug(f"Consumer thread exiting for port {self._port}")

def _on_collection_complete(self) -> None:
"""Called when video playback finishes. Submits calibration task."""
self._publisher.pause()
self._is_collecting = False

if len(self._collected_points) == 0:
Expand Down Expand Up @@ -392,8 +402,8 @@ def _on_calibration_complete(self, result: IntrinsicCalibrationResult) -> None:
self.calibration_complete.emit(self._calibrated_camera)
self._emit_state_changed()

# Reload initial frame so View has something to display/undistort
self._load_initial_frame()
# Jump to first frame so View can display with undistortion
self._publisher.jump_to(0, exact=True)

def _on_calibration_failed(self, exc_type: str, message: str) -> None:
"""Handle calibration failure."""
Expand All @@ -409,7 +419,15 @@ def _emit_state_changed(self) -> None:

def cleanup(self) -> None:
"""Clean up resources. Call before discarding presenter."""
# Stop consumer thread
self._stop_event.set()
if self._consumer_thread is not None:
self._consumer_thread.join(timeout=2.0)

# Cancel publisher worker
if self._stream_handle is not None:
self._stream_handle.cancel()
if self.state == PresenterState.COLLECTING:
self.stop_calibration()

# Clean up publisher
self._publisher.unsubscribe(self._frame_queue)
self._publisher.close()
Loading
Loading