diff --git a/src/xradio/image/_util/_casacore/xds_to_casacore.py b/src/xradio/image/_util/_casacore/xds_to_casacore.py index 82bc9abf..7ffd442f 100644 --- a/src/xradio/image/_util/_casacore/xds_to_casacore.py +++ b/src/xradio/image/_util/_casacore/xds_to_casacore.py @@ -40,9 +40,48 @@ def _compute_direction_dict(xds: xr.Dataset) -> dict: [xds.sizes[dim] for dim in ("l", "m")], dtype=np.int32 ) direction["_image_axes"] = np.array([2, 3], dtype=np.int32) - direction["system"] = xds_dir["reference_direction"]["attrs"]["equinox"].upper() + if "equinox" in xds_dir["reference_direction"]["attrs"]: + direction["system"] = xds_dir["reference_direction"]["attrs"]["equinox"].upper() + # Allow J2000.0 for backward compatibility; it will be normalized below. + allowed_systems = { + "FK5", + "FK4", + "ICRS", + "GALACTIC", + "J2000", + "B1950", + "J2000.0", + } + if direction["system"] not in allowed_systems: + raise RuntimeError( + f"Unsupported direction equinox '{direction['system']}'. " + "Supported systems are: FK5, FK4, ICRS, GALACTIC, J2000, B1950." + ) + elif "frame" in xds_dir["reference_direction"]["attrs"]: + frame = xds_dir["reference_direction"]["attrs"]["frame"].upper() + frame_to_system = { + "FK5": "J2000", + "FK4": "B1950", + "ICRS": "ICRS", + "GALACTIC": "GALACTIC", + "J2000": "J2000", + "B1950": "B1950", + } + if frame not in frame_to_system: + raise RuntimeError( + f"Unsupported direction coordinate system frame '{frame}'. " + "Supported frames are: FK5, FK4, ICRS, GALACTIC, J2000, B1950." + ) + direction["system"] = frame_to_system[frame] + else: + raise RuntimeError( + "Cannot determine direction coordinate system frame. " + f"direction metadata is {xds_dir}" + ) if direction["system"] == "J2000.0": direction["system"] = "J2000" + elif direction["system"] == "B1950.0": + direction["system"] = "B1950" direction["projection"] = xds_dir["projection"] direction["projection_parameters"] = xds_dir["projection_parameters"] direction["units"] = [ diff --git a/tests/unit/image/test_image.py b/tests/unit/image/test_image.py index 56b57032..5bc18bcb 100644 --- a/tests/unit/image/test_image.py +++ b/tests/unit/image/test_image.py @@ -57,6 +57,88 @@ def clean_path_logic(text: str) -> str: return text +def _make_test_sky_xds_for_casa_coord_write(): + xds = make_empty_sky_image( + phase_center=np.array([0.2, -0.5]), + image_size=np.array([4, 4]), + cell_size=np.array([1e-4, 1e-4]), + frequency_coords=np.array([1.4e9]), + pol_coords=np.array(["I"]), + time_coords=np.array([51544.0]), + do_sky_coords=False, + direction_reference="fk5", + ) + xds["SKY"] = xr.DataArray( + np.zeros((1, 1, 1, 4, 4), dtype=np.float32), + dims=["time", "frequency", "polarization", "l", "m"], + ) + xds["SKY"].attrs["type"] = "sky" + xds.attrs["type"] = "image_dataset" + xds.attrs["data_groups"] = {"base": {"sky": "SKY"}} + return xds + + +def test_write_image_uses_reference_direction_frame_without_equinox(tmp_path): + xds = _make_test_sky_xds_for_casa_coord_write() + ref_dir_attrs = xds.attrs["coordinate_system_info"]["reference_direction"]["attrs"] + del ref_dir_attrs["equinox"] + outname = tmp_path / "frame_only.im" + + write_image(xds, str(outname), "casa") + + with open_image_ro(str(outname)) as im: + direction = im.coordinates().dict()["direction0"] + + assert direction["system"] == "J2000" + assert direction["conversionSystem"] == "J2000" + + +def test_write_image_requires_reference_direction_frame_or_equinox(tmp_path): + xds = _make_test_sky_xds_for_casa_coord_write() + ref_dir_attrs = xds.attrs["coordinate_system_info"]["reference_direction"]["attrs"] + del ref_dir_attrs["equinox"] + del ref_dir_attrs["frame"] + outname = tmp_path / "missing_frame.im" + + with pytest.raises( + RuntimeError, + match=r"Cannot determine direction coordinate system frame", + ): + write_image(xds, str(outname), "casa") + + +def test_write_image_uses_fk4_frame_to_set_b1950(tmp_path): + xds = _make_test_sky_xds_for_casa_coord_write() + ref_dir_attrs = xds.attrs["coordinate_system_info"]["reference_direction"]["attrs"] + # Ensure the frame is FK4 and that we rely on the frame (not equinox) mapping + ref_dir_attrs["frame"] = "FK4" + ref_dir_attrs.pop("equinox", None) + outname = tmp_path / "fk4_frame.im" + + write_image(xds, str(outname), "casa") + + with open_image_ro(str(outname)) as im: + direction = im.coordinates().dict()["direction0"] + + assert direction["system"] == "B1950" + assert direction["conversionSystem"] == "B1950" + + +def test_write_image_raises_for_unsupported_reference_direction_frame(tmp_path): + xds = _make_test_sky_xds_for_casa_coord_write() + ref_dir_attrs = xds.attrs["coordinate_system_info"]["reference_direction"]["attrs"] + # Set an unrecognized frame and remove equinox so that frame mapping is required + ref_dir_attrs["frame"] = "UNSUPPORTED" + ref_dir_attrs.pop("equinox", None) + outname = tmp_path / "unsupported_frame.im" + + with pytest.raises( + RuntimeError, + match=r"Unsupported direction coordinate system frame", + ): + write_image(xds, str(outname), "casa") + + def test_load_visibility_normalization_block_squeezes_spatial_axes(tmp_path): imagename = tmp_path / "synthetic.sumwt" data = np.arange(8, dtype=np.float32).reshape(4, 2, 1, 1)