Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 71 additions & 21 deletions crawl4ai/async_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,31 +38,69 @@ def __init__(
self.max_retries = max_retries
self.rate_limit_codes = rate_limit_codes or [429, 503]
self.domains: Dict[str, DomainState] = {}
self._domain_locks: Dict[str, asyncio.Lock] = {}

def get_domain(self, url: str) -> str:
    """Return the network location (host[:port]) used as the rate-limit key for *url*."""
    parsed = urlparse(url)
    return parsed.netloc

def _get_domain_lock(self, domain: str) -> asyncio.Lock:
"""Get or create a per-domain lock for serializing requests."""
if domain not in self._domain_locks:
self._domain_locks[domain] = asyncio.Lock()
return self._domain_locks[domain]

async def wait_if_needed(self, url: str) -> None:
domain = self.get_domain(url)
state = self.domains.get(domain)
lock = self._get_domain_lock(domain)

# Serialize concurrent requests to the same domain so each
# task waits its proper turn instead of all reading the same
# last_request_time and firing together (issue #1095).
async with lock:
state = self.domains.get(domain)

if not state:
self.domains[domain] = DomainState()
state = self.domains[domain]

now = time.time()
if state.last_request_time:
wait_time = max(0, state.current_delay - (now - state.last_request_time))
if wait_time > 0:
await asyncio.sleep(wait_time)

if not state:
self.domains[domain] = DomainState()
state = self.domains[domain]
# Random delay within base range if no current delay
if state.current_delay == 0:
state.current_delay = random.uniform(*self.base_delay)

now = time.time()
if state.last_request_time:
wait_time = max(0, state.current_delay - (now - state.last_request_time))
if wait_time > 0:
await asyncio.sleep(wait_time)
state.last_request_time = time.time()

def _parse_retry_after(self, value: str) -> Optional[float]:
"""Parse a Retry-After header value into seconds.

Supports both delay-seconds (e.g. "5") and HTTP-date formats.
Returns None if the value cannot be parsed.
"""
# Try as integer seconds first
try:
return float(value)
except (ValueError, TypeError):
pass

# Random delay within base range if no current delay
if state.current_delay == 0:
state.current_delay = random.uniform(*self.base_delay)
# Try as HTTP-date (e.g. "Wed, 21 Oct 2015 07:28:00 GMT")
from email.utils import parsedate_to_datetime
try:
retry_date = parsedate_to_datetime(value)
from datetime import datetime, timezone
delay = (retry_date - datetime.now(timezone.utc)).total_seconds()
return max(0, delay)
except (ValueError, TypeError):
pass

state.last_request_time = time.time()
return None

def update_delay(self, url: str, status_code: int) -> bool:
def update_delay(self, url: str, status_code: int,
response_headers: Optional[Dict[str, str]] = None) -> bool:
domain = self.get_domain(url)
state = self.domains[domain]

Expand All @@ -71,10 +109,22 @@ def update_delay(self, url: str, status_code: int) -> bool:
if state.fail_count > self.max_retries:
return False

# Exponential backoff with random jitter
state.current_delay = min(
state.current_delay * 2 * random.uniform(0.75, 1.25), self.max_delay
)
# Use Retry-After header if available (issue #1095),
# otherwise fall back to exponential backoff with jitter.
retry_after = None
if response_headers:
raw = (response_headers.get("Retry-After")
or response_headers.get("retry-after"))
if raw:
retry_after = self._parse_retry_after(raw)

if retry_after is not None:
state.current_delay = min(retry_after, self.max_delay)
else:
# Exponential backoff with random jitter
state.current_delay = min(
state.current_delay * 2 * random.uniform(0.75, 1.25), self.max_delay
)
else:
# Gradually reduce delay on success
state.current_delay = max(
Expand Down Expand Up @@ -325,11 +375,11 @@ async def crawl_url(

# Handle rate limiting
if self.rate_limiter and result.status_code:
if not self.rate_limiter.update_delay(url, result.status_code):
if not self.rate_limiter.update_delay(url, result.status_code, result.response_headers):
error_message = f"Rate limit retry count exceeded for domain {urlparse(url).netloc}"
if self.monitor:
self.monitor.update_task(task_id, status=CrawlStatus.FAILED)

# Update status based on result
if not result.success:
error_message = result.error_message
Expand Down Expand Up @@ -691,7 +741,7 @@ async def crawl_url(
memory_usage = peak_memory = end_memory - start_memory

if self.rate_limiter and result.status_code:
if not self.rate_limiter.update_delay(url, result.status_code):
if not self.rate_limiter.update_delay(url, result.status_code, result.response_headers):
error_message = f"Rate limit retry count exceeded for domain {urlparse(url).netloc}"
if self.monitor:
self.monitor.update_task(task_id, status=CrawlStatus.FAILED)
Expand Down
13 changes: 10 additions & 3 deletions crawl4ai/deep_crawling/bff_strategy.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
# best_first_crawling_strategy.py
from __future__ import annotations

import asyncio
import logging
from datetime import datetime
from typing import AsyncGenerator, Optional, Set, Dict, List, Tuple, Any, Callable, Awaitable, Union
from typing import AsyncGenerator, Optional, Set, Dict, List, Tuple, Any, Callable, Awaitable, Union, TYPE_CHECKING
from urllib.parse import urlparse

from ..models import TraversalStats
from .filters import FilterChain
from .scorers import URLScorer
from . import DeepCrawlStrategy

from ..types import AsyncWebCrawler, CrawlerRunConfig, CrawlResult, RunManyReturn
from ..utils import normalize_url_for_deep_crawl

if TYPE_CHECKING:
from ..async_dispatcher import BaseDispatcher

from math import inf as infinity

# Configurable batch size for processing items from the priority queue
Expand Down Expand Up @@ -47,6 +51,8 @@ def __init__(
on_state_change: Optional[Callable[[Dict[str, Any]], Awaitable[None]]] = None,
# Optional cancellation callback - checked before each URL is processed
should_cancel: Optional[Callable[[], Union[bool, Awaitable[bool]]]] = None,
# Optional dispatcher for rate limiting control (issue #1095)
dispatcher: Optional[BaseDispatcher] = None,
):
self.max_depth = max_depth
self.filter_chain = filter_chain
Expand All @@ -69,6 +75,7 @@ def __init__(
self._on_state_change = on_state_change
self._should_cancel = should_cancel
self._last_state: Optional[Dict[str, Any]] = None
self.dispatcher = dispatcher
# Shadow list for queue items (only used when on_state_change is set)
self._queue_shadow: Optional[List[Tuple[float, int, str, Optional[str]]]] = None

Expand Down Expand Up @@ -275,7 +282,7 @@ async def _arun_best_first(
# Process the current batch of URLs.
urls = [item[2] for item in batch]
batch_config = config.clone(deep_crawl_strategy=None, stream=True)
stream_gen = await crawler.arun_many(urls=urls, config=batch_config)
stream_gen = await crawler.arun_many(urls=urls, config=batch_config, dispatcher=self.dispatcher)
async for result in stream_gen:
result_url = result.url
# Find the corresponding tuple from the batch.
Expand Down
18 changes: 13 additions & 5 deletions crawl4ai/deep_crawling/bfs_strategy.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,27 @@
# bfs_deep_crawl_strategy.py
from __future__ import annotations

import asyncio
import logging
from datetime import datetime
from typing import AsyncGenerator, Optional, Set, Dict, List, Tuple, Any, Callable, Awaitable, Union
from typing import AsyncGenerator, Optional, Set, Dict, List, Tuple, Any, Callable, Awaitable, Union, TYPE_CHECKING
from urllib.parse import urlparse

from ..models import TraversalStats
from .filters import FilterChain
from .scorers import URLScorer
from . import DeepCrawlStrategy
from . import DeepCrawlStrategy
from ..types import AsyncWebCrawler, CrawlerRunConfig, CrawlResult
from ..utils import normalize_url_for_deep_crawl, efficient_normalize_url_for_deep_crawl
from math import inf as infinity

if TYPE_CHECKING:
from ..async_dispatcher import BaseDispatcher

class BFSDeepCrawlStrategy(DeepCrawlStrategy):
"""
Breadth-First Search deep crawling strategy.

Core functions:
- arun: Main entry point; splits execution into batch or stream modes.
- link_discovery: Extracts, filters, and (if needed) scores the outgoing URLs.
Expand All @@ -36,6 +41,8 @@ def __init__(
on_state_change: Optional[Callable[[Dict[str, Any]], Awaitable[None]]] = None,
# Optional cancellation callback - checked before each URL is processed
should_cancel: Optional[Callable[[], Union[bool, Awaitable[bool]]]] = None,
# Optional dispatcher for rate limiting control (issue #1095)
dispatcher: Optional[BaseDispatcher] = None,
):
self.max_depth = max_depth
self.filter_chain = filter_chain
Expand All @@ -58,6 +65,7 @@ def __init__(
self._on_state_change = on_state_change
self._should_cancel = should_cancel
self._last_state: Optional[Dict[str, Any]] = None
self.dispatcher = dispatcher

async def can_process_url(self, url: str, depth: int) -> bool:
"""
Expand Down Expand Up @@ -251,7 +259,7 @@ async def _arun_batch(

# Clone the config to disable deep crawling recursion and enforce batch mode.
batch_config = config.clone(deep_crawl_strategy=None, stream=False)
batch_results = await crawler.arun_many(urls=urls, config=batch_config)
batch_results = await crawler.arun_many(urls=urls, config=batch_config, dispatcher=self.dispatcher)

for result in batch_results:
url = result.url
Expand Down Expand Up @@ -339,7 +347,7 @@ async def _arun_stream(
visited.update(urls)

stream_config = config.clone(deep_crawl_strategy=None, stream=True)
stream_gen = await crawler.arun_many(urls=urls, config=stream_config)
stream_gen = await crawler.arun_many(urls=urls, config=stream_config, dispatcher=self.dispatcher)

# Keep track of processed results for this batch
results_count = 0
Expand Down
4 changes: 2 additions & 2 deletions crawl4ai/deep_crawling/dfs_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ async def _arun_batch(

# Clone config to disable recursive deep crawling.
batch_config = config.clone(deep_crawl_strategy=None, stream=False)
url_results = await crawler.arun_many(urls=[url], config=batch_config)
url_results = await crawler.arun_many(urls=[url], config=batch_config, dispatcher=self.dispatcher)

for result in url_results:
result.metadata = result.metadata or {}
Expand Down Expand Up @@ -183,7 +183,7 @@ async def _arun_stream(
visited.add(url)

stream_config = config.clone(deep_crawl_strategy=None, stream=True)
stream_gen = await crawler.arun_many(urls=[url], config=stream_config)
stream_gen = await crawler.arun_many(urls=[url], config=stream_config, dispatcher=self.dispatcher)
async for result in stream_gen:
result.metadata = result.metadata or {}
result.metadata["depth"] = depth
Expand Down