Skip to content

Implement Streaming Support for LLM Outputs #20

@obie

Description

@obie

Overview

Implement token-by-token streaming support for LLM outputs, enabling real-time display of generated content and better user experience for long-form generation.

Description

Streaming allows modules to yield tokens as they're generated rather than waiting for complete responses. This is crucial for interactive applications and long-form content generation.

Key Features to Implement

  • Streaming interface for modules
  • Token-by-token yield capability
  • Stream processing and transformation
  • Buffer management
  • Integration with various LLM providers
  • Graceful fallback for non-streaming models

Implementation Requirements

1. Streaming Module Interface

module Desiru
  module Streaming
    # Base class for modules that emit LLM output token-by-token.
    # Callers use #forward_stream with a block for real-time tokens, or the
    # standard #forward for a blocking call that collects everything.
    class StreamingModule < Module
      # Streams generation for the given inputs, yielding each token to the
      # caller's block as it arrives, then returns the parsed full response.
      #
      # @param inputs [Hash] signature inputs (validated before prompting)
      # @yield [String] each generated token, in arrival order
      # @return parsed response built from the complete streamed text
      def forward_stream(**inputs, &block)
        validate_inputs(inputs)

        prompt = build_prompt(inputs)

        stream = @llm.generate_streaming(
          prompt: prompt,
          temperature: @temperature,
          max_tokens: @max_tokens
        ) do |token|
          yield token if block_given?
        end

        # Block until the provider stream finishes, then parse the aggregate.
        full_response = stream.complete
        parse_streaming_response(full_response, inputs)
      end

      # Non-streaming compatibility wrapper: consumes the stream internally
      # and returns only the final parsed result.
      def forward(**inputs)
        buffer = StringIO.new

        result = forward_stream(**inputs) do |token|
          buffer.write(token)
        end

        result
      end
    end

    # Thread-safe, append-only collection of streamed tokens.
    # All reads and writes are guarded by a single mutex so producers and
    # consumers may run on different threads.
    class TokenStream
      include Enumerable

      def initialize
        @tokens = []
        @complete = false
        @mutex = Mutex.new
      end

      # Appends a token. Returns self so appends can be chained
      # (the original returned the internal array, leaking mutable state).
      def <<(token)
        @mutex.synchronize { @tokens << token }
        self
      end

      # Marks the stream as finished.
      # (Fixed: the original `def complete\!` is a shell-escape artifact
      # and a Ruby syntax error; completion flag is now mutex-guarded.)
      def complete!
        @mutex.synchronize { @complete = true }
      end

      def complete?
        @mutex.synchronize { @complete }
      end

      # Full text accumulated so far.
      def to_s
        @mutex.synchronize { @tokens.join }
      end

      # Iterates a snapshot of the tokens received so far, so concurrent
      # appends cannot mutate the array mid-iteration.
      def each(&block)
        @mutex.synchronize { @tokens.dup }.each(&block)
      end
    end
  end
end

2. Streaming Chain of Thought

module Desiru::Modules
  # Chain-of-thought module that streams reasoning and answer tokens
  # separately, tagging each yielded chunk with its section.
  class StreamingChainOfThought < Streaming::StreamingModule
    def initialize(signature:, **options)
      super
      extend_signature_with_reasoning
    end

    # Streams the LLM output, yielding { type: :reasoning | :answer, token: }
    # hashes, and returns { reasoning:, answer: } built from the full text.
    #
    # NOTE(review): marker detection is per-token; a marker split across two
    # tokens (e.g. "Ans" + "wer:") will not be detected — confirm tokenizer
    # behavior or buffer a small tail window if this matters in practice.
    def forward_stream(**inputs, &block)
      validate_inputs(inputs)

      prompt = build_prompt(inputs)

      # Stream state tracking
      in_reasoning = true
      reasoning_buffer = StringIO.new
      answer_buffer = StringIO.new

      @llm.generate_streaming(prompt: prompt) do |token|
        if in_reasoning && (marker = token[/Answer:|Therefore:/])
          # Bug fix: the original classified the ENTIRE marker-bearing token
          # as answer, losing any reasoning text preceding the marker inside
          # the same token. Split the token at the marker instead.
          head, _, tail = token.partition(marker)
          unless head.empty?
            reasoning_buffer.write(head)
            block&.call({ type: :reasoning, token: head })
          end
          in_reasoning = false
          answer_part = marker + tail
          answer_buffer.write(answer_part)
          block&.call({ type: :answer, token: answer_part })
        elsif in_reasoning
          reasoning_buffer.write(token)
          block&.call({ type: :reasoning, token: token })
        else
          answer_buffer.write(token)
          block&.call({ type: :answer, token: token })
        end
      end

      # Parse complete response
      {
        reasoning: clean_text(reasoning_buffer.string),
        answer: clean_text(answer_buffer.string)
      }
    end
  end
end

3. Streaming Utilities

