diff --git a/Gemfile.lock b/Gemfile.lock index 82554da..9abe5e4 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -3,6 +3,7 @@ PATH specs: langfuse (0.1.0) base64 (~> 0.2) + concurrent-ruby (~> 1.2) faraday (~> 2.0) faraday-retry (~> 2.0) mustache (~> 1.1) @@ -19,6 +20,7 @@ GEM ast (2.4.3) base64 (0.3.0) bigdecimal (3.3.1) + concurrent-ruby (1.3.5) crack (1.0.0) bigdecimal rexml @@ -136,6 +138,7 @@ GEM PLATFORMS arm64-darwin-22 + arm64-darwin-24 arm64-darwin-25 x86_64-linux diff --git a/langfuse.gemspec b/langfuse.gemspec index e6aebd5..034ad2b 100644 --- a/langfuse.gemspec +++ b/langfuse.gemspec @@ -34,6 +34,9 @@ Gem::Specification.new do |spec| spec.add_dependency "faraday-retry", "~> 2.0" spec.add_dependency "mustache", "~> 1.1" + # Runtime dependencies - Concurrency (for SWR caching) + spec.add_dependency "concurrent-ruby", "~> 1.2" + # Runtime dependencies - OpenTelemetry (for tracing) spec.add_dependency "opentelemetry-api", "~> 1.2" spec.add_dependency "opentelemetry-common", "~> 0.21" diff --git a/lib/langfuse/api_client.rb b/lib/langfuse/api_client.rb index 2935319..4f299f4 100644 --- a/lib/langfuse/api_client.rb +++ b/lib/langfuse/api_client.rb @@ -6,6 +6,7 @@ require "json" module Langfuse + # rubocop:disable Metrics/ClassLength # HTTP client for Langfuse API # # Handles authentication, connection management, and HTTP requests @@ -108,26 +109,68 @@ def get_prompt(name, version: nil, label: nil) cache_key = PromptCache.build_key(name, version: version, label: label) - # Use distributed lock if cache supports it (Rails.cache backend) - if cache.respond_to?(:fetch_with_lock) - cache.fetch_with_lock(cache_key) do - fetch_prompt_from_api(name, version: version, label: label) - end - elsif cache - # In-memory cache - use simple get/set pattern - cached_data = cache.get(cache_key) - return cached_data if cached_data + fetch_with_appropriate_caching_strategy(cache_key, name, version, label) + end + + private - prompt_data = fetch_prompt_from_api(name, version: 
version, label: label) - cache.set(cache_key, prompt_data) - prompt_data + # Fetch prompt using the most appropriate caching strategy available + # + # @param cache_key [String] The cache key for this prompt + # @param name [String] The name of the prompt + # @param version [Integer, nil] Optional specific version number + # @param label [String, nil] Optional label + # @return [Hash] The prompt data + def fetch_with_appropriate_caching_strategy(cache_key, name, version, label) + if swr_cache_available? + fetch_with_swr_cache(cache_key, name, version, label) + elsif distributed_cache_available? + fetch_with_distributed_cache(cache_key, name, version, label) + elsif simple_cache_available? + fetch_with_simple_cache(cache_key, name, version, label) else - # No cache - fetch directly fetch_prompt_from_api(name, version: version, label: label) end end - private + # Check if SWR cache is available + def swr_cache_available? + cache&.respond_to?(:fetch_with_stale_while_revalidate) + end + + # Check if distributed cache is available + def distributed_cache_available? + cache&.respond_to?(:fetch_with_lock) + end + + # Check if simple cache is available + def simple_cache_available? + !cache.nil? 
+ end + + # Fetch with SWR cache + def fetch_with_swr_cache(cache_key, name, version, label) + cache.fetch_with_stale_while_revalidate(cache_key) do + fetch_prompt_from_api(name, version: version, label: label) + end + end + + # Fetch with distributed cache (Rails.cache with stampede protection) + def fetch_with_distributed_cache(cache_key, name, version, label) + cache.fetch_with_lock(cache_key) do + fetch_prompt_from_api(name, version: version, label: label) + end + end + + # Fetch with simple cache (in-memory cache) + def fetch_with_simple_cache(cache_key, name, version, label) + cached_data = cache.get(cache_key) + return cached_data if cached_data + + prompt_data = fetch_prompt_from_api(name, version: version, label: label) + cache.set(cache_key, prompt_data) + prompt_data + end # Fetch a prompt from the API (without caching) # @@ -260,4 +303,5 @@ def extract_error_message(response) response.body["message"] || response.body["error"] || "Unknown error" end end + # rubocop:enable Metrics/ClassLength end diff --git a/lib/langfuse/client.rb b/lib/langfuse/client.rb index 6a06f55..9403349 100644 --- a/lib/langfuse/client.rb +++ b/lib/langfuse/client.rb @@ -156,15 +156,22 @@ def create_cache max_size: config.cache_max_size ) when :rails - RailsCacheAdapter.new( - ttl: config.cache_ttl, - lock_timeout: config.cache_lock_timeout - ) + create_rails_cache_adapter else raise ConfigurationError, "Unknown cache backend: #{config.cache_backend}" end end + def create_rails_cache_adapter + RailsCacheAdapter.new( + ttl: config.cache_ttl, + lock_timeout: config.cache_lock_timeout, + stale_ttl: config.cache_stale_while_revalidate ? 
config.cache_stale_ttl : nil, + refresh_threads: config.cache_refresh_threads, + logger: config.logger + ) + end + # Build the appropriate prompt client based on prompt type # # @param prompt_data [Hash] The prompt data from API diff --git a/lib/langfuse/config.rb b/lib/langfuse/config.rb index f406eb2..84986f5 100644 --- a/lib/langfuse/config.rb +++ b/lib/langfuse/config.rb @@ -46,6 +46,15 @@ class Config # @return [Integer] Lock timeout in seconds for distributed cache stampede protection attr_accessor :cache_lock_timeout + # @return [Boolean] Enable stale-while-revalidate caching + attr_accessor :cache_stale_while_revalidate + + # @return [Integer] Stale TTL in seconds (grace period for serving stale data) + attr_accessor :cache_stale_ttl + + # @return [Integer] Number of background threads for cache refresh + attr_accessor :cache_refresh_threads + # @return [Boolean] Use async processing for traces (requires ActiveJob) attr_accessor :tracing_async @@ -65,6 +74,9 @@ class Config DEFAULT_CACHE_MAX_SIZE = 1000 DEFAULT_CACHE_BACKEND = :memory DEFAULT_CACHE_LOCK_TIMEOUT = 10 + DEFAULT_CACHE_STALE_WHILE_REVALIDATE = false + DEFAULT_CACHE_STALE_TTL = 300 + DEFAULT_CACHE_REFRESH_THREADS = 5 DEFAULT_TRACING_ASYNC = true DEFAULT_BATCH_SIZE = 50 DEFAULT_FLUSH_INTERVAL = 10 @@ -83,6 +95,9 @@ def initialize @cache_max_size = DEFAULT_CACHE_MAX_SIZE @cache_backend = DEFAULT_CACHE_BACKEND @cache_lock_timeout = DEFAULT_CACHE_LOCK_TIMEOUT + @cache_stale_while_revalidate = DEFAULT_CACHE_STALE_WHILE_REVALIDATE + @cache_stale_ttl = DEFAULT_CACHE_STALE_TTL + @cache_refresh_threads = DEFAULT_CACHE_REFRESH_THREADS @tracing_async = DEFAULT_TRACING_ASYNC @batch_size = DEFAULT_BATCH_SIZE @flush_interval = DEFAULT_FLUSH_INTERVAL @@ -110,6 +125,8 @@ def validate! "cache_lock_timeout must be positive" end + validate_swr_config! + validate_cache_backend! 
end # rubocop:enable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity @@ -131,5 +148,20 @@ def validate_cache_backend! raise ConfigurationError, "cache_backend must be one of #{valid_backends.inspect}, got #{cache_backend.inspect}" end + + def validate_swr_config! + if cache_stale_ttl.nil? || cache_stale_ttl.negative? + raise ConfigurationError, "cache_stale_ttl must be non-negative" + end + + if cache_refresh_threads.nil? || cache_refresh_threads <= 0 + raise ConfigurationError, "cache_refresh_threads must be positive" + end + + return unless cache_stale_while_revalidate && cache_backend != :rails + + raise ConfigurationError, + "cache_stale_while_revalidate requires cache_backend to be :rails" + end end end diff --git a/lib/langfuse/rails_cache_adapter.rb b/lib/langfuse/rails_cache_adapter.rb index bf51d64..d457195 100644 --- a/lib/langfuse/rails_cache_adapter.rb +++ b/lib/langfuse/rails_cache_adapter.rb @@ -1,6 +1,11 @@ # frozen_string_literal: true +require "concurrent" +require "json" +require "logger" + module Langfuse + # rubocop:disable Metrics/ClassLength # Rails.cache adapter for distributed caching with Redis # # Wraps Rails.cache to provide distributed caching for prompts across @@ -12,20 +17,26 @@ module Langfuse # adapter.get("greeting:1") # => prompt_data # class RailsCacheAdapter - attr_reader :ttl, :namespace, :lock_timeout + attr_reader :ttl, :namespace, :lock_timeout, :stale_ttl, :thread_pool, :logger # Initialize a new Rails.cache adapter # # @param ttl [Integer] Time-to-live in seconds (default: 60) # @param namespace [String] Cache key namespace (default: "langfuse") # @param lock_timeout [Integer] Lock timeout in seconds for stampede protection (default: 10) + # @param stale_ttl [Integer, nil] Stale TTL for SWR (default: nil, disabled) + # @param refresh_threads [Integer] Number of background refresh threads (default: 5) + # @param logger [Logger, nil] Logger instance for error reporting (default: nil, creates new 
logger) # @raise [ConfigurationError] if Rails.cache is not available - def initialize(ttl: 60, namespace: "langfuse", lock_timeout: 10) + def initialize(ttl: 60, namespace: "langfuse", lock_timeout: 10, stale_ttl: nil, refresh_threads: 5, logger: nil) validate_rails_cache! @ttl = ttl @namespace = namespace @lock_timeout = lock_timeout + @stale_ttl = stale_ttl + @logger = logger || default_logger + @thread_pool = initialize_thread_pool(refresh_threads) if stale_ttl end # Get a value from the cache @@ -46,6 +57,45 @@ def set(key, value) value end + # Fetch a value from cache with Stale-While-Revalidate support + # + # This method implements SWR caching: serves stale data immediately while + # refreshing in the background. Falls back to fetch_with_lock if SWR is disabled. + # + # Three cache states: + # - FRESH: Return immediately, no action needed + # - REVALIDATE: Return stale data + trigger background refresh + # - STALE: Must fetch fresh data synchronously + # + # @param key [String] Cache key + # @yield Block to execute to fetch fresh data + # @return [Object] Cached, stale, or freshly fetched value + # + # @example + # adapter.fetch_with_stale_while_revalidate("greeting:v1") do + # api_client.get_prompt("greeting") + # end + def fetch_with_stale_while_revalidate(key, &) + return fetch_with_lock(key, &) unless stale_ttl + + entry = get_entry_with_metadata(key) + + if entry && entry["fresh_until"] > Time.now + # FRESH - return immediately + logger.debug("CACHE HIT!") + entry["data"] + elsif entry && entry["stale_until"] > Time.now + # REVALIDATE - return stale + refresh in background + logger.debug("CACHE STALE!") + schedule_refresh(key, &) + entry["data"] # Instant response! 
✨ + else + # MISS - must fetch synchronously + logger.debug("CACHE MISS!") + fetch_and_cache_with_metadata(key, &) + end + end + # Fetch a value from cache with distributed lock for stampede protection # # This method prevents cache stampedes (thundering herd) by ensuring only one @@ -133,8 +183,118 @@ def self.build_key(name, version: nil, label: nil) PromptCache.build_key(name, version: version, label: label) end + # Shutdown the thread pool gracefully + # + # @return [void] + def shutdown + return unless thread_pool + + thread_pool.shutdown + thread_pool.wait_for_termination(5) # Wait up to 5 seconds + end + private + # Initialize thread pool for background refresh operations + # + # @param refresh_threads [Integer] Maximum number of refresh threads + # @return [Concurrent::CachedThreadPool] + def initialize_thread_pool(refresh_threads) + Concurrent::CachedThreadPool.new( + max_threads: refresh_threads, + min_threads: 2, + max_queue: 50, + fallback_policy: :discard # Silently drop the newly submitted task when queue is full + ) + end + + # Schedule a background refresh for a cache key + # + # Prevents duplicate refreshes by using a refresh lock. If another process + # is already refreshing this key, this method returns immediately. + # + # Errors during refresh are caught and logged to prevent thread crashes. 
+ # + # @param key [String] Cache key + # @yield Block to execute to fetch fresh data + # @return [void] + def schedule_refresh(key) + # Prevent duplicate refreshes + refresh_lock_key = "#{namespaced_key(key)}:refreshing" + return unless acquire_refresh_lock(refresh_lock_key) + + thread_pool.post do + value = yield + set_with_metadata(key, value) + rescue StandardError => e + logger.error("Langfuse cache refresh failed for key '#{key}': #{e.class} - #{e.message}") + ensure + release_lock(refresh_lock_key) + end + end + + # Fetch data and cache it with SWR metadata + # + # @param key [String] Cache key + # @yield Block to execute to fetch fresh data + # @return [Object] Freshly fetched value + def fetch_and_cache_with_metadata(key) + value = yield + set_with_metadata(key, value) + value + end + + # Get cache entry with SWR metadata (timestamps) + # + # @param key [String] Cache key + # @return [Hash, nil] Entry with "data", "fresh_until", "stale_until" keys, or nil + def get_entry_with_metadata(key) + raw = Rails.cache.read("#{namespaced_key(key)}:metadata") + return nil unless raw + + parsed = JSON.parse(raw) + # Convert timestamp strings back to Time objects + parsed["fresh_until"] = Time.parse(parsed["fresh_until"]) if parsed["fresh_until"].is_a?(String) + parsed["stale_until"] = Time.parse(parsed["stale_until"]) if parsed["stale_until"].is_a?(String) + parsed + rescue JSON::ParserError, ArgumentError + nil + end + + # Set value in cache with SWR metadata + # + # @param key [String] Cache key + # @param value [Object] Value to cache + # @return [Object] The cached value + def set_with_metadata(key, value) + now = Time.now + entry = { + data: value, + fresh_until: now + ttl, + stale_until: now + ttl + stale_ttl + } + + # Store both data and metadata + total_ttl = ttl + stale_ttl + Rails.cache.write(namespaced_key(key), value, expires_in: total_ttl) + Rails.cache.write("#{namespaced_key(key)}:metadata", entry.to_json, expires_in: total_ttl) + + value + end + + # 
Acquire a refresh lock to prevent duplicate background refreshes + # + # @param lock_key [String] Full lock key (already namespaced) + # @return [Boolean] true if lock was acquired, false if already held + def acquire_refresh_lock(lock_key) + Rails.cache.write( + lock_key, + true, + unless_exist: true, # Atomic: only write if key doesn't exist + expires_in: 60 # Short-lived lock for background refreshes + ) + end + # Add namespace prefix to cache key # # @param key [String] Original cache key @@ -196,5 +356,17 @@ def validate_rails_cache! raise ConfigurationError, "Rails.cache is not available. Rails cache backend requires Rails with a configured cache store." end + + # Create a default logger + # + # @return [Logger] + def default_logger + if defined?(Rails) && Rails.respond_to?(:logger) && Rails.logger + Rails.logger + else + Logger.new($stdout, level: Logger::WARN) + end + end end + # rubocop:enable Metrics/ClassLength end diff --git a/spec/langfuse/api_client_swr_spec.rb b/spec/langfuse/api_client_swr_spec.rb new file mode 100644 index 0000000..0cc8e01 --- /dev/null +++ b/spec/langfuse/api_client_swr_spec.rb @@ -0,0 +1,354 @@ +# frozen_string_literal: true + +require "spec_helper" + +RSpec.describe Langfuse::ApiClient do + let(:public_key) { "pk_test" } + let(:secret_key) { "sk_test" } + let(:base_url) { "https://api.langfuse.com" } + let(:logger) { Logger.new($stdout, level: Logger::WARN) } + + let(:prompt_data) do + { + "id" => "prompt123", + "name" => "greeting", + "version" => 1, + "type" => "text", + "prompt" => "Hello {{name}}!", + "labels" => ["production"], + "tags" => ["customer-facing"], + "config" => {} + } + end + + describe "SWR caching integration" do + context "with SWR-enabled cache" do + it "uses SWR fetch method when available" do + swr_cache = instance_double(Langfuse::RailsCacheAdapter) + cache_key = "greeting:version:1" + + api_client = described_class.new( + public_key: public_key, + secret_key: secret_key, + base_url: base_url, + logger: 
logger, + cache: swr_cache + ) + + # Mock SWR cache methods + allow(swr_cache).to receive(:respond_to?) + .with(:fetch_with_stale_while_revalidate) + .and_return(true) + + expect(Langfuse::PromptCache).to receive(:build_key) + .with("greeting", version: 1, label: nil) + .and_return(cache_key) + + expect(swr_cache).to receive(:fetch_with_stale_while_revalidate) + .with(cache_key) + .and_yield + .and_return(prompt_data) + + # Mock the API call that would happen in the block + expect(api_client).to receive(:fetch_prompt_from_api) + .with("greeting", version: 1, label: nil) + .and_return(prompt_data) + + result = api_client.get_prompt("greeting", version: 1) + expect(result).to eq(prompt_data) + end + + it "handles cache miss with SWR" do + swr_cache = instance_double(Langfuse::RailsCacheAdapter) + + api_client = described_class.new( + public_key: public_key, + secret_key: secret_key, + base_url: base_url, + logger: logger, + cache: swr_cache + ) + + # Mock SWR cache methods + allow(swr_cache).to receive(:respond_to?) 
+ .with(:fetch_with_stale_while_revalidate) + .and_return(true) + + expect(Langfuse::PromptCache).to receive(:build_key) + .with("greeting", version: nil, label: nil) + .and_return("greeting:latest") + + expect(swr_cache).to receive(:fetch_with_stale_while_revalidate) + .with("greeting:latest") + .and_yield + .and_return(prompt_data) + + # Mock the actual API call + connection = instance_double(Faraday::Connection) + response = instance_double(Faraday::Response, status: 200, body: prompt_data.to_json) + + allow(api_client).to receive(:connection).and_return(connection) + allow(connection).to receive(:get).and_return(response) + allow(api_client).to receive(:handle_response).with(response).and_return(prompt_data) + + result = api_client.get_prompt("greeting") + expect(result).to eq(prompt_data) + end + + it "passes through all prompt parameters to cache key building" do + swr_cache = instance_double(Langfuse::RailsCacheAdapter) + + api_client = described_class.new( + public_key: public_key, + secret_key: secret_key, + base_url: base_url, + logger: logger, + cache: swr_cache + ) + + # Mock SWR cache methods + allow(swr_cache).to receive(:respond_to?) 
+ .with(:fetch_with_stale_while_revalidate) + .and_return(true) + + expect(Langfuse::PromptCache).to receive(:build_key) + .with("support-bot", version: nil, label: "staging") + .and_return("support-bot:label:staging") + + expect(swr_cache).to receive(:fetch_with_stale_while_revalidate) + .with("support-bot:label:staging") + .and_return(prompt_data) + + api_client.get_prompt("support-bot", label: "staging") + end + end + + context "with stampede protection cache (no SWR)" do + it "falls back to stampede protection when SWR not available" do + stampede_cache = instance_double(Langfuse::RailsCacheAdapter) + cache_key = "greeting:version:1" + + api_client = described_class.new( + public_key: public_key, + secret_key: secret_key, + base_url: base_url, + logger: logger, + cache: stampede_cache + ) + + allow(stampede_cache).to receive(:respond_to?) + .with(:fetch_with_stale_while_revalidate) + .and_return(false) + allow(stampede_cache).to receive(:respond_to?) + .with(:fetch_with_lock) + .and_return(true) + + expect(Langfuse::PromptCache).to receive(:build_key) + .with("greeting", version: 1, label: nil) + .and_return(cache_key) + + expect(stampede_cache).to receive(:fetch_with_lock) + .with(cache_key) + .and_yield + .and_return(prompt_data) + + expect(api_client).to receive(:fetch_prompt_from_api) + .with("greeting", version: 1, label: nil) + .and_return(prompt_data) + + result = api_client.get_prompt("greeting", version: 1) + expect(result).to eq(prompt_data) + end + end + + context "with simple cache (no SWR, no stampede protection)" do + it "uses simple get/set pattern when advanced caching not available" do + simple_cache = instance_double(Langfuse::PromptCache) + + api_client = described_class.new( + public_key: public_key, + secret_key: secret_key, + base_url: base_url, + logger: logger, + cache: simple_cache + ) + + allow(simple_cache).to receive(:respond_to?) 
+ .with(:fetch_with_stale_while_revalidate) + .and_return(false) + allow(simple_cache).to receive(:respond_to?) + .with(:fetch_with_lock) + .and_return(false) + + expect(Langfuse::PromptCache).to receive(:build_key) + .with("greeting", version: nil, label: nil) + .and_return("greeting:latest") + + # First check cache (miss) + expect(simple_cache).to receive(:get) + .with("greeting:latest") + .and_return(nil) + + # Fetch from API + expect(api_client).to receive(:fetch_prompt_from_api) + .with("greeting", version: nil, label: nil) + .and_return(prompt_data) + + # Set in cache + expect(simple_cache).to receive(:set) + .with("greeting:latest", prompt_data) + + result = api_client.get_prompt("greeting") + expect(result).to eq(prompt_data) + end + + it "returns cached data when available" do + simple_cache = instance_double(Langfuse::PromptCache) + + api_client = described_class.new( + public_key: public_key, + secret_key: secret_key, + base_url: base_url, + logger: logger, + cache: simple_cache + ) + + allow(simple_cache).to receive(:respond_to?) + .with(:fetch_with_stale_while_revalidate) + .and_return(false) + allow(simple_cache).to receive(:respond_to?) 
+ .with(:fetch_with_lock) + .and_return(false) + + expect(Langfuse::PromptCache).to receive(:build_key) + .with("greeting", version: nil, label: nil) + .and_return("greeting:latest") + + # Cache hit + expect(simple_cache).to receive(:get) + .with("greeting:latest") + .and_return(prompt_data) + + # Should not fetch from API or set cache + expect(api_client).not_to receive(:fetch_prompt_from_api) + expect(simple_cache).not_to receive(:set) + + result = api_client.get_prompt("greeting") + expect(result).to eq(prompt_data) + end + end + + context "with no cache" do + it "fetches directly from API without caching" do + api_client = described_class.new( + public_key: public_key, + secret_key: secret_key, + base_url: base_url, + logger: logger, + cache: nil + ) + + expect(api_client).to receive(:fetch_prompt_from_api) + .with("greeting", version: nil, label: nil) + .and_return(prompt_data) + + result = api_client.get_prompt("greeting") + expect(result).to eq(prompt_data) + end + end + end + + describe "cache method detection" do + context "when detecting SWR cache" do + it "correctly detects SWR capability" do + swr_cache = instance_double(Langfuse::RailsCacheAdapter) + + api_client = described_class.new( + public_key: public_key, + secret_key: secret_key, + base_url: base_url, + cache: swr_cache + ) + + allow(swr_cache).to receive(:respond_to?) + .with(:fetch_with_stale_while_revalidate) + .and_return(true) + + expect(swr_cache).to receive(:fetch_with_stale_while_revalidate) + allow(swr_cache).to receive(:fetch_with_stale_while_revalidate) + .and_return(prompt_data) + + api_client.get_prompt("test") + end + + it "falls back when SWR not available but stampede protection is" do + swr_cache = instance_double(Langfuse::RailsCacheAdapter) + + api_client = described_class.new( + public_key: public_key, + secret_key: secret_key, + base_url: base_url, + cache: swr_cache + ) + + allow(swr_cache).to receive(:respond_to?) 
+ .with(:fetch_with_stale_while_revalidate) + .and_return(false) + allow(swr_cache).to receive(:respond_to?) + .with(:fetch_with_lock) + .and_return(true) + + expect(swr_cache).to receive(:fetch_with_lock) + allow(swr_cache).to receive(:fetch_with_lock) + .and_return(prompt_data) + + api_client.get_prompt("test") + end + end + + context "when handling nil cache" do + it "handles nil cache gracefully" do + api_client = described_class.new( + public_key: public_key, + secret_key: secret_key, + base_url: base_url, + cache: nil + ) + + expect(api_client).to receive(:fetch_prompt_from_api) + .and_return(prompt_data) + + result = api_client.get_prompt("test") + expect(result).to eq(prompt_data) + end + end + end + + describe "error handling with SWR" do + it "propagates API errors when SWR cache fails" do + swr_cache = instance_double(Langfuse::RailsCacheAdapter) + + api_client = described_class.new( + public_key: public_key, + secret_key: secret_key, + base_url: base_url, + logger: logger, + cache: swr_cache + ) + + allow(swr_cache).to receive(:respond_to?) 
+ .with(:fetch_with_stale_while_revalidate) + .and_return(true) + + allow(swr_cache).to receive(:fetch_with_stale_while_revalidate) + .and_yield + + expect(api_client).to receive(:fetch_prompt_from_api) + .and_raise(Langfuse::NotFoundError, "Prompt not found") + + expect do + api_client.get_prompt("nonexistent") + end.to raise_error(Langfuse::NotFoundError, "Prompt not found") + end + end +end diff --git a/spec/langfuse/client_spec.rb b/spec/langfuse/client_spec.rb index 48fda13..0851d00 100644 --- a/spec/langfuse/client_spec.rb +++ b/spec/langfuse/client_spec.rb @@ -121,6 +121,36 @@ class << self described_class.new(config_with_rails_cache) end.not_to raise_error end + + it "passes logger from config to RailsCacheAdapter" do + custom_logger = Logger.new($stdout) + config_with_rails_cache.logger = custom_logger + client = described_class.new(config_with_rails_cache) + expect(client.api_client.cache.logger).to eq(custom_logger) + end + + it "configures RailsCacheAdapter with stale-while-revalidate settings" do + config_with_rails_cache.cache_stale_while_revalidate = true + config_with_rails_cache.cache_stale_ttl = 300 + config_with_rails_cache.cache_refresh_threads = 3 + config_with_rails_cache.cache_lock_timeout = 15 + + client = described_class.new(config_with_rails_cache) + adapter = client.api_client.cache + + expect(adapter.stale_ttl).to eq(300) + expect(adapter.lock_timeout).to eq(15) + expect(adapter.thread_pool).to be_a(Concurrent::CachedThreadPool) + end + + it "configures RailsCacheAdapter without SWR when disabled" do + config_with_rails_cache.cache_stale_while_revalidate = false + client = described_class.new(config_with_rails_cache) + adapter = client.api_client.cache + + expect(adapter.stale_ttl).to be_nil + expect(adapter.thread_pool).to be_nil + end end context "with invalid cache backend" do diff --git a/spec/langfuse/config_swr_spec.rb b/spec/langfuse/config_swr_spec.rb new file mode 100644 index 0000000..77d7ae8 --- /dev/null +++ 
b/spec/langfuse/config_swr_spec.rb @@ -0,0 +1,173 @@ +# frozen_string_literal: true + +require "spec_helper" + +RSpec.describe Langfuse::Config do + let(:config) { described_class.new } + + describe "SWR configuration options" do + describe "default values" do + it "sets cache_stale_while_revalidate to false by default" do + expect(config.cache_stale_while_revalidate).to be false + end + + it "sets cache_stale_ttl to 300 seconds by default" do + expect(config.cache_stale_ttl).to eq(300) + end + + it "sets cache_refresh_threads to 5 by default" do + expect(config.cache_refresh_threads).to eq(5) + end + end + + describe "configuration block" do + it "allows setting SWR options" do + config = described_class.new do |c| + c.cache_stale_while_revalidate = true + c.cache_stale_ttl = 600 + c.cache_refresh_threads = 10 + end + + expect(config.cache_stale_while_revalidate).to be true + expect(config.cache_stale_ttl).to eq(600) + expect(config.cache_refresh_threads).to eq(10) + end + end + + describe "validation" do + before do + config.public_key = "pk_test" + config.secret_key = "sk_test" + end + + context "when validating cache_stale_ttl" do + it "accepts positive values" do + config.cache_stale_ttl = 300 + expect { config.validate! }.not_to raise_error + end + + it "accepts zero" do + config.cache_stale_ttl = 0 + expect { config.validate! }.not_to raise_error + end + + it "rejects negative values" do + config.cache_stale_ttl = -1 + expect { config.validate! }.to raise_error( + Langfuse::ConfigurationError, + "cache_stale_ttl must be non-negative" + ) + end + + it "rejects nil values" do + config.cache_stale_ttl = nil + expect { config.validate! }.to raise_error( + Langfuse::ConfigurationError, + "cache_stale_ttl must be non-negative" + ) + end + end + + context "when validating cache_refresh_threads" do + it "accepts positive values" do + config.cache_refresh_threads = 5 + expect { config.validate! 
}.not_to raise_error + end + + it "rejects zero" do + config.cache_refresh_threads = 0 + expect { config.validate! }.to raise_error( + Langfuse::ConfigurationError, + "cache_refresh_threads must be positive" + ) + end + + it "rejects negative values" do + config.cache_refresh_threads = -1 + expect { config.validate! }.to raise_error( + Langfuse::ConfigurationError, + "cache_refresh_threads must be positive" + ) + end + + it "rejects nil values" do + config.cache_refresh_threads = nil + expect { config.validate! }.to raise_error( + Langfuse::ConfigurationError, + "cache_refresh_threads must be positive" + ) + end + end + + context "when validating SWR with cache backend" do + it "allows SWR with Rails cache backend" do + config.cache_backend = :rails + config.cache_stale_while_revalidate = true + expect { config.validate! }.not_to raise_error + end + + it "allows SWR disabled with any cache backend" do + config.cache_backend = :memory + config.cache_stale_while_revalidate = false + expect { config.validate! }.not_to raise_error + end + + it "rejects SWR with memory cache backend" do + config.cache_backend = :memory + config.cache_stale_while_revalidate = true + expect { config.validate! 
}.to raise_error( + Langfuse::ConfigurationError, + "cache_stale_while_revalidate requires cache_backend to be :rails" + ) + end + end + end + + describe "constants" do + it "defines correct default values" do + expect(Langfuse::Config::DEFAULT_CACHE_STALE_WHILE_REVALIDATE).to be false + expect(Langfuse::Config::DEFAULT_CACHE_STALE_TTL).to eq(300) + expect(Langfuse::Config::DEFAULT_CACHE_REFRESH_THREADS).to eq(5) + end + end + end + + describe "SWR integration with existing config" do + it "works with all configuration options together" do + config = described_class.new do |c| + c.public_key = "pk_test" + c.secret_key = "sk_test" + c.base_url = "https://test.langfuse.com" + c.timeout = 10 + c.cache_ttl = 120 + c.cache_backend = :rails + c.cache_stale_while_revalidate = true + c.cache_stale_ttl = 240 + c.cache_refresh_threads = 8 + end + + expect { config.validate! }.not_to raise_error + + expect(config.cache_ttl).to eq(120) + expect(config.cache_stale_while_revalidate).to be true + expect(config.cache_stale_ttl).to eq(240) + expect(config.cache_refresh_threads).to eq(8) + end + + it "maintains backward compatibility when SWR is disabled" do + config = described_class.new do |c| + c.public_key = "pk_test" + c.secret_key = "sk_test" + c.cache_ttl = 60 + c.cache_backend = :rails + # SWR options not set - should use defaults + end + + expect { config.validate! 
}.not_to raise_error + + expect(config.cache_stale_while_revalidate).to be false + expect(config.cache_stale_ttl).to eq(300) # Default + expect(config.cache_refresh_threads).to eq(5) # Default + end + end +end diff --git a/spec/langfuse/rails_cache_adapter_spec.rb b/spec/langfuse/rails_cache_adapter_spec.rb index 0be2716..c1d472d 100644 --- a/spec/langfuse/rails_cache_adapter_spec.rb +++ b/spec/langfuse/rails_cache_adapter_spec.rb @@ -39,6 +39,90 @@ class << self adapter = described_class.new(namespace: "my_app") expect(adapter.namespace).to eq("my_app") end + + context "when stale TTL is provided (SWR enabled)" do + it "creates an adapter with custom stale TTL" do + adapter = described_class.new(stale_ttl: 300) + expect(adapter.stale_ttl).to eq(300) + end + + it "initializes thread pool with default refresh_threads (5)" do + expect(Concurrent::CachedThreadPool).to receive(:new) + .with( + max_threads: 5, + min_threads: 2, + max_queue: 50, + fallback_policy: :discard + ).and_call_original + + adapter = described_class.new(stale_ttl: 300) + + # Verify thread pool is initialized and can accept work + expect(adapter.thread_pool).to be_a(Concurrent::CachedThreadPool) + expect(adapter.thread_pool.running?).to be true + expect(adapter.thread_pool.shuttingdown?).to be false + end + + it "initializes thread pool with custom refresh_threads parameter" do + expect(Concurrent::CachedThreadPool).to receive(:new) + .with( + max_threads: 3, + min_threads: 2, + max_queue: 50, + fallback_policy: :discard + ).and_call_original + + adapter = described_class.new(stale_ttl: 300, refresh_threads: 3) + + # Verify thread pool is initialized and can accept work + expect(adapter.thread_pool).to be_a(Concurrent::CachedThreadPool) + expect(adapter.thread_pool.running?).to be true + expect(adapter.thread_pool.shuttingdown?).to be false + end + end + + context "when stale TTL is not provided (SWR disabled)" do + it "does not initialize thread pool when stale_ttl is not provided" do + adapter = 
described_class.new(ttl: 60) + expect(adapter.thread_pool).to be_nil + end + + it "ignores refresh_threads when stale_ttl is not provided" do + # refresh_threads should have no effect without stale_ttl + adapter = described_class.new(ttl: 60, refresh_threads: 20) + expect(adapter.thread_pool).to be_nil + end + end + + context "with logger parameter" do + it "uses provided logger" do + custom_logger = Logger.new($stdout) + adapter = described_class.new(logger: custom_logger) + expect(adapter.logger).to eq(custom_logger) + end + + it "creates default stdout logger when no logger provided and Rails.logger not available" do + allow(Rails).to receive(:respond_to?).and_return(true) + allow(Rails).to receive(:respond_to?).with(:logger).and_return(false) + adapter = described_class.new + expect(adapter.logger).to be_a(Logger) + end + + it "uses Rails.logger as default when Rails is available" do + rails_logger = Logger.new($stdout) + allow(Rails).to receive(:respond_to?).and_return(true) + allow(Rails).to receive(:logger).and_return(rails_logger) + adapter = described_class.new + expect(adapter.logger).to eq(rails_logger) + end + + it "creates stdout logger when Rails.logger returns nil" do + allow(Rails).to receive(:respond_to?).and_return(true) + allow(Rails).to receive(:logger).and_return(nil) + adapter = described_class.new + expect(adapter.logger).to be_a(Logger) + end + end end context "when Rails.cache is not available" do @@ -428,4 +512,466 @@ class << self expect(rails_cache).to respond_to(:fetch_with_lock) end end + + describe "#fetch_with_stale_while_revalidate" do + let(:ttl) { 60 } + let(:stale_ttl) { 120 } + let(:refresh_threads) { 2 } + let(:adapter_with_swr) { described_class.new(ttl:, stale_ttl:, refresh_threads:) } + let(:adapter_without_swr) { described_class.new(ttl:) } + + before do + allow(mock_cache).to receive_messages(read: nil, write: true, delete: true, delete_matched: true) + end + + context "when SWR is disabled" do + it "falls back to 
fetch_with_lock" do + cache_key = "test_key" + new_data = "new_value" + expect(adapter_without_swr).to receive(:fetch_with_lock).with(cache_key) + adapter_without_swr.fetch_with_stale_while_revalidate(cache_key) { new_data } + end + end + + context "with fresh cache entry" do + it "returns cached data immediately" do + cache_key = "test_key" + fresh_data = "fresh_value" + new_data = "new_value" + + fresh_entry = { + "data" => fresh_data, + "fresh_until" => Time.now + 30, + "stale_until" => Time.now + 150 + } + + allow(adapter_with_swr).to receive(:get_entry_with_metadata) + .with(cache_key) + .and_return(fresh_entry) + + result = adapter_with_swr.fetch_with_stale_while_revalidate(cache_key) { new_data } + expect(result).to eq(fresh_data) + end + + it "does not trigger background refresh" do + cache_key = "test_key" + fresh_data = "fresh_value" + new_data = "new_value" + + fresh_entry = { + "data" => fresh_data, + "fresh_until" => Time.now + 30, + "stale_until" => Time.now + 150 + } + + allow(adapter_with_swr).to receive(:get_entry_with_metadata) + .with(cache_key) + .and_return(fresh_entry) + + expect(adapter_with_swr).not_to receive(:schedule_refresh) + adapter_with_swr.fetch_with_stale_while_revalidate(cache_key) { new_data } + end + end + + context "with stale entry (revalidate state)" do + it "returns stale data immediately" do + cache_key = "test_key" + stale_data = "stale_value" + new_data = "new_value" + stale_entry = { + "data" => stale_data, + "fresh_until" => Time.now - 30, # Expired + "stale_until" => Time.now + 90 # Still within grace period + } + + allow(adapter_with_swr).to receive(:get_entry_with_metadata) + .with(cache_key) + .and_return(stale_entry) + allow(adapter_with_swr).to receive(:schedule_refresh) + + result = adapter_with_swr.fetch_with_stale_while_revalidate(cache_key) { new_data } + + expect(result).to eq(stale_data) + end + + it "schedules background refresh" do + cache_key = "test_key" + stale_data = "stale_value" + new_data = 
"new_value" + stale_entry = { + "data" => stale_data, + "fresh_until" => Time.now - 30, # Expired + "stale_until" => Time.now + 90 # Still within grace period + } + + allow(adapter_with_swr).to receive(:get_entry_with_metadata) + .with(cache_key) + .and_return(stale_entry) + + expect(adapter_with_swr).to receive(:schedule_refresh).with(cache_key) + + adapter_with_swr.fetch_with_stale_while_revalidate(cache_key) { new_data } + end + end + + context "with expired entry (past stale period)" do + it "fetches fresh data synchronously" do + cache_key = "test_key" + stale_data = "stale_value" + new_data = "new_value" + expired_entry = { + "data" => stale_data, + "fresh_until" => Time.now - 150, # Expired + "stale_until" => Time.now - 30 # Past grace period + } + + allow(adapter_with_swr).to receive(:get_entry_with_metadata) + .with(cache_key) + .and_return(expired_entry) + + expect(adapter_with_swr).to receive(:fetch_and_cache_with_metadata) + .with(cache_key) + .and_return(new_data) + + result = adapter_with_swr.fetch_with_stale_while_revalidate(cache_key) { new_data } + expect(result).to eq(new_data) + end + + it "does not schedule background refresh" do + cache_key = "test_key" + stale_data = "stale_value" + new_data = "new_value" + + expired_entry = { + "data" => stale_data, + "fresh_until" => Time.now - 150, # Expired + "stale_until" => Time.now - 30 # Past grace period + } + + allow(adapter_with_swr).to receive(:get_entry_with_metadata) + .with(cache_key) + .and_return(expired_entry) + allow(adapter_with_swr).to receive(:fetch_and_cache_with_metadata) + .with(cache_key) + .and_return(new_data) + + expect(adapter_with_swr).not_to receive(:schedule_refresh) + + adapter_with_swr.fetch_with_stale_while_revalidate(cache_key) { new_data } + end + end + + context "with cache miss" do + it "fetches fresh data synchronously" do + cache_key = "test_key" + new_data = "new_value" + + allow(adapter_with_swr).to receive(:get_entry_with_metadata) + .with(cache_key) + 
.and_return(nil) + + expect(adapter_with_swr).to receive(:fetch_and_cache_with_metadata) + .with(cache_key) + .and_return(new_data) + + expect(adapter_with_swr).not_to receive(:schedule_refresh) + + result = adapter_with_swr.fetch_with_stale_while_revalidate(cache_key) { new_data } + + expect(result).to eq(new_data) + end + end + end + + describe "#schedule_refresh" do + let(:ttl) { 60 } + let(:stale_ttl) { 120 } + let(:refresh_threads) { 2 } + let(:adapter_with_swr) { described_class.new(ttl: ttl, stale_ttl: stale_ttl, refresh_threads: refresh_threads) } + + before do + allow(mock_cache).to receive_messages(read: nil, write: true, delete: true, delete_matched: true) + end + + context "when refresh lock is acquired" do + it "schedules refresh in thread pool" do + cache_key = "test_key" + refresh_lock_key = "langfuse:#{cache_key}:refreshing" + + allow(adapter_with_swr).to receive(:acquire_refresh_lock) + .with(refresh_lock_key) + .and_return(true) + allow(adapter_with_swr).to receive(:set_with_metadata) + allow(adapter_with_swr).to receive(:release_lock) + # Mock thread pool to execute immediately for testing + allow(adapter_with_swr.thread_pool).to receive(:post).and_yield + + expect(adapter_with_swr).to receive(:set_with_metadata) + .with(cache_key, "refreshed_value") + + adapter_with_swr.send(:schedule_refresh, cache_key) { "refreshed_value" } + end + + it "releases the refresh lock after completion" do + cache_key = "test_key" + refresh_lock_key = "langfuse:#{cache_key}:refreshing" + + allow(adapter_with_swr).to receive(:acquire_refresh_lock) + .with(refresh_lock_key) + .and_return(true) + allow(adapter_with_swr).to receive(:set_with_metadata) + allow(adapter_with_swr.thread_pool).to receive(:post).and_yield + + expect(adapter_with_swr).to receive(:release_lock) + .with(refresh_lock_key) + + adapter_with_swr.send(:schedule_refresh, cache_key) { "refreshed_value" } + end + + it "logs error and releases lock when refresh block raises error" do + cache_key = 
"test_key" + refresh_lock_key = "langfuse:#{cache_key}:refreshing" + mock_logger = instance_double(Logger) + + adapter_with_logger = described_class.new( + ttl: ttl, + stale_ttl: stale_ttl, + refresh_threads: refresh_threads, + logger: mock_logger + ) + + allow(adapter_with_logger).to receive(:acquire_refresh_lock) + .with(refresh_lock_key) + .and_return(true) + allow(adapter_with_logger.thread_pool).to receive(:post).and_yield + + expect(mock_logger).to receive(:error) + .with(/Langfuse cache refresh failed for key 'test_key': RuntimeError - test error/) + + expect(adapter_with_logger).to receive(:release_lock) + .with(refresh_lock_key) + + # Error should be caught and logged, not raised + expect do + adapter_with_logger.send(:schedule_refresh, cache_key) { raise "test error" } + end.not_to raise_error + end + + it "logs error with correct exception class and message" do + cache_key = "greeting:1" + refresh_lock_key = "langfuse:#{cache_key}:refreshing" + mock_logger = instance_double(Logger) + + adapter_with_logger = described_class.new( + ttl: ttl, + stale_ttl: stale_ttl, + refresh_threads: refresh_threads, + logger: mock_logger + ) + + allow(adapter_with_logger).to receive(:acquire_refresh_lock) + .with(refresh_lock_key) + .and_return(true) + allow(adapter_with_logger.thread_pool).to receive(:post).and_yield + + expect(mock_logger).to receive(:error) + .with("Langfuse cache refresh failed for key 'greeting:1': ArgumentError - Invalid prompt data") + + expect(adapter_with_logger).to receive(:release_lock) + .with(refresh_lock_key) + + # Custom exception type + adapter_with_logger.send(:schedule_refresh, cache_key) do + raise ArgumentError, "Invalid prompt data" + end + end + end + + context "when refresh lock is not acquired" do + it "does not schedule refresh" do + cache_key = "test_key" + refresh_lock_key = "langfuse:#{cache_key}:refreshing" + + allow(adapter_with_swr).to receive(:acquire_refresh_lock) + .with(refresh_lock_key) + .and_return(false) + + 
expect(adapter_with_swr.thread_pool).not_to receive(:post) + adapter_with_swr.send(:schedule_refresh, cache_key) { "refreshed_value" } + end + end + end + + describe "#get_entry_with_metadata" do + let(:ttl) { 60 } + let(:stale_ttl) { 120 } + let(:refresh_threads) { 2 } + let(:adapter_with_swr) { described_class.new(ttl: ttl, stale_ttl: stale_ttl, refresh_threads: refresh_threads) } + + context "when metadata exists" do + it "returns parsed metadata with string keys and Time values" do + cache_key = "test_key" + namespaced_metadata_key = "langfuse:#{cache_key}:metadata" + fresh_until_time = Time.now + 30 + stale_until_time = Time.now + 150 + + metadata_json = { + "data" => "test_value", + "fresh_until" => fresh_until_time.to_s, + "stale_until" => stale_until_time.to_s + }.to_json + + allow(mock_cache).to receive(:read) + .with(namespaced_metadata_key) + .and_return(metadata_json) + + result = adapter_with_swr.send(:get_entry_with_metadata, cache_key) + + expect(result).to be_a(Hash) + expect(result["data"]).to eq("test_value") + expect(result["fresh_until"]).to be_a(Time) + expect(result["stale_until"]).to be_a(Time) + end + end + + context "when metadata does not exist" do + it "returns nil" do + cache_key = "test_key" + namespaced_metadata_key = "langfuse:#{cache_key}:metadata" + + allow(mock_cache).to receive(:read) + .with(namespaced_metadata_key) + .and_return(nil) + + result = adapter_with_swr.send(:get_entry_with_metadata, cache_key) + expect(result).to be_nil + end + end + + context "when metadata is invalid JSON" do + it "returns nil" do + cache_key = "test_key" + namespaced_metadata_key = "langfuse:#{cache_key}:metadata" + + allow(mock_cache).to receive(:read) + .with(namespaced_metadata_key) + .and_return("invalid json") + + result = adapter_with_swr.send(:get_entry_with_metadata, cache_key) + expect(result).to be_nil + end + end + end + + describe "#set_with_metadata" do + let(:ttl) { 60 } + let(:stale_ttl) { 120 } + 
let(:adapter_with_swr) { described_class.new(ttl: ttl, stale_ttl: stale_ttl, refresh_threads: refresh_threads) } + + it "stores both value and metadata with correct TTL" do + cache_key = "test_key" + value = "test_value" + namespaced_key = "langfuse:#{cache_key}" + namespaced_metadata_key = "langfuse:#{cache_key}:metadata" + total_ttl = ttl + stale_ttl + + freeze_time = Time.now + allow(Time).to receive(:now).and_return(freeze_time) + + expect(mock_cache).to receive(:write) + .with(namespaced_key, value, expires_in: total_ttl) + + expect(mock_cache).to receive(:write) + .with(namespaced_metadata_key, anything, expires_in: total_ttl) + + result = adapter_with_swr.send(:set_with_metadata, cache_key, value) + expect(result).to eq(value) + end + + it "stores metadata with correct timestamps" do + cache_key = "test_key" + value = "test_value" + namespaced_key = "langfuse:#{cache_key}" + namespaced_metadata_key = "langfuse:#{cache_key}:metadata" + total_ttl = ttl + stale_ttl + + freeze_time = Time.now + allow(Time).to receive(:now).and_return(freeze_time) + + expected_metadata = { + "data" => value, + "fresh_until" => freeze_time + ttl, + "stale_until" => freeze_time + ttl + stale_ttl + }.to_json + + # Expect both value and metadata to be written + expect(mock_cache).to receive(:write) + .with(namespaced_key, value, expires_in: total_ttl) + + expect(mock_cache).to receive(:write) + .with(namespaced_metadata_key, expected_metadata, expires_in: total_ttl) + + adapter_with_swr.send(:set_with_metadata, cache_key, value) + end + end + + describe "#acquire_refresh_lock" do + let(:ttl) { 60 } + let(:stale_ttl) { 120 } + let(:refresh_threads) { 2 } + let(:adapter_with_swr) { described_class.new(ttl: ttl, stale_ttl: stale_ttl, refresh_threads: refresh_threads) } + + context "when lock is available" do + it "acquires the lock and returns true" do + lock_key = "langfuse:test_key:refreshing" + + allow(mock_cache).to receive(:write) + .with(lock_key, true, unless_exist: true, 
expires_in: 60) + .and_return(true) + + result = adapter_with_swr.send(:acquire_refresh_lock, lock_key) + expect(result).to be true + end + end + + context "when lock is already held" do + it "fails to acquire lock and returns false" do + lock_key = "langfuse:test_key:refreshing" + + allow(mock_cache).to receive(:write) + .with(lock_key, true, unless_exist: true, expires_in: 60) + .and_return(false) + + result = adapter_with_swr.send(:acquire_refresh_lock, lock_key) + expect(result).to be false + end + end + end + + describe "#shutdown" do + let(:ttl) { 60 } + let(:stale_ttl) { 120 } + let(:refresh_threads) { 2 } + + before do + allow(mock_cache).to receive_messages(read: nil, write: true, delete: true, delete_matched: true) + end + + it "shuts down the thread pool gracefully" do + adapter = described_class.new(ttl: ttl, stale_ttl: stale_ttl, refresh_threads: refresh_threads) + thread_pool = adapter.thread_pool + expect(thread_pool).to receive(:shutdown).once + expect(thread_pool).to receive(:wait_for_termination).with(5).once + + adapter.shutdown + end + + context "when no thread pool exists" do + it "does not raise an error" do + adapter = described_class.new(ttl: ttl) + expect { adapter.shutdown }.not_to raise_error + end + end + end end