Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions src/caliscope/core/intrinsic_calibrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,24 @@

from caliscope.cameras.camera_array import CameraData
from caliscope.packets import FramePacket
from caliscope.recording import FramePacketPublisher
from caliscope.recording import FramePacketStreamer

logger = logging.getLogger(__name__)


class IntrinsicCalibrator:
"""
Takes a FramePacketPublisher and determines a CameraData object from it.
Publisher needs to have a charuco tracker assigned to it.
Takes a FramePacketStreamer and determines a CameraData object from it.
Streamer needs to have a charuco tracker assigned to it.
"""

def __init__(self, camera_data: CameraData, publisher: FramePacketPublisher):
def __init__(self, camera_data: CameraData, streamer: FramePacketStreamer):
self.camera = camera_data # pointer needed to update params
self.publisher = publisher
self.streamer = streamer
self.initialize_point_history()

self.frame_packet_q = Queue()
self.publisher.subscribe(self.frame_packet_q)
self.streamer.subscribe(self.frame_packet_q)

# The following group of parameters relate to the autopopulation of the calibrator
self.grid_history_q = Queue() # for passing ids, img_loc used in calibration
Expand All @@ -50,15 +50,15 @@ def harvest_worker():

self.add_frame_packet(frame_packet)

logger.info(f"Harvest frames successfully ended in calibrator for port {self.publisher.port}")
logger.info(f"Harvest frames successfully ended in calibrator for port {self.streamer.port}")

self.harvest_thread = Thread(target=harvest_worker, args=[], daemon=True)
self.harvest_thread.start()

def stop(self):
logger.info("Beginning to stop intrinsic calibrator")
self.stop_event.set()
self.publisher.unsubscribe(self.frame_packet_q)
self.streamer.unsubscribe(self.frame_packet_q)
self.frame_packet_q.put(-1)

@property
Expand Down Expand Up @@ -115,7 +115,7 @@ def add_frame_packet(self, frame_packet: FramePacket):
self.auto_pop_frame_wait = max(self.auto_pop_frame_wait - 1, 0)

logger.debug(f"Current index is {index}")
if index == self.publisher.last_frame_index:
if index == self.streamer.last_frame_index:
# end of stream, so stop auto pop and backfill to hit grid target
logger.info("End of autopop detected...")
self.auto_store_data.clear()
Expand Down Expand Up @@ -212,8 +212,8 @@ def calibrate_camera(self):

logger.info(f"Calibrating camera {self.camera.port}....")

width = self.publisher.size[0]
height = self.publisher.size[1]
width = self.streamer.size[0]
height = self.streamer.size[1]

if self.camera.fisheye:
# Convert to float32 and RESHAPE to add the required channel dimension
Expand Down
108 changes: 69 additions & 39 deletions src/caliscope/gui/presenters/intrinsic_calibration_presenter.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from enum import Enum, auto
from pathlib import Path
from queue import Empty, Queue
from threading import Event, Thread
from threading import Event, Lock, Thread

import cv2
import numpy as np
Expand All @@ -23,10 +23,10 @@
IntrinsicCalibrationResult,
calibrate_intrinsics,
)
from caliscope.core.frame_selector import select_calibration_frames
from caliscope.core.frame_selector import FrameSelectionResult, select_calibration_frames
from caliscope.core.point_data import ImagePoints
from caliscope.packets import FramePacket, PointPacket
from caliscope.recording import create_publisher
from caliscope.recording import create_streamer
from caliscope.task_manager.cancellation import CancellationToken
from caliscope.task_manager.task_handle import TaskHandle
from caliscope.task_manager.task_manager import TaskManager
Expand Down Expand Up @@ -56,7 +56,7 @@ class IntrinsicCalibrationPresenter(QObject):
submission of calibration to TaskManager. Exposes a display_queue for
the View's processing thread to consume directly (avoids GUI thread hop).

A single FramePacketPublisher lives across all states, enabling scrubbing
A single FramePacketStreamer lives across all states, enabling scrubbing
in READY/CALIBRATED states and collection during COLLECTING state.

