Skip to content
41 changes: 40 additions & 1 deletion src/xradio/image/_util/_casacore/xds_to_casacore.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,48 @@ def _compute_direction_dict(xds: xr.Dataset) -> dict:
[xds.sizes[dim] for dim in ("l", "m")], dtype=np.int32
)
direction["_image_axes"] = np.array([2, 3], dtype=np.int32)
direction["system"] = xds_dir["reference_direction"]["attrs"]["equinox"].upper()
if "equinox" in xds_dir["reference_direction"]["attrs"]:
direction["system"] = xds_dir["reference_direction"]["attrs"]["equinox"].upper()
# Allow J2000.0 for backward compatibility; it will be normalized below.
allowed_systems = {
"FK5",
"FK4",
"ICRS",
"GALACTIC",
"J2000",
"B1950",
"J2000.0",
}
if direction["system"] not in allowed_systems:
raise RuntimeError(
f"Unsupported direction equinox '{direction['system']}'. "
"Supported systems are: FK5, FK4, ICRS, GALACTIC, J2000, B1950."
)
elif "frame" in xds_dir["reference_direction"]["attrs"]:
frame = xds_dir["reference_direction"]["attrs"]["frame"].upper()
frame_to_system = {
"FK5": "J2000",
"FK4": "B1950",
"ICRS": "ICRS",
"GALACTIC": "GALACTIC",
"J2000": "J2000",
"B1950": "B1950",
}
if frame not in frame_to_system:
raise RuntimeError(
f"Unsupported direction coordinate system frame '{frame}'. "
"Supported frames are: FK5, FK4, ICRS, GALACTIC, J2000, B1950."
)
direction["system"] = frame_to_system[frame]
else:
raise RuntimeError(
"Cannot determine direction coordinate system frame. "
f"direction metadata is {xds_dir}"
)
if direction["system"] == "J2000.0":
direction["system"] = "J2000"
elif direction["system"] == "B1950.0":
direction["system"] = "B1950"
direction["projection"] = xds_dir["projection"]
direction["projection_parameters"] = xds_dir["projection_parameters"]
direction["units"] = [
Expand Down
82 changes: 82 additions & 0 deletions tests/unit/image/test_image.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,88 @@ def clean_path_logic(text: str) -> str:
return text


def _make_test_sky_xds_for_casa_coord_write():
xds = make_empty_sky_image(
phase_center=np.array([0.2, -0.5]),
image_size=np.array([4, 4]),
cell_size=np.array([1e-4, 1e-4]),
frequency_coords=np.array([1.4e9]),
pol_coords=np.array(["I"]),
time_coords=np.array([51544.0]),
do_sky_coords=False,
direction_reference="fk5",
)
xds["SKY"] = xr.DataArray(
np.zeros((1, 1, 1, 4, 4), dtype=np.float32),
dims=["time", "frequency", "polarization", "l", "m"],
)
xds["SKY"].attrs["type"] = "sky"
xds.attrs["type"] = "image_dataset"
xds.attrs["data_groups"] = {"base": {"sky": "SKY"}}
return xds


def test_write_image_uses_reference_direction_frame_without_equinox(tmp_path):
xds = _make_test_sky_xds_for_casa_coord_write()
ref_dir_attrs = xds.attrs["coordinate_system_info"]["reference_direction"]["attrs"]
del ref_dir_attrs["equinox"]
outname = tmp_path / "frame_only.im"

write_image(xds, str(outname), "casa")

with open_image_ro(str(outname)) as im:
direction = im.coordinates().dict()["direction0"]

assert direction["system"] == "J2000"
assert direction["conversionSystem"] == "J2000"


def test_write_image_requires_reference_direction_frame_or_equinox(tmp_path):
xds = _make_test_sky_xds_for_casa_coord_write()
ref_dir_attrs = xds.attrs["coordinate_system_info"]["reference_direction"]["attrs"]
del ref_dir_attrs["equinox"]
del ref_dir_attrs["frame"]
outname = tmp_path / "missing_frame.im"

with pytest.raises(
RuntimeError,
match=r"Cannot determine direction coordinate system frame",
):
write_image(xds, str(outname), "casa")


def test_write_image_uses_fk4_frame_to_set_b1950(tmp_path):
xds = _make_test_sky_xds_for_casa_coord_write()
ref_dir_attrs = xds.attrs["coordinate_system_info"]["reference_direction"]["attrs"]
# Ensure the frame is FK4 and that we rely on the frame (not equinox) mapping
ref_dir_attrs["frame"] = "FK4"
ref_dir_attrs.pop("equinox", None)
outname = tmp_path / "fk4_frame.im"

write_image(xds, str(outname), "casa")

with open_image_ro(str(outname)) as im:
direction = im.coordinates().dict()["direction0"]

assert direction["system"] == "B1950"
assert direction["conversionSystem"] == "B1950"


def test_write_image_raises_for_unsupported_reference_direction_frame(tmp_path):
xds = _make_test_sky_xds_for_casa_coord_write()
ref_dir_attrs = xds.attrs["coordinate_system_info"]["reference_direction"]["attrs"]
# Set an unrecognized frame and remove equinox so that frame mapping is required
ref_dir_attrs["frame"] = "UNSUPPORTED"
ref_dir_attrs.pop("equinox", None)
outname = tmp_path / "unsupported_frame.im"

with pytest.raises(
RuntimeError,
match=r"Unsupported direction coordinate system frame",
):
write_image(xds, str(outname), "casa")


def test_load_visibility_normalization_block_squeezes_spatial_axes(tmp_path):
imagename = tmp_path / "synthetic.sumwt"
data = np.arange(8, dtype=np.float32).reshape(4, 2, 1, 1)
Expand Down
Loading