Skip to content

Latest commit

 

History

History

README.md

Datasync Package

Generic data synchronization workflows for Temporal. Provides a type-safe Source[T] -> Mapper[T,U] -> Sink[U] pipeline that runs as a Temporal workflow, with full OpenTelemetry instrumentation.

Key Features

  • Type-safe pipelineSource[T], Mapper[T,U], Sink[U] interfaces
  • Fluent builder — construct sync jobs with SyncJobBuilder
  • Built-in helpersRecordMapper, InsertIfAbsentSink, IdentityMapper
  • Composable — implements workflow.TaskInput/TaskOutput for use with Pipeline, Parallel, and DAG orchestration
  • Scheduled execution — run sync jobs on a recurring interval
  • OTel instrumented — full observability out of the box

Documentation

Quick Example

source := mySource{}
mapper := datasync.NewRecordMapper[Raw, Entity]("convert", convertFn)
sink   := datasync.NewInsertIfAbsentSink[Entity, string]("db", getID, find, create)

job, _ := builder.NewSyncJobBuilder[Raw, Entity]("my-sync").
    WithSource(source).
    WithMapper(mapper).
    WithSink(sink).
    WithSchedule(5 * time.Minute).
    Build()