janmarkuslanger/gopipe

gopipe

A small pipeline library for Go, focused on clear interfaces, chunked processing, and easy extensibility.


Features

  • Chunked processing with backpressure-friendly readers
  • Composable Transform and Filter steps
  • Lifecycle hooks for timing, tracing, and error reporting
  • Context propagation across reader/steps/writer
  • Minimal core with pluggable reader implementations (CSV today; more can be added)

Installation

go get github.com/janmarkuslanger/gopipe

Quick start

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/janmarkuslanger/gopipe/pipline"
)

func main() {
    ctx := context.Background()

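    // Single-shot reader: returns every record on the first call and
    // reports done=true, so the pipeline reads exactly one chunk.
    reader := func(ctx context.Context, n int) ([]pipline.Record, bool, error) {
        records := []pipline.Record{
            {"name": "Alice", "age": 29},
            {"name": "Bob", "age": 34},
        }
        return records, true, nil
    }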

    writer := func(ctx context.Context, records []pipline.Record) error {
        for _, r := range records {
            fmt.Println(r)
        }
        return nil
    }

    p := pipline.New("quickstart").
        Read(reader).
        Filter(func(ctx context.Context, r pipline.Record) (bool, error) {
            age, _ := r["age"].(int)
            return age >= 30, nil
        }).
        Write(writer)

    if err := p.Run(ctx, 2); err != nil {
        log.Fatal(err)
    }
}

Concepts

Record

type Record map[string]any

A Record is the data unit passed through the pipeline. Readers and steps are free to choose the value types.
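
Because values are stored as any, a step usually recovers the concrete type with a type assertion before using it. The following is a minimal sketch of that pattern. It declares a local copy of Record so it runs without the library, and intField is a hypothetical helper, not part of gopipe.

```go
package main

import "fmt"

// Record mirrors the type shown above so the sketch runs without the library.
type Record map[string]any

// intField is a hypothetical helper (not part of gopipe). It returns the int
// stored under key and reports whether the key exists and holds an int.
func intField(r Record, key string) (int, bool) {
    v, ok := r[key]
    if !ok {
        return 0, false
    }
    n, ok := v.(int)
    return n, ok
}

func main() {
    r := Record{"name": "Alice", "age": 29}

    age, ok := intField(r, "age")
    fmt.Println(age, ok) // 29 true

    // "name" holds a string, so the int assertion fails without panicking.
    _, ok = intField(r, "name")
    fmt.Println(ok) // false
}
```

Using the two-value form of the assertion keeps a step from panicking when a reader supplies a different type than expected.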

Reader / Transform / Filter / Writer

type Reader func(ctx context.Context, n int) (records []Record, done bool, err error)

type Transform func(ctx context.Context, record Record) (Record, error)

type Predicate func(ctx context.Context, record Record) (bool, error)

type Writer func(ctx context.Context, records []Record) error

  • A Reader returns up to n records and signals when no more data is available via done=true.
  • A Transform returns a (possibly modified) record; it does not drop records.
  • A Filter takes a Predicate that decides whether a record should be kept; returning true keeps the record.
  • A Writer receives a slice of records (the current chunk).
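
To make the Reader contract concrete, here is a sketch of a slice-backed reader that hands out at most n records per call and sets done once everything has been served. It uses local copies of Record and Reader so it runs without the library. sliceReader and chunkSizes are hypothetical helpers, and the draining loop only illustrates the contract; it is not gopipe's Run implementation.

```go
package main

import (
    "context"
    "fmt"
)

// Local copies of the types shown above, so the sketch runs without the library.
type Record map[string]any
type Reader func(ctx context.Context, n int) (records []Record, done bool, err error)

// sliceReader is a hypothetical helper (not part of gopipe). It serves data
// in chunks of at most n records and reports done once all are consumed.
func sliceReader(data []Record) Reader {
    pos := 0
    return func(ctx context.Context, n int) ([]Record, bool, error) {
        if err := ctx.Err(); err != nil {
            return nil, false, err // stop early if the context was cancelled
        }
        end := pos + n
        if end > len(data) {
            end = len(data)
        }
        chunk := data[pos:end]
        pos = end
        return chunk, pos >= len(data), nil
    }
}

// chunkSizes drains a reader with a positive chunk size n and returns the
// size of each chunk it produced.
func chunkSizes(read Reader, n int) ([]int, error) {
    var sizes []int
    for {
        recs, done, err := read(context.Background(), n)
        if err != nil {
            return nil, err
        }
        sizes = append(sizes, len(recs))
        if done {
            return sizes, nil
        }
    }
}

func main() {
    data := []Record{{"id": 1}, {"id": 2}, {"id": 3}}
    sizes, err := chunkSizes(sliceReader(data), 2)
    fmt.Println(sizes, err) // [2 1] <nil>
}
```

Note that the final chunk carries records and done=true in the same call, which matches the reader in the quick start.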

API reference (package pipline)

Pipeline construction

p := pipline.New("name").
    Read(reader).
    Transform(fn).
    Filter(pred).
    Write(writer)

err := p.Run(ctx, chunkSize)

Notes:

  • New(name string) requires a non-empty name.
  • Read and Write are mandatory; otherwise Run returns an error.
  • Multiple Transform and Filter steps can be chained.
  • Run(ctx, chunkSize) requires chunkSize > 0.

Hooks

type PipelineHook interface {
    OnPipelineStart(ctx context.Context, info PipelineInfo)
    OnPipelineEnd(ctx context.Context, info PipelineInfo, err error, dur time.Duration)

    OnReadStart(ctx context.Context, info PipelineInfo)
    OnReadEnd(ctx context.Context, info PipelineInfo, records int, err error, dur time.Duration)

    OnStepStart(ctx context.Context, info PipelineInfo, step int)
    OnStepEnd(ctx context.Context, info PipelineInfo, step int, err error, dur time.Duration)

    OnWriteStart(ctx context.Context, info PipelineInfo, records int)
    OnWriteEnd(ctx context.Context, info PipelineInfo, err error, dur time.Duration)
}

Use p.Hook(myHook) to attach a hook implementation. The default is NoopPipelineHook.
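
As an illustration, a hook that collects simple statistics could look like the sketch below. It uses a local copy of the interface so it runs without the library. PipelineInfo is a stand-in with an assumed Name field (the real type's fields are not shown here), statsHook and simulate are hypothetical, and the order of calls in simulate is an assumption about how a run might invoke the hook.

```go
package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// PipelineInfo is a stand-in: the real type's fields are not shown in this README.
type PipelineInfo struct{ Name string }

// PipelineHook is a local copy of the interface above.
type PipelineHook interface {
    OnPipelineStart(ctx context.Context, info PipelineInfo)
    OnPipelineEnd(ctx context.Context, info PipelineInfo, err error, dur time.Duration)
    OnReadStart(ctx context.Context, info PipelineInfo)
    OnReadEnd(ctx context.Context, info PipelineInfo, records int, err error, dur time.Duration)
    OnStepStart(ctx context.Context, info PipelineInfo, step int)
    OnStepEnd(ctx context.Context, info PipelineInfo, step int, err error, dur time.Duration)
    OnWriteStart(ctx context.Context, info PipelineInfo, records int)
    OnWriteEnd(ctx context.Context, info PipelineInfo, err error, dur time.Duration)
}

// statsHook is a hypothetical hook that tallies records read, time spent in
// steps, and errors reported by the read, step, and write stages.
type statsHook struct {
    recordsRead int
    stepTime    time.Duration
    errs        int
}

func (h *statsHook) countErr(err error) {
    if err != nil {
        h.errs++
    }
}

func (h *statsHook) OnPipelineStart(context.Context, PipelineInfo)                     {}
func (h *statsHook) OnPipelineEnd(context.Context, PipelineInfo, error, time.Duration) {}
func (h *statsHook) OnReadStart(context.Context, PipelineInfo)                         {}
func (h *statsHook) OnStepStart(context.Context, PipelineInfo, int)                    {}
func (h *statsHook) OnWriteStart(context.Context, PipelineInfo, int)                   {}

func (h *statsHook) OnReadEnd(_ context.Context, _ PipelineInfo, records int, err error, _ time.Duration) {
    h.recordsRead += records
    h.countErr(err)
}

func (h *statsHook) OnStepEnd(_ context.Context, _ PipelineInfo, _ int, err error, dur time.Duration) {
    h.stepTime += dur
    h.countErr(err)
}

func (h *statsHook) OnWriteEnd(_ context.Context, _ PipelineInfo, err error, _ time.Duration) {
    h.countErr(err)
}

// Compile-time check that statsHook satisfies the interface.
var _ PipelineHook = (*statsHook)(nil)

// simulate issues the calls one failing chunk might trigger (assumed order).
func simulate(h PipelineHook) {
    ctx := context.Background()
    info := PipelineInfo{Name: "demo"}
    h.OnPipelineStart(ctx, info)
    h.OnReadStart(ctx, info)
    h.OnReadEnd(ctx, info, 2, nil, time.Millisecond)
    h.OnStepStart(ctx, info, 0)
    h.OnStepEnd(ctx, info, 0, errors.New("boom"), 3*time.Millisecond)
    h.OnPipelineEnd(ctx, info, errors.New("step failed: boom"), 5*time.Millisecond)
}

func main() {
    h := &statsHook{}
    simulate(h)
    fmt.Println(h.recordsRead, h.stepTime, h.errs) // 2 3ms 1
}
```

With the real library, a type like this would implement pipline.PipelineHook instead of the local copy and be attached with p.Hook before calling Run.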

Readers (package reader)

The reader package is intended to host multiple reader implementations. A pipeline uses exactly one reader instance, but the library can ship many readers that all satisfy the same pipline.Reader function type.

CSV reader

func NewCSVChunkReader(path string, opts CSVReaderOptions) (
    readerFn pipline.Reader,
    closeFn func() error,
    err error,
)

Options

type CSVReaderOptions struct {
    Comma               rune
    TrimLeadingSpace    bool
    AllowVariableFields bool
}

Behavior

  • First row is treated as header.
  • Each data row becomes a Record (header[i] -> row[i]).
  • Missing values are set to "".
  • All values are strings; convert types in your pipeline.
  • Always call closeFn() (use defer).
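
The mapping rules above can be sketched with the standard library's encoding/csv package. This is an illustration of the documented behavior only, not the library's implementation. rowsToRecords is a hypothetical helper, and it reads from a string instead of a file so the example is self-contained.

```go
package main

import (
    "encoding/csv"
    "fmt"
    "io"
    "strings"
)

// Record mirrors the pipeline's record type so the sketch runs without the library.
type Record map[string]any

// rowsToRecords is a hypothetical helper. It treats the first row as the
// header, maps header[i] to row[i], and fills missing values with "".
func rowsToRecords(input string) ([]Record, error) {
    r := csv.NewReader(strings.NewReader(input))
    r.TrimLeadingSpace = true
    r.FieldsPerRecord = -1 // allow rows with a varying number of fields

    header, err := r.Read()
    if err != nil {
        return nil, err
    }

    var out []Record
    for {
        row, err := r.Read()
        if err == io.EOF {
            return out, nil
        }
        if err != nil {
            return nil, err
        }
        rec := make(Record, len(header))
        for i, name := range header {
            if i < len(row) {
                rec[name] = row[i]
            } else {
                rec[name] = "" // the row is shorter than the header
            }
        }
        out = append(out, rec)
    }
}

func main() {
    recs, err := rowsToRecords("name,age,city\nAlice,29,Berlin\nBob,34\n")
    if err != nil {
        fmt.Println("parse failed:", err)
        return
    }
    for _, rec := range recs {
        fmt.Printf("%q %q %q\n", rec["name"], rec["age"], rec["city"])
    }
}
```

Every value stays a string, including "29" and "34", so numeric comparisons need an explicit conversion such as strconv.Atoi in a Filter or Transform step.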

Example: CSV pipeline

package main

import (
    "context"
    "fmt"
    "log"
    "strconv"

    "github.com/janmarkuslanger/gopipe/pipline"
    "github.com/janmarkuslanger/gopipe/reader"
)

func main() {
    ctx := context.Background()

    csvReader, closeFn, err := reader.NewCSVChunkReader("example.csv", reader.CSVReaderOptions{
        TrimLeadingSpace:    true,
        AllowVariableFields: true,
    })
    if err != nil {
        log.Fatalf("open csv: %v", err)
    }
    defer func() {
        if err := closeFn(); err != nil {
            log.Printf("close csv: %v", err)
        }
    }()

    p := pipline.New("csv-demo").
        Read(csvReader).
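        Filter(func(ctx context.Context, record pipline.Record) (bool, error) {
            // Drop records that have no "age" column.
            v, ok := record["age"]
            if !ok {
                return false, nil
            }
            // The CSV reader emits string values; drop anything else.
            s, ok := v.(string)
            if !ok {
                return false, nil
            }
            // A non-numeric age (including "") returns an error and stops the pipeline.
            age, err := strconv.Atoi(s)
            if err != nil {
                return false, err
            }
            return age > 29, nil
        }).
        Transform(func(ctx context.Context, record pipline.Record) (pipline.Record, error) {
            // Placeholder transform: writes "city" back unchanged.
            // Replace the assignment with a real conversion (for example, normalizing case).
            if v, ok := record["city"]; ok {
                if s, ok := v.(string); ok {
                    record["city"] = s
                }
            }
            return record, nil
        }).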
        Write(func(ctx context.Context, records []pipline.Record) error {
            for _, rec := range records {
                fmt.Println(rec)
            }
            return nil
        })

    if err := p.Run(ctx, 2); err != nil {
        log.Fatalf("pipeline failed: %v", err)
    }
}

Error handling

Errors returned by the reader, any step, or the writer stop execution. Run wraps errors with context (for example, read failed: ...).
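
This README does not say how Run wraps errors internally, so the following only sketches the general Go pattern of prefixing an error with stage context while keeping the cause inspectable. wrapStage, isBadRow, and errBadRow are hypothetical names, and the use of %w is an assumption rather than a statement about gopipe.

```go
package main

import (
    "errors"
    "fmt"
)

// errBadRow is a hypothetical sentinel error a reader might return.
var errBadRow = errors.New("bad row")

// wrapStage is a hypothetical helper that adds stage context in the style of
// "read failed: ..." and keeps the original error reachable via %w.
func wrapStage(stage string, err error) error {
    if err == nil {
        return nil
    }
    return fmt.Errorf("%s failed: %w", stage, err)
}

// isBadRow reports whether err is, or wraps, errBadRow.
func isBadRow(err error) bool {
    return errors.Is(err, errBadRow)
}

func main() {
    err := wrapStage("read", errBadRow)
    fmt.Println(err)           // read failed: bad row
    fmt.Println(isBadRow(err)) // true
}
```

If the error returned by Run does not support errors.Is or errors.As for your sentinel, matching on the message text is the fallback, which is more brittle.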

License

See LICENSE.
