Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions checkdmarc/spf.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,17 +437,22 @@ def query_spf_record(
)
spf_record = None
for record in answers:
if record == "Undecodable characters":
raise UndecodableCharactersInTXTRecord(
"A TXT record contains undecodable characters."
)
# https://datatracker.ietf.org/doc/html/rfc7208#section-4.5
#
# Starting with the set of records that were returned by the lookup,
# discard records that do not begin with a version section of exactly
# "v=spf1". Note that the version section is terminated by either an
# SP character or the end of the record. As an example, a record with
# a version section of "v=spf10" does not match and is discarded.

# Check for undecodable characters
if record == "Undecodable characters":
# We can't determine if this is an SPF record due to encoding issues
warnings.append(
"A TXT record with undecodable characters was skipped."
)
continue

if record.strip('"').startswith(txt_prefix):
spf_txt_records.append(record)
elif record.startswith(txt_prefix):
Expand Down Expand Up @@ -1235,14 +1240,16 @@ def get_spf_record(
:exc:`checkdmarc.spf.SPFTooManyDNSLookups`
"""
domain = normalize_domain(domain)
record = query_spf_record(
query_result = query_spf_record(
domain,
nameservers=nameservers,
resolver=resolver,
timeout=timeout,
timeout_retries=timeout_retries,
)
record = record["record"]
record = query_result["record"]
query_warnings = query_result.get("warnings", [])

parsed_record = parse_spf_record(
record,
domain,
Expand All @@ -1252,6 +1259,11 @@ def get_spf_record(
timeout_retries=timeout_retries,
)
parsed_record["record"] = record

# Merge warnings from query_spf_record with warnings from parse_spf_record
if query_warnings:
parsed_record["warnings"] = query_warnings + parsed_record.get("warnings", [])

return parsed_record


Expand Down
32 changes: 32 additions & 0 deletions tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import os
import unittest
from unittest.mock import patch

import checkdmarc
import checkdmarc.bimi
Expand Down Expand Up @@ -561,6 +562,37 @@ def testSPFBrokenExpModifierMacro(self):
domain,
)

def testUndecodableCharactersInNonSPFRecord(self):
"""Non-SPF TXT records with undecodable characters should be ignored with a warning"""
domain = "example.com"

# Mock query_dns to return:
# 1. An undecodable non-SPF TXT record
# 2. A valid SPF record
with patch('checkdmarc.spf.query_dns') as mock_query_dns:
# First call for SPF type records (returns empty)
# Second call for TXT records (returns undecodable + valid SPF)
mock_query_dns.side_effect = [
[], # No SPF type records
[
"Undecodable characters", # TXT record with undecodable chars
'"v=spf1 include:spf.smtp2go.com -all"' # Valid SPF record
]
]

# This should succeed and return the valid SPF record
result = checkdmarc.spf.get_spf_record(domain)

# Verify the SPF record was found
self.assertIsNotNone(result["record"])
self.assertIn("v=spf1", result["record"])

# Verify a warning was added for the undecodable record
self.assertTrue(len(result["warnings"]) > 0)
self.assertTrue(
any("TXT record" in w and "undecodable" in w.lower() for w in result["warnings"])
)


if __name__ == "__main__":
unittest.main(verbosity=2)
Loading