diff --git a/.gitignore b/.gitignore index 6db82cf..308dc74 100644 --- a/.gitignore +++ b/.gitignore @@ -105,6 +105,10 @@ ENV/ .vscode/ .idea/ +# Editor swap files +*.swp +*.asv + # Custom paper/ out/ diff --git a/docs/source/apidocs/visionsim.utils.rst b/docs/source/apidocs/visionsim.utils.rst index d430a0a..6474144 100644 --- a/docs/source/apidocs/visionsim.utils.rst +++ b/docs/source/apidocs/visionsim.utils.rst @@ -12,6 +12,14 @@ visionsim.utils.color module :show-inheritance: :undoc-members: +visionsim.utils.imgproc module +------------------------------ + +.. automodule:: visionsim.utils.imgproc + :members: + :show-inheritance: + :undoc-members: + visionsim.utils.progress module ------------------------------- diff --git a/examples/quickstart.sh b/examples/quickstart.sh index 601fb89..6f80fbc 100644 --- a/examples/quickstart.sh +++ b/examples/quickstart.sh @@ -2,7 +2,7 @@ visionsim blender.render-animation lego.blend quickstart/lego-gt/ --render-config.keyframe-multiplier=5.0 visionsim ffmpeg.animate --input-dir=quickstart/lego-gt/ --outfile=quickstart/preview.mp4 --step=5 --fps=25 --force -visionsim interpolate.frames --input-dir=quickstart/lego-gt/ --output-dir=quickstart/lego-interp/ --n=32 +visionsim interpolate.dataset --input-dir=quickstart/lego-gt/ --output-dir=quickstart/lego-interp/ --n=32 visionsim emulate.rgb --input-dir=quickstart/lego-interp/ --output-dir=quickstart/lego-rgb25fps/ --chunk-size=160 --readout-std=0 visionsim emulate.spad --input-dir=quickstart/lego-interp/ --output-dir=quickstart/lego-spc4kHz/ visionsim emulate.events --input-dir=quickstart/lego-gt/ --output-dir=quickstart/lego-dvs125fps/ --fps=125 diff --git a/tasks.py b/tasks.py index 20f0f54..0a7394f 100644 --- a/tasks.py +++ b/tasks.py @@ -146,7 +146,7 @@ def build_docs(c, preview=False, full=False): for i, n in enumerate((25, 50, 100, 200)): for cmd in ( f"visionsim blender.render-animation lego.blend interpolation/lego-{n:04}/ --keyframe-multiplier={n / 100} --width=320 
--height=320", - f"visionsim interpolate.frames interpolation/lego-{n:04}/ -o interpolation/lego{n:04}-interp/ -n={int(64 / 2**i)}", + f"visionsim interpolate.dataset interpolation/lego-{n:04}/ -o interpolation/lego{n:04}-interp/ -n={int(64 / 2**i)}", f"gifski $(ls -1a interpolation/lego{n:04}-interp/frames/*.png | sed -n '1~8p') --fps 25 -o {DOCS_STATIC}/lego{n:04}-interp.gif", ): _run(c, cmd, echo=True, warn=True) diff --git a/visionsim/cli/__init__.py b/visionsim/cli/__init__.py index 409c510..31cbda6 100644 --- a/visionsim/cli/__init__.py +++ b/visionsim/cli/__init__.py @@ -8,8 +8,9 @@ import shlex import subprocess import sys +from functools import lru_cache from pathlib import Path -from typing import overload +from typing import Any, Literal, overload import tyro from natsort import natsorted @@ -81,6 +82,13 @@ def _validate_directories( return input_path, output_path +@lru_cache +def _log_once(value: Any, msg: str, level: Literal["debug", "info", "warning", "error", "critical"] = "warning") -> Any: + """Log a message once per unique value, returns the value.""" + getattr(_log, level)(msg) + return value + + def _run( command: list[str] | str, shell: bool = False, diff --git a/visionsim/cli/emulate.py b/visionsim/cli/emulate.py index 46dcedf..c6493d9 100644 --- a/visionsim/cli/emulate.py +++ b/visionsim/cli/emulate.py @@ -3,7 +3,7 @@ import math import shutil from pathlib import Path -from typing import Any +from typing import Any, Literal import numpy as np @@ -11,29 +11,42 @@ def spad( input_dir: Path, output_dir: Path, - pattern: str | None = None, - factor: float = 1.0, + flux_gain: float = 1.0, + bitplanes: int = 1, + bitdepth: int | None = None, + force_gray: bool = False, seed: int = 2147483647, + pattern: str | None = None, max_size: int = 1000, force: bool = False, ) -> None: - """Perform bernoulli sampling on linearized RGB frames to yield binary frames + """Perform binomial sampling on linearized RGB frames to yield (summed) single photon frames + 
+ This will save numpy files which may be bitpacked (when bitplanes == 1) and may have different dtypes + depending on the number of summed bitplanes. The shape of the output arrays will be (max_size, h, w, c) or (remainder, h, w, c) + where remainder = len(dataset) % max_size, where the width dimension is ceil(width / 8) when bitpacked. + + If the input contains alpha channel (determined by the last dimension of the input images), it will be stripped. Args: input_dir: directory in which to look for frames - output_dir: directory in which to save binary frames - pattern: used to find source image files to convert to binary frames, + output_dir: directory in which to save single photon frames + pattern: used to find source image files to convert to single photon frames, not needed when ``input_dir`` points to a valid dataset. - factor: multiplicative factor controlling dynamic range of output + flux_gain: multiplicative factor controlling dynamic range of output + bitplanes: number of summed binary measurements + bitdepth: if set, ``bitplanes`` will be overridden to ``2**bitdepth - 1`` + force_gray: to disable RGB sensing even if the input images are color seed: random seed to use while sampling, ensures reproducibility max_size: maximum number of frames per output array before rolling over to new file force: if true, overwrite output file(s) if present, else throw error """ from numpy.lib.format import open_memmap + from visionsim.cli import _log, _log_once from visionsim.dataset import Dataset, Metadata from visionsim.emulate.spc import emulate_spc - from visionsim.utils.color import srgb_to_linearrgb + from visionsim.utils.color import rgb_to_grayscale, srgb_to_linearrgb from visionsim.utils.progress import ElapsedProgress if input_dir.resolve() == output_dir.resolve(): @@ -48,6 +61,17 @@ def spad( else: dataset = Dataset.from_path(input_dir) + if bitdepth is not None: + _log.info(f"Overriding bitplanes to {2**bitdepth - 1} since bitdepth is set to {bitdepth}.") 
+ bitplanes = 2**bitdepth - 1 + + # Map bitplanes to the smallest uint type that can hold it (minimum 8 bits) + out_dtype = next( + dtype + for limit, dtype in [(8, np.uint8), (16, np.uint16), (32, np.uint32), (64, np.uint64)] + if bitplanes <= 2**limit - 1 + ) + rng = np.random.default_rng(int(seed)) output_dir.mkdir(exist_ok=True, parents=True) transforms: list[dict[str, Any]] = [] @@ -64,33 +88,41 @@ def spad( else: data = data.astype(float) / 255.0 - # Default to bitpacking width - binary_img = emulate_spc(data, factor=factor, rng=rng) * 255 - binary_img = binary_img.astype(np.uint8) >= 128 - binary_img = np.packbits(binary_img, axis=1) + if len(data.shape) == 3 and data.shape[-1] in (2, 4): # LA/RGBA + _log_once(data.shape, "Alpha channel detected, ignoring it.", "info") + data = data[..., :-1] + + if force_gray: + data = rgb_to_grayscale(data) + + imgs = emulate_spc(data, flux_gain=flux_gain, bitplanes=bitplanes, rng=rng) offset = i % max_size file_path = output_dir / f"{i // max_size:04}.npy" transform["file_path"] = file_path.name - transform["bitpack_dim"] = 2 + transform["bitplanes"] = bitplanes transform["offset"] = offset h, w, c = data.shape + if bitplanes == 1: + # Default to bitpacking width + imgs = imgs >= 0.5 + imgs = np.packbits(imgs, axis=1) + transform["bitpack_dim"] = 2 + w = math.ceil(transform.get("w", w) / 8) + else: + w = transform.get("w", w) + if not file_path.exists(): data = open_memmap( file_path, mode="w+", - dtype=np.uint8, - shape=( - min(max_size, remainder), - transform.get("h", h), - math.ceil(transform.get("w", w) / 8), - transform.get("c", c), - ), + dtype=out_dtype, + shape=(min(max_size, remainder), transform.get("h", h), w, c), ) - data[offset] = binary_img + data[offset] = imgs else: - open_memmap(file_path)[offset] = binary_img + open_memmap(file_path)[offset] = imgs transforms.append(transform) progress.update(task, advance=1) @@ -136,6 +168,7 @@ def events( from visionsim.dataset import Dataset from 
visionsim.emulate.dvs import EventEmulator + from visionsim.utils.color import rgb_to_grayscale from visionsim.utils.progress import ElapsedProgress if input_dir.resolve() == output_dir.resolve(): @@ -171,10 +204,7 @@ def events( task = progress.add_task("Writing DVS data...", total=len(dataset)) for idx, (frame, _) in enumerate(dataset): # type: ignore - # Manually grayscale as we've already converted to floating point pixel values - # Values from http://en.wikipedia.org/wiki/Grayscale - r, g, b, *_ = np.transpose(frame, (2, 0, 1)) - luma = 0.0722 * b + 0.7152 * g + 0.2126 * r + luma = rgb_to_grayscale(frame) events = emulator.generate_events(luma, idx / int(fps)) if events is not None: @@ -198,10 +228,16 @@ def rgb( input_dir: Path, output_dir: Path, chunk_size: int = 10, - factor: float = 1.0, - readout_std: float = 20.0, - fwc: int | None = None, - duplicate: float = 1.0, + shutter_frac: float = 1.0, + readout_std: float = 16.0, + fwc: float | None = None, + flux_gain: float = 2.0**12, + iso_gain: float = 1.0, + adc_bitdepth: int = 12, + mosaic: bool = False, + demosaic: Literal["off", "bilinear", "MHC04"] = "MHC04", + denoise_sigma: float = 0.0, + sharpen_weight: float = 0.0, pattern: str | None = None, force: bool = False, ) -> None: @@ -211,11 +247,16 @@ def rgb( input_dir: directory in which to look for frames output_dir: directory in which to save binary frames chunk_size: number of consecutive frames to average together - factor: multiply image's linear intensity by this weight - readout_std: standard deviation of gaussian read noise - fwc: full well capacity of sensor in arbitrary units (relative to factor & chunk_size) - duplicate: when chunk size is too small, this model is ill-suited and creates unrealistic noise. 
- This parameter artificially increases the chunk size by using each input image ``duplicate`` number of times + shutter_frac: fraction of inter-frame duration shutter is active (0 to 1) + readout_std: standard deviation of gaussian read noise in photoelectrons + fwc: full well capacity of sensor in photoelectrons + flux_gain: factor to scale the input images before Poisson simulation + iso_gain: gain for photo-electron reading after Poisson rng + adc_bitdepth: ADC bitdepth + mosaic: implement mosaiced R-/G-/B- pixels or an innately 3-channel sensor + demosaic: demosaicing method (default Malvar et al.'s method) + denoise_sigma: Gaussian blur with this sigma will be used (default 0.0 disables this) + sharpen_weight: weight used in sharpening (default 0.0 disables this) pattern: used to find source image files to convert to rgb frames, not needed when ``input_dir`` points to a valid dataset. force: if true, overwrite output file(s) if present @@ -223,11 +264,12 @@ def rgb( import imageio.v3 as iio import more_itertools as mitertools + from visionsim.cli import _log_once from visionsim.dataset import Dataset, Metadata from visionsim.emulate.rgb import emulate_rgb_from_sequence from visionsim.interpolate.pose import pose_interp from visionsim.simulate.blender import INDEX_PADDING, ITEMS_PER_SUBFOLDER - from visionsim.utils.color import srgb_to_linearrgb + from visionsim.utils.color import linearrgb_to_srgb, srgb_to_linearrgb from visionsim.utils.progress import ElapsedProgress if input_dir.resolve() == output_dir.resolve(): @@ -260,27 +302,39 @@ def rgb( # Assume images have been tonemapped and undo mapping imgs = srgb_to_linearrgb(imgs) + if len(imgs.shape) == 4 and imgs.shape[-1] in (2, 4): # LA/RGBA + _log_once(imgs.shape, "Alpha channel detected, ignoring it.", "info") + imgs = imgs[..., :-1] + rgb_img = emulate_rgb_from_sequence( - imgs * duplicate, + imgs, readout_std=readout_std, - fwc=fwc or (chunk_size * duplicate), - factor=factor, + fwc=fwc or np.inf, + 
shutter_frac=shutter_frac, + flux_gain=flux_gain, + iso_gain=iso_gain, + adc_bitdepth=adc_bitdepth, + mosaic=mosaic, + demosaic=demosaic, + denoise_sigma=denoise_sigma, + sharpen_weight=sharpen_weight, ) if not pattern: # We checked that there's only a single camera, just re-use any transforms dict (transform, *_), transforms_iter = mitertools.spy(transforms_iter) poses = np.array([t["transform_matrix"] for t in transforms_iter]) - transform["transform_matrix"] = pose_interp(poses, k=np.clip(len(poses) - 1, 2, 3))(0.5) + + if len(poses) > 1: + transform["transform_matrix"] = pose_interp(poses, k=min(len(poses) - 1, 3))(0.5) + else: + transform["transform_matrix"] = poses[0] + transform["file_path"] = outpath.relative_to(output_dir) transforms.append(transform) - # TODO: Alpha and grayscale? - # if rgb_img.shape[-1] == 1: - # rgb_img = np.repeat(rgb_img, 3, axis=-1) - outpath.parent.mkdir(exist_ok=True, parents=True) - iio.imwrite(outpath, (rgb_img * 255).astype(np.uint8)) + iio.imwrite(outpath, (linearrgb_to_srgb(rgb_img) * 255).astype(np.uint8)) progress.update(task, advance=chunk_size) if not pattern: diff --git a/visionsim/cli/ffmpeg.py b/visionsim/cli/ffmpeg.py index e83e474..0b12441 100644 --- a/visionsim/cli/ffmpeg.py +++ b/visionsim/cli/ffmpeg.py @@ -1,14 +1,14 @@ from __future__ import annotations from pathlib import Path -from typing import cast +from typing import Any, cast def animate( input_dir: Path, pattern: str | None = None, outfile: Path = Path("out.mp4"), - fps: int = 25, + fps: float | None = None, crf: int = 22, vcodec: str = "libx264", step: int = 1, @@ -20,13 +20,15 @@ def animate( """Combine generated frames into an MP4 using ffmpeg wizardry. This is roughly equivalent to running the "image2" demuxer in ffmpeg, with the added benefit of being able to - skip frames using a step size, strip alpha channels from PNGs, and automatically handling the case where the input frames are numpy arrays. 
+ skip frames using a step size, strip alpha channels from PNGs, and automatically handling the case where the + input frames are numpy arrays. If using a dataset as input, the ``bitplanes`` attribute will be used to normalize + the data to uint8 for visualization. Args: input_dir: directory in which to look for frames, pattern: If provided search for files matching this pattern. Otherwise, look for a valid dataset in the input directory. outfile: where to save generated mp4 - fps: frames per second in video + fps: frames per second in video, if None, will be inferred from dataset crf: constant rate factor for video encoding (0-51), lower is better quality but more memory vcodec: video codec to use (either libx264 or libx265) step: drop some frames when making video, use frames 0+step*n @@ -41,7 +43,7 @@ def animate( import numpy as np from rich.progress import track - from visionsim.cli import _run + from visionsim.cli import _log, _log_once, _run from visionsim.dataset import Dataset if _run("ffmpeg -version", hide=True).returncode != 0: @@ -57,6 +59,28 @@ def animate( else: dataset = Dataset.from_path(input_dir) + # Infer fps from dataset if not provided + if fps is None: + if dataset.cameras: + framerates = set(c.fps for c in dataset.cameras) + if len(framerates) != 1: + raise ValueError(f"All cameras must have the same fps. Found: {framerates}") + fps = next(iter(framerates)) + + if fps: + if fps > 240.0: + _log.warning( + f"Inferred frame rate of {fps} fps may cause issues with some codecs. " + "Consider manually setting a lower fps and using a step size to reduce the number of frames." 
+ ) + else: + _log.warning("No fps found in dataset, using default frame rate of 25 fps.") + fps = 25.0 + else: + _log.info("No cameras found in dataset, using default frame rate of 25 fps.") + fps = 25.0 + + # Ensure all files are of the same type exts = {p.suffix for p in dataset.paths} if len(exts) != 1: @@ -86,9 +110,17 @@ def animate( # Iterate over dataset with step and extract frame from npy if neccesary if ext.lower() == ".npy": for i, idx in enumerate(track(range(0, len(dataset), step), description="Extracting frames")): - data, transform = dataset[idx] - if cast(dict, transform).get("bitpack_dim"): - data = np.array(data * 255).astype(np.uint8) + data, transform = cast(tuple[np.ndarray, dict[str, Any]], dataset[idx]) + + if transform.get("bitplanes"): + _log_once(transform.get("bitplanes"), "Bitplanes detected, normalizing to uint8.", "info") + data = np.array(data / transform["bitplanes"] * 255).astype(np.uint8) + + # Handle grayscale images by repeating the channel + if data.ndim == 3 and data.shape[-1] != 3: # type: ignore + data = np.repeat(data, 3, axis=-1) + + # TODO: Maybe tonemap/invert_response iio.imwrite(tmpdirname / f"{i:09}.png", data) else: for i, p in enumerate(dataset.paths[::step]): diff --git a/visionsim/cli/interpolate.py b/visionsim/cli/interpolate.py index eb6316c..79794ef 100644 --- a/visionsim/cli/interpolate.py +++ b/visionsim/cli/interpolate.py @@ -87,7 +87,7 @@ def dataset( if method.lower() == "rife": rife( - input_dir, + dataset.root, output_dir, input_files=dataset.paths, exp=np.log2(n).astype(int), @@ -102,9 +102,14 @@ def dataset( _log.info("Interpolating poses...") interp_poses = interpolate_poses(dataset.poses, n=n) interp_paths = natsorted(output_dir.glob("**/*.png")) + camera = next(iter(dataset.cameras)) + + if camera.fps: + camera.fps *= n + Metadata.from_frames( frames=[ dict(file_path=p.relative_to(output_dir), transform_matrix=m) for p, m in zip(interp_paths, interp_poses) ], - camera=next(iter(dataset.cameras)), + 
camera=camera, ).save(output_dir / "transforms.json") diff --git a/visionsim/dataset/models.py b/visionsim/dataset/models.py index ce20f8e..a288831 100644 --- a/visionsim/dataset/models.py +++ b/visionsim/dataset/models.py @@ -68,6 +68,8 @@ class Data(BaseModel): """path to data, usually an image or ndarray file""" bitpack_dim: int | None = None """dimension that has been bitpacked""" + bitplanes: int | None = None + """number of summed bitplanes in image""" class Frame(Camera, Data): @@ -83,8 +85,8 @@ class Frame(Camera, Data): class Metadata(Camera): """A superset of the `Nerfstudio `_ - ``transforms.json`` format with a few additional fields such as additional data paths (eg: flow/segmentation) - and a channels dimension.""" + ``transforms.json`` format which enables use of numpy arrays for single photon data, and allows for additional + data paths (eg: flow/segmentation) and metadata attributes such as a channels dimension.""" _REQUIRED_FIELDS: ClassVar[tuple[str, ...]] = ("fl_x", "fl_y", "cx", "cy", "h", "w") _data_types: set[str] diff --git a/visionsim/emulate/rgb.py b/visionsim/emulate/rgb.py index c08c2e3..703a0f0 100644 --- a/visionsim/emulate/rgb.py +++ b/visionsim/emulate/rgb.py @@ -4,57 +4,96 @@ import numpy as np import numpy.typing as npt +from scipy.ndimage import gaussian_filter +from typing_extensions import Literal -from visionsim.utils.color import linearrgb_to_srgb +from visionsim.utils.color import raw_bayer_to_rgb, rgb_to_raw_bayer +from visionsim.utils.imgproc import unsharp_mask def emulate_rgb_from_sequence( sequence: npt.ArrayLike, - readout_std: float = 20.0, - fwc: float = 500.0, - bitdepth: int = 12, - factor: float = 1.0, + shutter_frac: float = 1.0, + readout_std: float = 0.0, + fwc: float = 10000.0, + adc_bitdepth: int = 12, + flux_gain: float = 2**12, + iso_gain: float = 1.0, + mosaic: bool = False, + demosaic: Literal["off", "bilinear", "MHC04"] = "off", + denoise_sigma: float = 0.0, + sharpen_weight: float = 0.0, rng: 
np.random.Generator | None = None, ) -> npt.NDArray: - """Emulates a conventional RGB camera from a sequence of intensity frames. + """Emulates a conventional RGB camera [1]_ from a sequence of intensity frames. + + For demosaicing details see :func:`raw_bayer_to_rgb `. Note: Motion-blur is approximated by averaging consecutive ground truth frames, this can be done more efficiently if optical flow is available. - See `emulate_rgb_from_flow` for more. + See :func:`emulate_rgb_from_flow ` for more. Args: sequence (npt.ArrayLike): Input sequence of linear-intensity frames, can be a collection of frames, or np/torch array with time as the first dimension. - readout_std (float, optional): Standard deviation of zero mean Gaussian read noise. Defaults to 20.0. - fwc (float, optional): Full well capacity, used for normalization. Defaults to 500.0. - bitdepth (int, optional): Resolution of ADC in bits. Defaults to 12. - factor (float, optional): Scaling factor to control intensity of output RGB image. Defaults to 1.0. + shutter_frac (float, optional): fraction of inter-frame duration the shutter is active. Range [0, 1] + readout_std (float, optional): Standard deviation of zero mean Gaussian read noise. Defaults to 0.0. + fwc (float, optional): Full well capacity, used for normalization. Defaults to 10000.0. + adc_bitdepth (int, optional): Resolution of ADC in bits. Defaults to 12. 
+ flux_gain (float, optional): factor to scale the input [0, 1] image _before_ Poisson sampling + iso_gain (float, optional): factor to scale the photo-electron reading _after_ Poisson sampling + mosaic (bool, optional): implement one array with mosaiced R-/G-/B-sensitive pixels or an innately 3-channel sensor + demosaic (string, optional): demosaicing method to use if "mosaic" is set (default "off") + denoise_sigma (float, optional): Gaussian blur kernel sigma (disabled if 0.0) + sharpen_weight (float, optional): sharpening weight (disabled if 0.0) rng (np.random.Generator, optional): Optional random number generator. Defaults to none. Returns: - Quantized sRGB patch is returned + npt.NDArray: Quantized linear-intensity RGB patch as floating point array (range [0, 1]) + + References: + .. [1] S. W. Hasinoff, F. Durand, and W. T. Freeman, + “Noise-optimal capture for high dynamic range photography,” + CVPR 2010. """ - # Get sum of linear-intensity frames. - sequence = np.array(sequence) - burst_size = len(sequence) - patch = np.sum(sequence, axis=0) * factor + # Get mean of linear-intensity frames. + burst_size = int(max(1, np.ceil(len(sequence) * shutter_frac))) + sequence = np.array(sequence[:burst_size]) + patch = np.mean(sequence, axis=0) + + # Convert to raw bayer if mosaic, this just samples the color channels + if mosaic: + patch = rgb_to_raw_bayer(patch) + + # So far patch is GT motion blurred frame, without any noise or gain applied + # we need to scale it by flux gain and shutter fraction to get the correct amount of light + patch *= flux_gain * shutter_frac - # Perform poisson sampling and add zero-mean gaussian read noise + # Roughly translating the model in Eqs. (1,2) and Fig. 1 of Hasinoff et al. 
+ # Perform poisson sampling rng = np.random.default_rng() if rng is None else rng - patch = cast(npt.NDArray, rng.poisson(patch)).astype(float) - patch += rng.normal(0, readout_std * burst_size / 255.0, size=patch.shape) + patch = cast(np.ndarray, rng.poisson(patch)).astype(float) - # Normalize by full well capacity, clip highlights, and quantize to 12-bits - patch = np.clip(patch / fwc, 0, 1.0) - patch = np.round(patch * (2**bitdepth - 1)) / (2**bitdepth - 1) + # Clip to full-well capacity, add readout noise, apply ISO gain + patch = np.clip(patch, 0, fwc) + patch += rng.normal(0, readout_std, size=patch.shape) + patch *= iso_gain + # Assume perfect quantization in ADC + patch = np.round(np.clip(patch, 0, (2**adc_bitdepth - 1))) + patch = patch / (2**adc_bitdepth - 1) - # Multiply by gain to keep constant(-ish) brightness - patch *= fwc / (burst_size * factor) + # De-mosaicing: necessary if data is mosaiced, so can't be `None`. + # ("off" is not a no-op, it still creates a full 3-channel image from 1, + # albeit a bad one) + if mosaic: + patch = raw_bayer_to_rgb(patch, method=demosaic) - # Convert to sRGB color space for viewing and quantize to 8-bits - patch = linearrgb_to_srgb(patch) - patch = np.round(patch * 2**8) / 2**8 + # De-noising and sharpening + if denoise_sigma != 0.0: + patch = gaussian_filter(patch, denoise_sigma) + if sharpen_weight != 0.0: + patch = unsharp_mask(patch, sigma=max(1, denoise_sigma), amount=sharpen_weight) return patch diff --git a/visionsim/emulate/spc.py b/visionsim/emulate/spc.py index ca86cda..429620a 100644 --- a/visionsim/emulate/spc.py +++ b/visionsim/emulate/spc.py @@ -8,21 +8,25 @@ def emulate_spc( - img: npt.NDArray[np.floating], factor: float = 1.0, rng: np.random.Generator | None = None + img: npt.NDArray[np.floating], + flux_gain: float = 1.0, + bitplanes: int = 1, + rng: np.random.Generator | None = None, ) -> npt.NDArray[np.integer]: """Perform bernoulli sampling on linearized RGB frames to yield binary frames. 
Args: - img (npt.ArrayLike): Linear intensity image to sample binary frame from. - factor (float, optional): Arbitrary corrective brightness factor. Defaults to 1.0. + img (npt.NDArray[np.floating]): Linear intensity image to sample binary frame from. + flux_gain(float, optional): scale factor to convert img in [0, 1] range to other flux levels. Defaults to 1.0. + bitplanes (int, optional): when bitplanes > 1, this represents a binomial SPAD sensor that sums binary samples internally + for each measurement, operating at framerate ``1 / bitplanes`` of a binary SPAD sensor. Defaults to 1. rng (np.random.Generator, optional): Optional random number generator. Defaults to none. Returns: - Binary single photon frame + npt.NDArray[np.integer]: binomial-distributed single-photon frame """ - # Perform bernoulli sampling (equivalent to binomial w/ n=1) rng = np.random.default_rng() if rng is None else rng - return rng.binomial(cast(npt.NDArray[np.integer], 1), 1.0 - np.exp(-img * factor)) + return rng.binomial(cast(npt.NDArray[np.integer], bitplanes), 1.0 - np.exp(-img * flux_gain)) def spc_avg_to_rgb( @@ -49,7 +53,7 @@ def spc_avg_to_rgb( Defaults to None (no normalization/clipping). 
Returns: - Conventional intensity image + torch.Tensor | npt.NDArray: Conventional intensity image """ module = torch if torch.is_tensor(mean_binary_patch) else np intensity = -module.log(module.clip(1 - mean_binary_patch, epsilon, 1)) / factor diff --git a/visionsim/interpolate/pose.py b/visionsim/interpolate/pose.py index 7c370e3..ae7d634 100644 --- a/visionsim/interpolate/pose.py +++ b/visionsim/interpolate/pose.py @@ -1,5 +1,6 @@ from __future__ import annotations +import logging from typing import Literal, cast import numpy as np @@ -9,6 +10,8 @@ from visionsim.types import Matrix4x4 +logger = logging.getLogger(__name__) + class pose_interp: """Linearly interpolate between 4x4 (or 3x4) transformation matrices by interpolating it's components""" @@ -29,6 +32,10 @@ def __init__(self, transforms: npt.ArrayLike, ts: npt.ArrayLike | None = None, k self.transforms = np.array(transforms) self.ts = np.linspace(0, 1, len(self.transforms)) if ts is None else np.array(ts) self.determinants = np.linalg.det(self.transforms[:, :3, :3]) + + if k >= len(self.transforms): + logger.warning(f"spline degree {k} >= #poses ({len(self.transforms)}), therefore downgrading it") + k = min(len(self.transforms) - 1, k) # for small chunk_sizes self.k = k if normalize: diff --git a/visionsim/simulate/schema.py b/visionsim/simulate/schema.py index 451ab98..5c0dbbb 100644 --- a/visionsim/simulate/schema.py +++ b/visionsim/simulate/schema.py @@ -14,6 +14,7 @@ SqliteDatabase, TextField, ) +from playhouse.migrate import SqliteMigrator, migrate from playhouse.shortcuts import ThreadSafeDatabaseMetadata from playhouse.sqlite_ext import JSONField from typing_extensions import Self @@ -73,6 +74,7 @@ class _Data(_BaseModel): path = TextField() bitpack_dim = IntegerField(null=True) + bitplanes = IntegerField(null=True) class _Frame(_BaseModel): @@ -91,7 +93,7 @@ class _Metadata: """The ``.db`` equivalent of :class:`models.Metadata `""" def __init__(self, path: str | os.PathLike) -> None: - 
"""Initialize a metadata instance. + """Initialize a metadata instance. Unlike :meth:`load`, this method does not automatically apply any migrations. Args: path (str | os.PathLike): Metadata dataset path @@ -100,26 +102,23 @@ def __init__(self, path: str | os.PathLike) -> None: @classmethod def load(cls, path: str | os.PathLike) -> Self: - """Same as :meth:`__init__`, added to better mirror :class:`models.Metadata `""" + """Initialize a metadata instance and apply any necessary migrations.""" instance = cls(path) instance._migrate() return instance def _migrate(self) -> None: """Apply any necessary database migrations.""" - # TODO: Implement database migrations if needed. Below is an example of how to do it for a new "date" field in _Camera model. - - # from playhouse.migrate import SqliteMigrator, migrate - - # db = SqliteDatabase(self.path, pragmas=_DEFAULT_PRAGMAS) - # with db.connection_context(): - # columns = db.get_columns(_Camera._meta.table_name) - # column_names = [c.name for c in columns] - - # if "date" not in column_names: - # migrator = SqliteMigrator(db) - # with db.atomic(): - # migrate(migrator.add_column(_Camera._meta.table_name, "date", _Camera.date)) + db = SqliteDatabase(self.path, pragmas=_DEFAULT_PRAGMAS) + with db.connection_context(): + table_name = _Data._meta.table_name # type: ignore + columns = db.get_columns(table_name) + column_names = [c.name for c in columns] + + if "bitplanes" not in column_names: + migrator = SqliteMigrator(db) + with db.atomic(): + migrate(migrator.add_column(table_name, "bitplanes", _Data.bitplanes)) @classmethod def from_dense_transforms(cls, path: str | os.PathLike, transforms: Iterator[dict[str, Any]]) -> Self: diff --git a/visionsim/utils/color.py b/visionsim/utils/color.py index f9aaae4..6bc2e9a 100644 --- a/visionsim/utils/color.py +++ b/visionsim/utils/color.py @@ -1,8 +1,11 @@ from __future__ import annotations +from typing import Literal + import numpy as np import numpy.typing as npt import torch +from 
scipy.ndimage import correlate def srgb_to_linearrgb(img: torch.Tensor | npt.NDArray[np.floating]) -> torch.Tensor | npt.NDArray[np.floating]: @@ -13,7 +16,7 @@ def srgb_to_linearrgb(img: torch.Tensor | npt.NDArray[np.floating]) -> torch.Ten img (torch.Tensor | npt.NDArray): Image to un-tonemap. Returns: - linear rgb image. + torch.Tensor | npt.NDArray: linear rgb image. """ # https://github.com/blender/blender/blob/master/source/blender/blenlib/intern/math_color.c module, img = (torch, img.clone()) if torch.is_tensor(img) else (np, np.copy(img)) @@ -30,7 +33,7 @@ def linearrgb_to_srgb(img: torch.Tensor | npt.NDArray) -> torch.Tensor | npt.NDA img (torch.Tensor | npt.NDArray): Image to tonemap. Returns: - tonemapped rgb image. + torch.Tensor | npt.NDArray: tonemapped rgb image. """ # https://github.com/blender/blender/blob/master/source/blender/blenlib/intern/math_color.c module, img = (torch, img.clone()) if torch.is_tensor(img) else (np, np.copy(img)) @@ -39,3 +42,186 @@ def linearrgb_to_srgb(img: torch.Tensor | npt.NDArray) -> torch.Tensor | npt.NDA img[mask] = img[mask] * 12.92 # type: ignore img[~mask] = module.clip(1.055 * img[~mask] ** (1.0 / 2.4) - 0.055, 0.0, 1.0) return img + + +def rgb_to_grayscale(img: npt.NDArray[np.floating]) -> npt.NDArray[np.floating]: + """Performs RGB to grayscale color space conversion. + + Note: + If there is 4 channels, it is assumed that the last channel is alpha and it is preserved. + + Args: + img (npt.NDArray): Image to convert, expected to be in HWC format and normalized to [0, 1]. + + Returns: + npt.NDArray: grayscale image as HWC where C is 1 or 2 (if alpha channel is present). 
+ """ + # Values from http://en.wikipedia.org/wiki/Grayscale + r, g, b, *a = img.transpose(2, 0, 1) + luma = 0.0722 * b + 0.7152 * g + 0.2126 * r + + if len(a): + luma = np.stack([luma, *a], axis=-1) + if luma.ndim < 3: + luma = luma[..., None] + + return luma + + +def rgb_to_raw_bayer(rgb: npt.NDArray, cfa_pattern: Literal["rggb"] = "rggb") -> npt.NDArray: + """Bayer Mosaicing + + Realizes a mosaiced CFA image as would be sampled by a real Bayer-patterned sensor + by simply subsampling the RGB image. + + Args: + rgb (npt.NDArray): hypothetical true RGB signal + cfa_pattern (Literal["rggb"]): Bayer pattern (only RGGB implemented for now) + + Returns: + npt.NDArray: CFA mosaiced data + """ + if cfa_pattern == "rggb": + raw = np.copy(rgb[:, :, 0]) + raw[::2, 1::2] = rgb[::2, 1::2, 1] + raw[1::2, ::2] = rgb[1::2, ::2, 1] + raw[1::2, 1::2] = rgb[1::2, 1::2, 2] + else: + raise NotImplementedError + return raw + + +def raw_bayer_to_rgb( + raw: npt.NDArray, + cfa_pattern: Literal["rggb"] = "rggb", + method: Literal["off", "bilinear", "MHC04"] = "bilinear", +) -> npt.NDArray: + """Bayer Demosaicing + + Convolution-based implementation as suggested by Malvar et al. [1]. + + Bilinear is simpler but visually not as nice as Malvar et al.'s method. + + Alternative implementations are also available from OpenCV: + rgb = cv2.cvtColor(, cv2.COLOR_BAYER_BG2BGR)[:,:,::-1], + and the colour-demosaicing library (https://pypi.org/project/colour-demosaicing): + rgb = demosaicing_CFA_Bayer_bilinear(raw, pattern="RGGB") + rgb = demosaicing_CFA_Bayer_Malvar2004(raw, pattern="RGGB"), + which appear to give similar results but could run faster (not benchmarked). + + Args: + raw (npt.NDArray): input array (has to be exactly 2D) + cfa_pattern (Literal["rggb"]): Bayer pattern (only RGGB implemented for now) + + Returns: + npt.NDArray: demosaiced RGB image + + References: + .. [1] Malvar et al. 
(2004), "High-quality linear interpolation for demosaicing of Bayer-patterned color images", + ICASSP 2004. + """ + + if cfa_pattern == "rggb": + if any(((L % 2) != 0) for L in raw.shape): + raise RuntimeError(f"Expected even-length raw array, got {(raw.shape)}") + + if torch.is_tensor(raw): + raw = raw.numpy() + + if not np.issubdtype(raw.dtype, np.floating): + raise RuntimeError(f"Expected floating-point array, got {raw.dtype}") + + R, G1, G2, B = raw[::2, ::2], raw[::2, 1::2], raw[1::2, ::2], raw[1::2, 1::2] + + rgb = np.zeros(raw.shape + (3,), dtype=raw.dtype) + rgb[::2, ::2, 0] = R + rgb[::2, 1::2, 1] = G1 + rgb[1::2, ::2, 1] = G2 + rgb[1::2, 1::2, 2] = B + + if method == "off": + return rgb + + if method in ["bilinear", "MHC04"]: + h_avg_fw_x = np.array([[0.0, 0.5, 0.5]]) + h_avg_bw_x = np.fliplr(h_avg_fw_x) + + rgb[::2, ::2, 1] = 0.5 * ( + correlate(G1, h_avg_bw_x, mode="nearest") + correlate(G2, h_avg_bw_x.T, mode="nearest") + ) + rgb[::2, ::2, 2] = correlate(correlate(B, h_avg_bw_x, mode="nearest"), h_avg_bw_x.T, mode="nearest") + + rgb[::2, 1::2, 0] = correlate(R, h_avg_fw_x, mode="nearest") + rgb[::2, 1::2, 2] = correlate(B, h_avg_bw_x.T, mode="nearest") + rgb[1::2, ::2, 0] = correlate(R, h_avg_fw_x.T, mode="nearest") + rgb[1::2, ::2, 2] = correlate(B, h_avg_bw_x, mode="nearest") + + rgb[1::2, 1::2, 0] = correlate(correlate(R, h_avg_fw_x, mode="nearest"), h_avg_fw_x.T, mode="nearest") + rgb[1::2, 1::2, 1] = 0.5 * ( + correlate(G1, h_avg_fw_x.T, mode="nearest") + correlate(G2, h_avg_fw_x, mode="nearest") + ) + + if method == "bilinear": + return rgb + + if method == "MHC04": + # Kernels in Fig. 2 of Malvar et al., along with weight factors for combination. + # This implementation is a little idiosyncratic because it realizes the 2D convolution + # via a sequence of 1D convolutions, probably not worth it in hindsight... 
+ h_avg_nhood_x = np.array([[0.5, 0.0, 0.5]]) + + av_R: npt.NDArray = correlate( + correlate(R, h_avg_nhood_x, mode="nearest"), h_avg_nhood_x.T, mode="nearest" + ) + diff_R = R - av_R + + av_B: npt.NDArray = correlate( + correlate(B, h_avg_nhood_x, mode="nearest"), h_avg_nhood_x.T, mode="nearest" + ) + diff_B = B - av_B + + rgb[::2, ::2, 1] += (1 / 2) * diff_R + rgb[::2, ::2, 2] += (3 / 4) * diff_B + rgb[1::2, 1::2, 1] += (1 / 2) * diff_B + rgb[1::2, 1::2, 0] += (3 / 4) * diff_R + + av_G2_at_G1: npt.NDArray = correlate( + correlate(G2, h_avg_fw_x, mode="nearest"), h_avg_bw_x.T, mode="nearest" + ) + + av_G1_at_G1_R = correlate(G1, h_avg_nhood_x, mode="nearest") - correlate( + G1, 0.5 * h_avg_nhood_x.T, mode="nearest" + ) + av_G1_R: npt.NDArray = (1 / 5) * (2 * av_G1_at_G1_R + 4 * av_G2_at_G1) + diff_G1_R = G1 - av_G1_R + rgb[::2, 1::2, 0] += (5 / 8) * diff_G1_R + + av_G1_at_G1_B = correlate(G1, h_avg_nhood_x.T, mode="nearest") - correlate( + G1, 0.5 * h_avg_nhood_x, mode="nearest" + ) + av_G1_B = (1 / 5) * (2 * av_G1_at_G1_B + 4 * av_G2_at_G1) + diff_G1_B = G1 - av_G1_B + rgb[::2, 1::2, 2] += (5 / 8) * diff_G1_B + + av_G1_at_G2: npt.NDArray = correlate( + correlate(G1, h_avg_bw_x, mode="nearest"), h_avg_fw_x.T, mode="nearest" + ) + + av_G2_at_G2_R = correlate(G2, h_avg_nhood_x.T, mode="nearest") - correlate( + G2, 0.5 * h_avg_nhood_x, mode="nearest" + ) + av_G2_R = (1 / 5) * (4 * av_G1_at_G2 + 2 * av_G2_at_G2_R) + diff_G2_R = G2 - av_G2_R + rgb[1::2, ::2, 0] += (5 / 8) * diff_G2_R + + av_G2_at_G2_B = correlate(G2, h_avg_nhood_x, mode="nearest") - correlate( + G2, 0.5 * h_avg_nhood_x.T, mode="nearest" + ) + av_G2_B = (1 / 5) * (4 * av_G1_at_G2 + 2 * av_G2_at_G2_B) + diff_G2_B = G2 - av_G2_B + rgb[1::2, ::2, 2] += (5 / 8) * diff_G2_B + + return rgb + else: + raise NotImplementedError + return rgb diff --git a/visionsim/utils/imgproc.py b/visionsim/utils/imgproc.py new file mode 100644 index 0000000..ff4aa32 --- /dev/null +++ b/visionsim/utils/imgproc.py @@ -0,0 +1,58 
@@
+from __future__ import annotations
+
+import numpy as np
+import numpy.typing as npt
+from scipy.ndimage import gaussian_filter
+
+
+def unsharp_mask(
+    img: npt.ArrayLike,
+    sigma: float = 1.0,
+    amount: float = 1.0,
+) -> npt.NDArray:
+    """Unsharp-masking to sharpen an image
+
+    Borrows interface from scikit-image's version:
+    https://scikit-image.org/docs/stable/api/skimage.filters.html#skimage.filters.unsharp_mask
+
+    Args:
+        img (npt.ArrayLike, required): input image, can be 2-, 3-, or 4-channel
+        sigma (float, default = 1.0): controls extent of blurring
+        amount (float, default = 1.0): controls how much details are amplified by
+
+    Returns:
+        output (npt.NDArray): img with unsharp mask applied,
+            same shape and dtype as img
+
+    References:
+        .. [1] Wikipedia. Unsharp masking.
+    """
+
+    def _unsharp_mask1(img):
+        img_smooth = gaussian_filter(img, sigma)
+        if np.issubdtype(img.dtype, np.floating):
+            np.clip(img + (amount * (img - img_smooth)), 0, 1, out=img)
+        else:
+            # work with copy and convert back
+            if img.dtype != np.uint8:
+                raise NotImplementedError("_unsharp_mask1 expects (float | uint8)")
+            img_float = (1.0 / 255) * img.astype(np.float64)
+            img_smooth_float = (1.0 / 255) * img_smooth.astype(np.float64)
+            np.clip(img_float + (amount * (img_float - img_smooth_float)), 0, 1, out=img_float)
+            img[...] = (255 * img_float).astype(np.uint8)
+        return
+
+    has_alpha = (img.ndim == 3) and (img.shape[2] in [2, 4])
+    if has_alpha:
+        alpha_channel = img[:, :, -1:]
+        img = img[:, :, :-1]
+    has_color = (img.ndim == 3) and (img.shape[2] == 3)
+    if has_color:
+        # do each channel individually
+        for c in range(3):
+            _unsharp_mask1(img[:, :, c])
+    else:
+        _unsharp_mask1(img)
+    if has_alpha:
+        img = np.concatenate((img, alpha_channel), axis=-1)
+    return img