diff --git a/crawl4ai/async_dispatcher.py b/crawl4ai/async_dispatcher.py
index 1d6a236b7..86c09c171 100644
--- a/crawl4ai/async_dispatcher.py
+++ b/crawl4ai/async_dispatcher.py
@@ -38,31 +38,69 @@ def __init__(
         self.max_retries = max_retries
         self.rate_limit_codes = rate_limit_codes or [429, 503]
         self.domains: Dict[str, DomainState] = {}
+        self._domain_locks: Dict[str, asyncio.Lock] = {}
 
     def get_domain(self, url: str) -> str:
         return urlparse(url).netloc
 
+    def _get_domain_lock(self, domain: str) -> asyncio.Lock:
+        """Get or create a per-domain lock for serializing requests."""
+        if domain not in self._domain_locks:
+            self._domain_locks[domain] = asyncio.Lock()
+        return self._domain_locks[domain]
+
     async def wait_if_needed(self, url: str) -> None:
         domain = self.get_domain(url)
-        state = self.domains.get(domain)
+        lock = self._get_domain_lock(domain)
+
+        # Serialize concurrent requests to the same domain so each
+        # task waits its proper turn instead of all reading the same
+        # last_request_time and firing together (issue #1095).
+        async with lock:
+            state = self.domains.get(domain)
+
+            if not state:
+                self.domains[domain] = DomainState()
+                state = self.domains[domain]
+
+            now = time.time()
+            if state.last_request_time:
+                wait_time = max(0, state.current_delay - (now - state.last_request_time))
+                if wait_time > 0:
+                    await asyncio.sleep(wait_time)
-        if not state:
-            self.domains[domain] = DomainState()
-            state = self.domains[domain]
+            # Random delay within base range if no current delay
+            if state.current_delay == 0:
+                state.current_delay = random.uniform(*self.base_delay)
-        now = time.time()
-        if state.last_request_time:
-            wait_time = max(0, state.current_delay - (now - state.last_request_time))
-            if wait_time > 0:
-                await asyncio.sleep(wait_time)
+            state.last_request_time = time.time()
+
+    def _parse_retry_after(self, value: str) -> Optional[float]:
+        """Parse a Retry-After header value into seconds.
+
+        Supports both delay-seconds (e.g. "5") and HTTP-date formats.
+        Returns None if the value cannot be parsed.
+        """
+        # Try as integer seconds first
+        try:
+            return float(value)
+        except (ValueError, TypeError):
+            pass
-        # Random delay within base range if no current delay
-        if state.current_delay == 0:
-            state.current_delay = random.uniform(*self.base_delay)
+        # Try as HTTP-date (e.g. "Wed, 21 Oct 2015 07:28:00 GMT")
+        from email.utils import parsedate_to_datetime
+        try:
+            retry_date = parsedate_to_datetime(value)
+            from datetime import datetime, timezone
+            delay = (retry_date - datetime.now(timezone.utc)).total_seconds()
+            return max(0, delay)
+        except (ValueError, TypeError):
+            pass
-        state.last_request_time = time.time()
+        return None
 
-    def update_delay(self, url: str, status_code: int) -> bool:
+    def update_delay(self, url: str, status_code: int,
+                     response_headers: Optional[Dict[str, str]] = None) -> bool:
         domain = self.get_domain(url)
         state = self.domains[domain]
 
@@ -71,10 +109,22 @@ def update_delay(self, url: str, status_code: int) -> bool:
         if state.fail_count > self.max_retries:
             return False
 
-        # Exponential backoff with random jitter
-        state.current_delay = min(
-            state.current_delay * 2 * random.uniform(0.75, 1.25), self.max_delay
-        )
+        # Use Retry-After header if available (issue #1095),
+        # otherwise fall back to exponential backoff with jitter.
+        retry_after = None
+        if response_headers:
+            raw = (response_headers.get("Retry-After")
+                   or response_headers.get("retry-after"))
+            if raw:
+                retry_after = self._parse_retry_after(raw)
+
+        if retry_after is not None:
+            state.current_delay = min(retry_after, self.max_delay)
+        else:
+            # Exponential backoff with random jitter
+            state.current_delay = min(
+                state.current_delay * 2 * random.uniform(0.75, 1.25), self.max_delay
+            )
     else:
         # Gradually reduce delay on success
         state.current_delay = max(
@@ -325,11 +375,11 @@ async def crawl_url(
 
             # Handle rate limiting
             if self.rate_limiter and result.status_code:
-                if not self.rate_limiter.update_delay(url, result.status_code):
+                if not self.rate_limiter.update_delay(url, result.status_code, result.response_headers):
                     error_message = f"Rate limit retry count exceeded for domain {urlparse(url).netloc}"
                     if self.monitor:
                         self.monitor.update_task(task_id, status=CrawlStatus.FAILED)
-
+
             # Update status based on result
             if not result.success:
                 error_message = result.error_message
@@ -691,7 +741,7 @@ async def crawl_url(
             memory_usage = peak_memory = end_memory - start_memory
 
             if self.rate_limiter and result.status_code:
-                if not self.rate_limiter.update_delay(url, result.status_code):
+                if not self.rate_limiter.update_delay(url, result.status_code, result.response_headers):
                     error_message = f"Rate limit retry count exceeded for domain {urlparse(url).netloc}"
                     if self.monitor:
                         self.monitor.update_task(task_id, status=CrawlStatus.FAILED)
diff --git a/crawl4ai/deep_crawling/bff_strategy.py b/crawl4ai/deep_crawling/bff_strategy.py
index 072ec2f9a..d914c3591 100644
--- a/crawl4ai/deep_crawling/bff_strategy.py
+++ b/crawl4ai/deep_crawling/bff_strategy.py
@@ -1,18 +1,22 @@
 # best_first_crawling_strategy.py
+from __future__ import annotations
+
 import asyncio
 import logging
 from datetime import datetime
-from typing import AsyncGenerator, Optional, Set, Dict, List, Tuple, Any, Callable, Awaitable, Union
+from typing import AsyncGenerator, Optional, Set, Dict, List, Tuple, Any, Callable, Awaitable, Union, TYPE_CHECKING
 from urllib.parse import urlparse
 
 from ..models import TraversalStats
 from .filters import FilterChain
 from .scorers import URLScorer
 from . import DeepCrawlStrategy
-
 from ..types import AsyncWebCrawler, CrawlerRunConfig, CrawlResult, RunManyReturn
 from ..utils import normalize_url_for_deep_crawl
+if TYPE_CHECKING:
+    from ..async_dispatcher import BaseDispatcher
+
 from math import inf as infinity
 
 # Configurable batch size for processing items from the priority queue
@@ -47,6 +51,8 @@ def __init__(
         on_state_change: Optional[Callable[[Dict[str, Any]], Awaitable[None]]] = None,
         # Optional cancellation callback - checked before each URL is processed
         should_cancel: Optional[Callable[[], Union[bool, Awaitable[bool]]]] = None,
+        # Optional dispatcher for rate limiting control (issue #1095)
+        dispatcher: Optional[BaseDispatcher] = None,
     ):
         self.max_depth = max_depth
         self.filter_chain = filter_chain
@@ -69,6 +75,7 @@ def __init__(
         self._on_state_change = on_state_change
         self._should_cancel = should_cancel
         self._last_state: Optional[Dict[str, Any]] = None
+        self.dispatcher = dispatcher
 
         # Shadow list for queue items (only used when on_state_change is set)
         self._queue_shadow: Optional[List[Tuple[float, int, str, Optional[str]]]] = None
@@ -275,7 +282,7 @@ async def _arun_best_first(
             # Process the current batch of URLs.
             urls = [item[2] for item in batch]
             batch_config = config.clone(deep_crawl_strategy=None, stream=True)
-            stream_gen = await crawler.arun_many(urls=urls, config=batch_config)
+            stream_gen = await crawler.arun_many(urls=urls, config=batch_config, dispatcher=self.dispatcher)
             async for result in stream_gen:
                 result_url = result.url
                 # Find the corresponding tuple from the batch.
diff --git a/crawl4ai/deep_crawling/bfs_strategy.py b/crawl4ai/deep_crawling/bfs_strategy.py
index dfb759272..0e60b2b58 100644
--- a/crawl4ai/deep_crawling/bfs_strategy.py
+++ b/crawl4ai/deep_crawling/bfs_strategy.py
@@ -1,22 +1,27 @@
 # bfs_deep_crawl_strategy.py
+from __future__ import annotations
+
 import asyncio
 import logging
 from datetime import datetime
-from typing import AsyncGenerator, Optional, Set, Dict, List, Tuple, Any, Callable, Awaitable, Union
+from typing import AsyncGenerator, Optional, Set, Dict, List, Tuple, Any, Callable, Awaitable, Union, TYPE_CHECKING
 from urllib.parse import urlparse
 
 from ..models import TraversalStats
 from .filters import FilterChain
 from .scorers import URLScorer
-from . import DeepCrawlStrategy
+from . import DeepCrawlStrategy 
 from ..types import AsyncWebCrawler, CrawlerRunConfig, CrawlResult
 from ..utils import normalize_url_for_deep_crawl, efficient_normalize_url_for_deep_crawl
 from math import inf as infinity
 
+if TYPE_CHECKING:
+    from ..async_dispatcher import BaseDispatcher
+
 class BFSDeepCrawlStrategy(DeepCrawlStrategy):
     """
     Breadth-First Search deep crawling strategy.
-
+
     Core functions:
     - arun: Main entry point; splits execution into batch or stream modes.
     - link_discovery: Extracts, filters, and (if needed) scores the outgoing URLs.
@@ -36,6 +41,8 @@ def __init__(
         on_state_change: Optional[Callable[[Dict[str, Any]], Awaitable[None]]] = None,
         # Optional cancellation callback - checked before each URL is processed
         should_cancel: Optional[Callable[[], Union[bool, Awaitable[bool]]]] = None,
+        # Optional dispatcher for rate limiting control (issue #1095)
+        dispatcher: Optional[BaseDispatcher] = None,
     ):
         self.max_depth = max_depth
         self.filter_chain = filter_chain
@@ -58,6 +65,7 @@ def __init__(
         self._on_state_change = on_state_change
         self._should_cancel = should_cancel
         self._last_state: Optional[Dict[str, Any]] = None
+        self.dispatcher = dispatcher
 
     async def can_process_url(self, url: str, depth: int) -> bool:
         """
@@ -251,7 +259,7 @@ async def _arun_batch(
 
         # Clone the config to disable deep crawling recursion and enforce batch mode.
         batch_config = config.clone(deep_crawl_strategy=None, stream=False)
-        batch_results = await crawler.arun_many(urls=urls, config=batch_config)
+        batch_results = await crawler.arun_many(urls=urls, config=batch_config, dispatcher=self.dispatcher)
 
         for result in batch_results:
             url = result.url
@@ -339,7 +347,7 @@ async def _arun_stream(
         visited.update(urls)
 
         stream_config = config.clone(deep_crawl_strategy=None, stream=True)
-        stream_gen = await crawler.arun_many(urls=urls, config=stream_config)
+        stream_gen = await crawler.arun_many(urls=urls, config=stream_config, dispatcher=self.dispatcher)
 
         # Keep track of processed results for this batch
         results_count = 0
diff --git a/crawl4ai/deep_crawling/dfs_strategy.py b/crawl4ai/deep_crawling/dfs_strategy.py
index 3e4987f25..61cd8abe2 100644
--- a/crawl4ai/deep_crawling/dfs_strategy.py
+++ b/crawl4ai/deep_crawling/dfs_strategy.py
@@ -75,7 +75,7 @@ async def _arun_batch(
 
             # Clone config to disable recursive deep crawling.
             batch_config = config.clone(deep_crawl_strategy=None, stream=False)
-            url_results = await crawler.arun_many(urls=[url], config=batch_config)
+            url_results = await crawler.arun_many(urls=[url], config=batch_config, dispatcher=self.dispatcher)
 
             for result in url_results:
                 result.metadata = result.metadata or {}
@@ -183,7 +183,7 @@ async def _arun_stream(
             visited.add(url)
 
             stream_config = config.clone(deep_crawl_strategy=None, stream=True)
-            stream_gen = await crawler.arun_many(urls=[url], config=stream_config)
+            stream_gen = await crawler.arun_many(urls=[url], config=stream_config, dispatcher=self.dispatcher)
             async for result in stream_gen:
                 result.metadata = result.metadata or {}
                 result.metadata["depth"] = depth