
Implement Data Loaders and Dataset Management #18

@obie

Overview

Implement comprehensive data loading utilities supporting multiple formats (CSV, JSON, JSONL, HuggingFace) with built-in train/dev/test splitting and DSPy-compatible formatting into Example objects.

Description

Data loaders are essential for working with real datasets in DSPy. They handle loading from various sources, automatic splitting, and formatting data into Example objects that work seamlessly with the framework.
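
Conversion to Example objects is the core contract. A minimal sketch of what that conversion looks like, assuming Desiru::Example accepts keyword fields and exposes them as readers (as the usage examples below do):

raw = { question: 'What is the capital of France?', answer: 'Paris' }
example = Desiru::Example.new(**raw)
example.question  # => "What is the capital of France?"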

Key Features to Implement

  • Load from multiple formats (CSV, JSON, JSONL, HuggingFace)
  • Automatic train/dev/test splitting
  • Data validation and cleaning
  • Batch iteration support
  • Lazy loading for large datasets
  • Format conversion to Example objects

Implementation Requirements

1. Dataset Class

module Desiru
  class Dataset
    include Enumerable
    
    attr_reader :examples, :name, :splits
    
    def initialize(examples: [], name: nil)
      @examples = ensure_examples(examples)
      @name = name
      @splits = {}
    end
    
    # Iteration
    def each(&block)
      @examples.each(&block)
    end
    
    def [](index)
      @examples[index]
    end
    
    def size
      @examples.size
    end
    
    # Splitting
    def split(ratios: [0.8, 0.1, 0.1], shuffle: true, seed: nil)
      rng = seed ? Random.new(seed) : Random.new
      
      examples = shuffle ? @examples.shuffle(random: rng) : @examples.dup
      
      train_size = (examples.size * ratios[0]).floor
      dev_size = (examples.size * ratios[1]).floor
      
      @splits[:train] = Dataset.new(
        examples: examples[0...train_size],
        name: "#{@name}_train"
      )
      
      @splits[:dev] = Dataset.new(
        examples: examples[train_size...(train_size + dev_size)],
        name: "#{@name}_dev"
      )
      
      @splits[:test] = Dataset.new(
        examples: examples[(train_size + dev_size)..-1],
        name: "#{@name}_test"
      )
      
      self
    end
    
    # Filtering
    def filter(&block)
      Dataset.new(
        examples: @examples.select(&block),
        name: @name
      )
    end
    
    # Mapping
    def map(&block)
      Dataset.new(
        examples: @examples.map(&block),
        name: @name
      )
    end
    
    # Sampling
    def sample(n, seed: nil)
      rng = seed ? Random.new(seed) : Random.new
      Dataset.new(
        examples: @examples.sample(n, random: rng),
        name: "#{@name}_sample"
      )
    end
    
    # Batching
    def batch(size)
      Enumerator.new do |yielder|
        @examples.each_slice(size) do |batch|
          yielder << Dataset.new(examples: batch)
        end
      end
    end
    
    private
    
    def ensure_examples(examples)
      examples.map { |ex|
        ex.is_a?(Example) ? ex : Example.new(**ex)
      }
    end
  end
end
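
Because both the train and dev sizes are floored, any rounding remainder lands in the test split. For example:

dataset = Desiru::Dataset.new(
  examples: (1..10).map { |i| { question: "q#{i}", answer: "a#{i}" } }
)
dataset.split(ratios: [0.8, 0.1, 0.1], seed: 42)

dataset.splits[:train].size  # => 8
dataset.splits[:dev].size    # => 1
dataset.splits[:test].size   # => 1 (rounding remainder)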

2. Data Loaders

module Desiru
  module DataLoaders
    # Base loader
    class Base
      def load(source, **options)
        raise NotImplementedError
      end
      
      protected
      
      def create_dataset(data, name: nil)
        examples = data.map { |item| create_example(item) }
        Dataset.new(examples: examples, name: name)
      end
      
      def create_example(item)
        Example.new(**normalize_fields(item))
      end
      
      # Symbolize keys so a raw hash can be splatted into Example.new
      def normalize_fields(item)
        item.transform_keys(&:to_sym)
      end
      
      # Resolve a field_mapping of source keys (dotted paths such as
      # 'answers.text.0' are allowed) to Example field names
      def map_fields(item, field_mapping)
        field_mapping.each_with_object({}) do |(path, example_field), fields|
          fields[example_field] = path.to_s.split('.').reduce(item) do |obj, key|
            obj.is_a?(Array) ? obj[key.to_i] : obj[key]
          end
        end
      end
    end
    
    # CSV Loader
    class CSV < Base
      def load(filepath, headers: true, field_mapping: {}, **options)
        require 'csv'
        
        data = ::CSV.read(filepath, headers: headers, **options)
        
        examples = data.map do |row|
          fields = {}
          
          if field_mapping.any?
            field_mapping.each do |csv_field, example_field|
              fields[example_field] = row[csv_field.to_s]
            end
          else
            row.to_h.each { |k, v| fields[k.to_sym] = v }
          end
          
          Example.new(**fields)
        end
        
        Dataset.new(examples: examples, name: File.basename(filepath, '.*'))
      end
    end
    
    # JSON Loader
    class JSON < Base
      def load(filepath, field_mapping: {}, **options)
        require 'json'
        
        data = ::JSON.parse(File.read(filepath))
        data = [data] unless data.is_a?(Array)
        
        examples = data.map do |item|
          fields = {}
          
          if field_mapping.any?
            field_mapping.each do |json_path, example_field|
              fields[example_field] = extract_path(item, json_path)
            end
          else
            fields = item.transform_keys(&:to_sym)
          end
          
          Example.new(**fields)
        end
        
        Dataset.new(examples: examples, name: File.basename(filepath, '.*'))
      end
      
      private
      
      def extract_path(data, path)
        path.split('.').reduce(data) { |obj, key| obj[key] }
      end
    end
    
    # JSONL Loader
    class JSONL < Base
      def load(filepath, field_mapping: {}, **options)
        require 'json'
        
        examples = []
        
        File.foreach(filepath) do |line|
          next if line.strip.empty?
          
          # ::JSON is the stdlib parser, not the sibling JSON loader class above
          item = ::JSON.parse(line)
          fields = field_mapping.any? ? 
            map_fields(item, field_mapping) : 
            item.transform_keys(&:to_sym)
          
          examples << Example.new(**fields)
        end
        
        Dataset.new(examples: examples, name: File.basename(filepath, '.*'))
      end
    end
    
    # HuggingFace Loader
    class HuggingFace < Base
      def load(dataset_name, split: "train", field_mapping: {}, **options)
        # Assumes a Ruby HuggingFace client; the gem name and load_dataset call
        # below are placeholders for whichever integration is actually adopted
        require 'huggingface_hub'
        
        dataset = HuggingfaceHub.load_dataset(dataset_name, split: split)
        
        examples = dataset.map do |item|
          fields = field_mapping.any? ? 
            map_fields(item, field_mapping) : 
            item.transform_keys(&:to_sym)
          
          Example.new(**fields)
        end
        
        Dataset.new(examples: examples, name: "#{dataset_name}_#{split}")
      end
    end
  end
end
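
For example, given a JSONL file with one JSON object per line (file name hypothetical), the JSONL loader produces one Example per line, and field_mapping renames source keys to Example fields:

# data/pairs.jsonl:
#   {"prompt": "What is 2+2?", "completion": "4"}
#   {"prompt": "Capital of France?", "completion": "Paris"}

dataset = Desiru::DataLoaders::JSONL.new.load(
  "data/pairs.jsonl",
  field_mapping: { 'prompt' => :question, 'completion' => :answer }
)
dataset.size         # => 2
dataset[0].question  # => "What is 2+2?"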

3. Convenience Methods

module Desiru
  class << self
    def load_dataset(source, format: :auto, **options)
      loader = case format
      when :auto
        detect_format(source)
      when :csv
        DataLoaders::CSV.new
      when :json
        DataLoaders::JSON.new
      when :jsonl
        DataLoaders::JSONL.new
      when :huggingface
        DataLoaders::HuggingFace.new
      else
        raise ArgumentError, "Unknown format: #{format}"
      end
      
      loader.load(source, **options)
    end
    
    private
    
    def detect_format(source)
      case source
      when /\.csv$/i
        DataLoaders::CSV.new
      when /\.json$/i
        DataLoaders::JSON.new
      when /\.jsonl$/i
        DataLoaders::JSONL.new
      else
        # Assume HuggingFace if not a file
        DataLoaders::HuggingFace.new
      end
    end
  end
end
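
With format: :auto (the default), detect_format picks the loader from the file extension; anything without a recognized extension falls back to the HuggingFace loader:

Desiru.load_dataset("data/questions.csv")  # -> DataLoaders::CSV
Desiru.load_dataset("data/pairs.jsonl")    # -> DataLoaders::JSONL
Desiru.load_dataset("squad")               # -> DataLoaders::HuggingFace (not a file path)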

4. Lazy Loading for Large Datasets

module Desiru
  class LazyDataset < Dataset
    def initialize(loader:, source:, batch_size: 1000, **options)
      @loader = loader
      @source = source
      @batch_size = batch_size
      @options = options
      @cache = []
      @fully_loaded = false
    end
    
    # Streams from the loader on the first pass, caching examples for reuse.
    # Assumes the loader implements a `stream` method that yields batches
    # (see the sketch after this class).
    def each(&block)
      return enum_for(:each) unless block_given?
      
      if @fully_loaded
        @cache.each(&block)
      else
        @loader.stream(@source, batch_size: @batch_size, **@options) do |batch|
          batch.each do |example|
            @cache << example
            yield example
          end
        end
        @fully_loaded = true
      end
    end
    
    def size
      each {} unless @fully_loaded # force a full pass to populate the cache
      @cache.size
    end
  end
end
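
LazyDataset assumes each loader exposes a stream method that yields batches of Example objects without reading the whole source into memory; that method is not specified above. A minimal sketch for the JSONL loader, under that assumption:

module Desiru
  module DataLoaders
    class JSONL < Base
      # Yield Examples in batches so callers never hold the whole file in memory
      def stream(filepath, batch_size: 1000, field_mapping: {}, **options)
        require 'json'
        
        batch = []
        File.foreach(filepath) do |line|
          next if line.strip.empty?
          
          item = ::JSON.parse(line)
          fields = field_mapping.any? ? map_fields(item, field_mapping) : item.transform_keys(&:to_sym)
          batch << Example.new(**fields)
          
          if batch.size >= batch_size
            yield batch
            batch = []
          end
        end
        
        yield batch unless batch.empty?
      end
    end
  end
end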

Example Usage

# Load from CSV
dataset = Desiru.load_dataset(
  "data/questions.csv",
  field_mapping: {
    'question_text' => :question,
    'correct_answer' => :answer
  }
)

# Split into train/dev/test
dataset.split(ratios: [0.7, 0.15, 0.15], shuffle: true, seed: 42)

train_set = dataset.splits[:train]
dev_set = dataset.splits[:dev]
test_set = dataset.splits[:test]

# Load from JSON with nested fields
dataset = Desiru.load_dataset(
  "data/qa_pairs.json",
  field_mapping: {
    'data.question' => :question,
    'data.context' => :context,
    'data.answer.text' => :answer
  }
)

# Load from HuggingFace
dataset = Desiru.load_dataset(
  "squad", 
  format: :huggingface,
  split: "train",
  field_mapping: {
    'question' => :question,
    'context' => :context,
    'answers.text.0' => :answer  # First answer
  }
)

# Filter and sample
filtered = dataset
  .filter { |ex| ex.question.length > 10 }
  .sample(100)

# Batch processing
dataset.batch(32).each do |batch|
  # Process batch of 32 examples
  results = program.forward_batch(batch)
end

# Lazy loading for large datasets
large_dataset = Desiru::LazyDataset.new(
  loader: Desiru::DataLoaders::JSONL.new,
  source: "huge_dataset.jsonl",
  batch_size: 1000
)

Configuration for Common Datasets

# Predefined configurations for common datasets
Desiru::DataLoaders.register(:squad) do
  {
    format: :huggingface,
    dataset_name: "squad",
    field_mapping: {
      'question' => :question,
      'context' => :context,
      'answers.text.0' => :answer
    }
  }
end

# Use registered configuration
dataset = Desiru.load_dataset(:squad)
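
The register call implies a small config registry plus a lookup branch in Desiru.load_dataset, neither of which is specified above. One possible sketch (names assumed):

module Desiru
  module DataLoaders
    @registry = {}
    
    class << self
      # Store a block that returns a config hash under a dataset name
      def register(name, &block)
        @registry[name.to_sym] = block
      end
      
      # Fetch and evaluate a registered config, or nil if unknown
      def registered(name)
        @registry[name.to_sym]&.call
      end
    end
  end
end

# Desiru.load_dataset(:squad) would then resolve the registered config and
# re-dispatch, for example:
#   config = DataLoaders.registered(source)
#   load_dataset(config[:dataset_name], format: config[:format],
#                field_mapping: config[:field_mapping]) if config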

Testing Requirements

  • Unit tests for each loader type (see the RSpec sketch after this list)
  • Test various field mappings
  • Test splitting with different ratios
  • Test lazy loading functionality
  • Test error handling for malformed data
  • Performance tests for large datasets
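
A possible starting point for the loader unit tests, using RSpec and a temporary CSV fixture (expectations assume the Example field readers shown in the usage examples above):

require 'tempfile'

RSpec.describe Desiru::DataLoaders::CSV do
  it 'maps CSV columns to Example fields' do
    Tempfile.create(['questions', '.csv']) do |file|
      file.write("question_text,correct_answer\nWhat is 2+2?,4\n")
      file.flush
      
      dataset = described_class.new.load(
        file.path,
        field_mapping: { 'question_text' => :question, 'correct_answer' => :answer }
      )
      
      expect(dataset.size).to eq(1)
      expect(dataset[0].question).to eq('What is 2+2?')
    end
  end
end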

Priority

High - Essential for working with real datasets and benchmarking