Signals:
Expand Down Expand Up @@ -109,30 +109,34 @@ def __init__(
self._collected_points: list[tuple[int, PointPacket]] = []
self._calibrated_camera: CameraData | None = None
self._calibration_task: TaskHandle | None = None
self._selection_result: FrameSelectionResult | None = None

# Lock for thread-safe access to collected_points from View
self._overlay_lock = Lock()

# Display queue for View consumption
self._display_queue: Queue[FramePacket | None] = Queue()

# Collection state
self._is_collecting = False

# Single publisher for all states (scrubbing + collection)
self._publisher = create_publisher(
# Single streamer for all states (scrubbing + collection)
self._streamer = create_streamer(
video_directory=self._video_path.parent,
port=self._camera.port,
rotation_count=self._camera.rotation_count,
tracker=self._tracker,
break_on_last=False, # Pause at end instead of exit
end_behavior="pause", # Pause at end for interactive scrubbing
)
self._frame_queue: Queue[FramePacket] = Queue()
self._publisher.subscribe(self._frame_queue)
self._streamer.subscribe(self._frame_queue)

# Start publisher worker (will read first frame, then we pause)
# Start streamer worker (will read first frame, then we pause)
self._stream_handle = self._task_manager.submit(
self._publisher.play_worker,
name=f"Publisher port {self._port}",
self._streamer.play_worker,
name=f"Streamer port {self._port}",
)
self._publisher.pause() # Immediately pause for scrubbing mode
self._streamer.pause() # Immediately pause for scrubbing mode

# Guaranteed initial frame display (don't rely on thread timing)
self._load_initial_frame()
Expand Down Expand Up @@ -180,13 +184,31 @@ def camera(self) -> CameraData:
@property
def frame_count(self) -> int:
"""Total frames in video."""
return self._publisher.last_frame_index + 1
return self._streamer.last_frame_index + 1

@property
def current_frame_index(self) -> int:
"""Current frame position."""
return self._current_frame_index

@property
def collected_points(self) -> list[tuple[int, PointPacket]]:
"""Thread-safe access to accumulated points for overlay rendering."""
with self._overlay_lock:
return list(self._collected_points)

@property
def selected_frame_indices(self) -> list[int] | None:
"""Frame indices used in calibration, or None if not yet calibrated."""
if self._selection_result is None:
return None
return self._selection_result.selected_frames

@property
def board_connectivity(self) -> set[tuple[int, int]]:
"""Point ID pairs that should be connected to form grid."""
return self._tracker.get_connected_points()

def refresh_display(self) -> None:
"""Put a fresh frame on the display queue.

Expand All @@ -196,14 +218,14 @@ def refresh_display(self) -> None:
self._load_initial_frame()

def seek_to(self, frame_index: int) -> None:
"""Seek to frame. Works in READY/CALIBRATED states via publisher's jump_to."""
"""Seek to frame. Works in READY/CALIBRATED states via streamer's seek_to."""
if self.state not in (PresenterState.READY, PresenterState.CALIBRATED):
return

frame_index = max(0, min(frame_index, self.frame_count - 1))
self._publisher.jump_to(
frame_index, exact=True
) # exact=False would cause Fast seek, skipping between keyframes
self._streamer.seek_to(
frame_index, precise=True
) # precise=False would cause Fast seek, skipping between keyframes

def _load_initial_frame(self) -> None:
"""Read first frame from video and put on display queue."""
Expand All @@ -228,26 +250,29 @@ def _load_initial_frame(self) -> None:
def start_calibration(self) -> None:
"""Start collecting calibration frames.

Resets to beginning and unpauses the publisher.
Resets to beginning and unpauses the streamer.
"""
if self.state not in (PresenterState.READY, PresenterState.CALIBRATED):
logger.warning(f"Cannot start calibration in state {self.state}")
return

logger.info(f"Starting calibration collection for port {self._port}")

# Set collecting FIRST to block concurrent seeks
self._is_collecting = True
self._emit_state_changed()

# Clear previous attempt's data
self._collected_points.clear()
# Clear previous calibration data BEFORE setting collecting flag
# (state is computed: CALIBRATED check comes before COLLECTING check)
with self._overlay_lock:
self._collected_points.clear()
self._calibrated_camera = None
self._calibration_task = None
self._selection_result = None

# Now set collecting and emit state change
self._is_collecting = True
self._emit_state_changed()

# Reset to beginning and start playback
self._publisher.jump_to(0, exact=True)
self._publisher.unpause()
self._streamer.seek_to(0, precise=True)
self._streamer.unpause()

def stop_calibration(self) -> None:
"""Stop collection and return to READY state.
Expand All @@ -260,21 +285,22 @@ def stop_calibration(self) -> None:

logger.info(f"Stopping calibration collection for port {self._port}")

self._publisher.pause()
self._collected_points.clear()
self._streamer.pause()
with self._overlay_lock:
self._collected_points.clear()
self._is_collecting = False
self._emit_state_changed()

def _consume_frames(self) -> None:
"""Pull frames from queue, accumulate points when collecting, emit for display.

Runs continuously across all states. Exits when stop_event is set or
publisher is cancelled externally.
streamer is cancelled externally.
"""
logger.debug(f"Consumer thread started for port {self._port}")

while not self._stop_event.is_set():
# Exit if publisher was cancelled externally
# Exit if streamer was cancelled externally
if self._stream_handle is not None and self._stream_handle.state == TaskState.CANCELLED:
break

Expand All @@ -289,22 +315,23 @@ def _consume_frames(self) -> None:

# Accumulate points only during collection
if self._is_collecting and packet.points is not None and len(packet.points.point_id) > 0:
self._collected_points.append((packet.frame_index, packet.points))
with self._overlay_lock:
self._collected_points.append((packet.frame_index, packet.points))

# Always emit for display
self._display_queue.put(packet)
self._current_frame_index = packet.frame_index
self.frame_position_changed.emit(packet.frame_index)

# Detect end of video during collection
if self._is_collecting and packet.frame_index >= self._publisher.last_frame_index:
if self._is_collecting and packet.frame_index >= self._streamer.last_frame_index:
self._on_collection_complete()

logger.debug(f"Consumer thread exiting for port {self._port}")

def _on_collection_complete(self) -> None:
"""Called when video playback finishes. Submits calibration task."""
self._publisher.pause()
self._streamer.pause()
self._is_collecting = False

if len(self._collected_points) == 0:
Expand Down Expand Up @@ -335,6 +362,9 @@ def _on_collection_complete(self) -> None:

logger.info(f"Selected {len(selection_result.selected_frames)} frames for calibration")

# Store selection result for overlay rendering
self._selection_result = selection_result

# Submit calibration to TaskManager
def calibration_worker(token: CancellationToken, handle: TaskHandle) -> IntrinsicCalibrationResult:
return calibrate_intrinsics(
Expand Down Expand Up @@ -402,8 +432,8 @@ def _on_calibration_complete(self, result: IntrinsicCalibrationResult) -> None:
self.calibration_complete.emit(self._calibrated_camera)
self._emit_state_changed()

# Jump to first frame so View can display with undistortion
self._publisher.jump_to(0, exact=True)
# Seek to first frame so View can display with undistortion
self._streamer.seek_to(0, precise=True)

def _on_calibration_failed(self, exc_type: str, message: str) -> None:
"""Handle calibration failure."""
Expand All @@ -424,10 +454,10 @@ def cleanup(self) -> None:
if self._consumer_thread is not None:
self._consumer_thread.join(timeout=2.0)

# Cancel publisher worker
# Cancel streamer worker
if self._stream_handle is not None:
self._stream_handle.cancel()

# Clean up publisher
self._publisher.unsubscribe(self._frame_queue)
self._publisher.close()
# Clean up streamer
self._streamer.unsubscribe(self._frame_queue)
self._streamer.close()
Loading
Loading