module Desiru::Streaming
  # Buffer with timeout
  class TimedBuffer
    def initialize(timeout: 30)
      @buffer = Queue.new
      @timeout = timeout
      @complete = false
    end
    
    def write(token)
      @buffer.push(token)
    end
    
    def read
      Timeout.timeout(@timeout) do
        @buffer.pop
      end
    rescue Timeout::Error
      raise TimeoutError, "Stream timeout after #{@timeout} seconds"
    end
    
    def complete\!
      @complete = true
      @buffer.push(:eof)
    end
  end
  
  # Stream transformers
  class StreamTransformer
    def initialize(stream)
      @stream = stream
    end
    
    def filter(&block)
      StreamTransformer.new(
        Enumerator.new do |yielder|
          @stream.each do |token|
            yielder << token if block.call(token)
          end
        end
      )
    end
    
    def map(&block)
      StreamTransformer.new(
        Enumerator.new do |yielder|
          @stream.each do |token|
            yielder << block.call(token)
          end
        end
      )
    end
    
    def batch(size)
      StreamTransformer.new(
        Enumerator.new do |yielder|
          buffer = []
          @stream.each do |token|
            buffer << token
            if buffer.size >= size
              yielder << buffer.join
              buffer.clear
            end
          end
          yielder << buffer.join if buffer.any?
        end
      )
    end
  end
end

4. Provider Streaming Support

module Desiru::LLM
  # OpenAI adapter with chat-completion streaming support.
  class OpenAI < Base
    # Streams a chat completion, invoking the block with each content delta.
    #
    # @param prompt [String] user prompt, formatted into chat messages
    # @param options [Hash] extra OpenAI parameters (temperature, etc.)
    # @yield [String] each streamed content token
    # @return [OpenStruct] full response whose #text is the joined tokens
    def generate_streaming(prompt:, **options, &block)
      stream_processor = StreamProcessor.new(&block)

      response = @client.chat(
        parameters: {
          model: @model,
          messages: format_messages(prompt),
          stream: true,
          **options
        }
      )

      response.each do |chunk|
        # Only delta chunks carry content; role/finish chunks dig to nil.
        # (Fixed: the original called chunk.dig twice for the same value.)
        token = chunk.dig("choices", 0, "delta", "content")
        stream_processor.process(token) if token
      end

      stream_processor.complete
    end
  end

  # Accumulates streamed tokens and forwards each one to an optional block.
  class StreamProcessor
    def initialize(&block)
      @block = block
      @buffer = StringIO.new
    end

    # Appends the token to the buffer and notifies the subscriber, if any.
    def process(token)
      @buffer.write(token)
      @block.call(token) if @block
    end

    # @return [OpenStruct] aggregate result (needs `require "ostruct"`)
    def complete
      OpenStruct.new(text: @buffer.string)
    end
  end
end

5. Web Integration (SSE)

module Desiru::API
  # Server-sent-events endpoint that relays streamed module output.
  class StreamingEndpoint < Grape::API
    format :json

    get '/stream/complete' do
      # SSE requires overriding the API-level JSON content type per-route.
      content_type 'text/event-stream'

      stream do |out|
        module_instance = ChainOfThought.new(signature: params[:signature])

        # Bug fix: forward_stream takes keyword arguments (**inputs), so a
        # params hash must be splatted — Ruby 3 no longer auto-converts a
        # trailing hash into keywords. Keys are symbolized to match the
        # signature's input names.
        inputs = params[:inputs].to_h.transform_keys(&:to_sym)
        module_instance.forward_stream(**inputs) do |token_data|
          event = {
            type: token_data[:type],
            content: token_data[:token]
          }

          out << "data: #{event.to_json}\n\n"
        end

        # Standard SSE termination sentinel.
        out << "data: [DONE]\n\n"
      end
    end
  end
end

Example Usage

# Basic streaming
cot = Desiru::Modules::StreamingChainOfThought.new(
  signature: "question -> reasoning, answer"
)

# Stream to console
cot.forward_stream(question: "What is consciousness?") do |token_data|
  if token_data[:type] == :reasoning
    print token_data[:token]  # Real-time reasoning display
  end
end

# Stream transformation
stream = cot.create_stream(question: "Explain quantum computing")
filtered = Desiru::Streaming::StreamTransformer.new(stream)
  .filter { |token| \!token[:token].strip.empty? }
  .batch(10)  # Batch tokens for efficiency

filtered.each do |batch|
  puts batch  # Process batched tokens
end

# With timeout protection
buffer = Desiru::Streaming::TimedBuffer.new(timeout: 60)

Thread.new do
  cot.forward_stream(question: "Long question...") do |token|
    buffer.write(token)
  end
  buffer.complete\!
end

# Read from buffer with timeout
while token = buffer.read
  break if token == :eof
  process_token(token)
end

# Non-streaming fallback
module_instance = Desiru::Modules::ChainOfThought.new(
  signature: "question -> answer",
  model: "non-streaming-model"
)

# Automatically falls back to non-streaming
result = module_instance.forward(question: "What is Ruby?")

Testing Requirements

  • Unit tests for streaming interface
  • Integration tests with mock streaming
  • Test timeout handling
  • Test stream transformation
  • Performance tests for streaming vs batch
  • Test graceful degradation

Considerations

  • Memory management for long streams
  • Error handling mid-stream
  • Partial result handling
  • Stream cancellation support
  • Backpressure handling

Priority

Medium - Improves UX significantly but not required for core functionality

Metadata

Metadata

Assignees

No one assigned

    Labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions