Skip to content
87 changes: 86 additions & 1 deletion astrodb_utils/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,10 +211,65 @@ def coords_from_simbad(source):

return simbad_skycoord

def simbad_name_resolvable(source, ra, dec):
"""
Checks whether a given astronomical source name is resolvable in the SIMBAD database,
and retrieves alternate resolvable names at the specified coordinates.

Parameters:
----------
source : str
The name of the astronomical source to check in SIMBAD.
ra : float
Right ascension of the source (in degrees).
dec : float
Declination of the source (in degrees).

Returns:
-------
simbad_resolvable : bool
True if the provided source name matches an entry in SIMBAD; False otherwise.
alternate_names : list of str
List of alternate SIMBAD-resolvable names found at the given coordinates.

Notes:
-----
- Uses `Simbad.query_object()` to determine if the source name is directly resolvable.
- If successful, compares returned `main_id` and `matched_id` to the original source name.
- Uses `Simbad.query_region()` to identify alternate names at the specified coordinates.
- Logs debug information for traceability and warnings when resolution fails.
"""

#check name of match is SIMBAD resolvable
simbad_resolvable = False
#search SIMBAD for the source using its name
simbad_result_table = Simbad.query_object(source)
if simbad_result_table is None:
#name is not resolvable or not in SIMBAD database
logger.debug(f"SIMBAD returned no results for {source}")

elif len(simbad_result_table) == 1:
logger.debug(
f"simbad colnames: {simbad_result_table.colnames} \n simbad results \n {simbad_result_table}"
)
simbad_name = f"{simbad_result_table['main_id'][0]}"
other_name = f"{simbad_result_table['matched_id'][0]}"
logger.debug(f"SIMBAD name string: {simbad_name}")
if (simbad_name == source) or (source == other_name):
simbad_resolvable= True
else:
msg = f"Name not resolvable in SIMBAD for {source}"
logger.warning(msg)

#search SIMBAD using coordinates and return list of possible names that are resolvable in SIMBAD
simbad_coord_result = Simbad.query_region(SkyCoord(ra = ra, dec = dec, unit = "deg"))
print(f"Alternate names for {source} that are resolvable in SIMBAD: {simbad_coord_result['main_id']}")

return simbad_resolvable, simbad_coord_result['main_id']

# NAMES
def ingest_name(
db, source: str = None, other_name: str = None, raise_error: bool = None
db, source: str = None, other_name: str = None, raise_error: bool = None, use_simbad: bool = True
):
"""
This function ingests an other name into the Names table
Expand All @@ -229,6 +284,13 @@ def ingest_name(
Name of the source different than that found in source table
raise_error: bool
Raise an error if name was not ingested
ra: float
Right ascensions of sources. Decimal degrees.
dec: float
Declinations of sources. Decimal degrees.
use_simbad: bool
True (default): Use Simbad to resolve the source name if it is not found in the database
False: Do not use Simbad to resolve the source name. Or when internet is unavailable

Returns
-------
Expand All @@ -240,6 +302,18 @@ def ingest_name(
"""
source = strip_unicode_dashes(source)
other_name = strip_unicode_dashes(other_name)
if use_simbad:
#check if name is resolvable in SIMBAD
logger.debug(f"{source}: Checking if name is resolvable in SIMBAD")
resolvable = simbad_name_resolvable(source = source)
if not resolvable:
msg1= f"{source} not resolvable in SIMBAD."
msg2 = f"Use the simbad_name_resolvable function to find possible names"
exit_function(msg1+ msg2, raise_error)
else:
logger.info(f"{source} is resolvable in SIMBAD.")


name_data = [{"source": source, "other_name": other_name}]
try:
with db.engine.connect() as conn:
Expand Down Expand Up @@ -331,6 +405,17 @@ def ingest_source(
)
exit_function(msg, raise_error)
return

#Check if source name is resolvable in SIMBAD
logger.debug(f"{source}: Checking if name is resolvable in SIMBAD")
resolvable = simbad_name_resolvable(source = source, ra = ra, dec = dec)
if not resolvable:
msg1= f"{source} not resolvable in SIMBAD."
msg2 = f"Some alternative names for {source}: {resolvable[1]}"
exit_function(msg1+ msg2, raise_error)
else:
logger.info(f"{source} is resolvable in SIMBAD.")


# Find out if source is already in database or not
if search_db:
Expand Down
46 changes: 46 additions & 0 deletions tests/test_sources.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import math
import re

import astropy.units as u
import pytest
Expand All @@ -12,6 +13,7 @@
ingest_name,
ingest_source,
strip_unicode_dashes,
simbad_name_resolvable
)


Expand Down Expand Up @@ -262,3 +264,47 @@ def test_ingest_name(db):
def test_strip_unicode_dashes(input, expected):
result = strip_unicode_dashes(input)
assert result == expected

@pytest.mark.parametrize('input,ra, dec, expected, expected_names', [
#2 cases whose names are NOT in the database
("Apple", 144.395292,29.528028, False,
["2MASSI J0937347+293142", "WISE J093742.35+293220.7"]),

("Banana", 3.9888,4.2511, False,
["HD 1160", "HD 1160B", "HD 1160C", "TYC 5-669-1"]),

#2 cases whose names are in the database
("2MASS J07222760-0540384",110.6149995,-5.677333, True,
["2MASS J07222760-0540384",
"[BGM2011b] J072227.56-054034.4",
"[BGM2011b] J072227.79-054034.7",
"[BGM2011b] J072227.64-054033.0",
"WISE J072227.27-054029.9",
"[BGM2011b] J072227.02-054039.2",
"[BGM2011b] J072227.28-054030.0",
"[BGM2011b] J072227.34-054026.8",
"[BGM2011b] J072226.97-054029.4",
"UCAC3 169-73338",
"TYC 4829-156-1"]),

("ULAS J000734.90+011247.1", 1.8957, 1.2132, True,
["ULAS J000734.90+011247.1",
"2MASS J00073939+0113049",
"SDSS J000741.43+011209.4",
"SDSS J000729.23+011346.4",
"2MASS J00073597+0110548"])
])

def test_simbad_resolvable_names(input,ra,dec,expected, expected_names):
result = simbad_name_resolvable(input, ra, dec)

#SIMBAD returns actual names with weird white spaces... so remove all white spaces from both lists of names
# Remove all spaces from actual names
actual_names_list = set([re.sub(r'\s+', '', str(name)) for name in result[1]])

# Remove all spaces from expected names
expected_names_list = set([re.sub(r'\s+', '', name) for name in expected_names])

assert result[0] == expected
assert actual_names_list == expected_names_list