Skip to content

EnSync-engine/Go-SDK

Repository files navigation

EnSync Go SDK

Go Reference Go Report Card

Go client SDK for EnSync Engine - an event-delivery based integration engine that enables you to integrate with third-party apps as though they were native to your system and in real-time.

Features

  • Dual Transport Support: gRPC (high-performance) and WebSocket (real-time)
  • End-to-End Encryption: Ed25519/Curve25519 encryption with hybrid encryption support
  • Event Management: Publish, subscribe, acknowledge, defer, discard, and replay events
  • Flow Control: Pause and resume event processing
  • Automatic Reconnection: Built-in reconnection logic with configurable retry
  • Type-Safe: Idiomatic Go interfaces and strong typing
  • Concurrent-Safe: Thread-safe operations with proper synchronization
  • Context Support: Full context.Context integration for cancellation

Installation

go get github.com/EnSync-engine/Go-SDK

Quick Start

gRPC Client (Recommended for Server-to-Server)

package main

import (
    "log"
    ensync "github.com/EnSync-engine/Go-SDK/grpc"
)

func main() {
    // Create gRPC engine
    engine, err := ensync.NewGRPCEngine("grpc://localhost:50051")
    if err != nil {
        log.Fatal(err)
    }
    defer engine.Close()

    // Authenticate
    err = engine.CreateClient("your-access-key")
    if err != nil {
        log.Fatal(err)
    }

    // Publish an event
    eventID, err := engine.Publish(
        "yourcompany/payment/created",
        []string{"recipient-public-key-base64"},
        map[string]interface{}{
            "amount": 100,
            "currency": "USD",
        },
        nil,
        nil,
    )
    if err != nil {
        log.Fatal(err)
    }

    log.Printf("Event published: %s", eventID)
}

WebSocket Client

package main

import (
    "log"
    ensync "github.com/EnSync-engine/Go-SDK/websocket"
)

func main() {
    // Create WebSocket engine
    engine, err := ensync.NewWebSocketEngine("ws://localhost:8082")
    if err != nil {
        log.Fatal(err)
    }
    defer engine.Close()

    // Authenticate
    err = engine.CreateClient("your-access-key")
    if err != nil {
        log.Fatal(err)
    }

    // Subscribe to events
    subscription, err := engine.Subscribe("yourcompany/payment/created", &ensync.SubscribeOptions{
        AutoAck: true,
    })
    if err != nil {
        log.Fatal(err)
    }

    // Register event handler
    subscription.AddHandler(func(event *ensync.EventPayload) error {
        log.Printf("Received: %+v", event.Payload)
        return nil
    })

    // Keep running...
    select {}
}

Transport Options

gRPC (Default)

High-performance binary protocol with HTTP/2, ideal for server-to-server communication.

// Insecure connection
engine, _ := ensync.NewGRPCEngine("grpc://localhost:50051")

// Secure connection (TLS)
engine, _ := ensync.NewGRPCEngine("grpcs://node.ensync.cloud:50051")

WebSocket

Real-time bidirectional communication, great for browser and Node.js applications.

// Insecure connection
engine, _ := ensync.NewWebSocketEngine("ws://localhost:8082")

// Secure connection (WSS)
engine, _ := ensync.NewWebSocketEngine("wss://node.ensync.cloud:8082")

API Reference

Creating a Client

// Basic client creation
engine, err := ensync.NewGRPCEngine("grpc://localhost:50051")
err = engine.CreateClient("your-access-key")

// With options
err = engine.CreateClient("your-access-key",
    ensync.WithAppSecretKey("your-app-secret"),
    ensync.WithClientID("custom-client-id"),
)

Publishing Events

eventID, err := engine.Publish(
    eventName string,                      // Event name
    recipients []string,                   // Recipient public keys (base64)
    payload map[string]interface{},        // Event payload
    metadata *ensync.EventMetadata,        // Optional metadata
    options *ensync.PublishOptions,        // Optional publish options
)

Example:

metadata := &ensync.EventMetadata{
    Persist: true,
    Headers: map[string]string{
        "source": "payment-service",
    },
}

options := &ensync.PublishOptions{
    UseHybridEncryption: true, // Use hybrid encryption for multiple recipients
}

eventID, err := engine.Publish(
    "yourcompany/payment/created",
    []string{"recipient1-key", "recipient2-key"},
    map[string]interface{}{
        "transactionId": "txn-123",
        "amount": 100.50,
        "currency": "USD",
    },
    metadata,
    options,
)

Subscribing to Events

subscription, err := engine.Subscribe(
    eventName string,
    options *ensync.SubscribeOptions,
)

Subscribe Options:

options := &ensync.SubscribeOptions{
    AutoAck: true,                    // Auto-acknowledge events
    AppSecretKey: "your-secret-key",  // Override decryption key
}

Event Handlers

// Register a handler
unsubscribe := subscription.AddHandler(func(event *ensync.EventPayload) error {
    log.Printf("Event: %s", event.EventName)
    log.Printf("ID: %s", event.Idem)
    log.Printf("Block: %d", event.Block)
    log.Printf("Payload: %+v", event.Payload)
    log.Printf("Timestamp: %v", event.Timestamp)
    log.Printf("Sender: %s", event.Sender)
    
    // Process event...
    
    return nil
})

// Unregister handler
unsubscribe()

Event Management

Acknowledge Event

err := subscription.Ack(eventIdem string, block int64)

Defer Event

Postpone event processing for later delivery:

response, err := subscription.Defer(
    eventIdem string,
    delayMs int64,      // Delay in milliseconds (1000ms to 24h)
    reason string,      // Optional reason
)

Discard Event

Permanently reject an event without processing:

response, err := subscription.Discard(
    eventIdem string,
    reason string,      // Optional reason
)

Replay Event

Request a specific event to be sent again:

event, err := subscription.Replay(eventIdem string)

Flow Control

Pause Processing

err := subscription.Pause("System maintenance")

Resume Processing

err := subscription.Resume()

Unsubscribe

err := subscription.Unsubscribe()

Close Connection

err := engine.Close()

Event Structure

type EventPayload struct {
    EventName string                 // Event name
    Idem      string                 // Unique event ID
    Block     int64                  // Block ID for acknowledgment
    Timestamp time.Time              // Event timestamp
    Payload   map[string]interface{} // Event data
    Metadata  map[string]interface{} // Event metadata
    Sender    string                 // Sender client ID
}

Error Handling

The SDK uses typed errors for better error handling:

eventID, err := engine.Publish(...)
if err != nil {
    if ensyncErr, ok := err.(*ensync.EnSyncError); ok {
        switch ensyncErr.Type {
        case ensync.ErrTypeAuth:
            log.Println("Authentication error:", ensyncErr.Message)
        case ensync.ErrTypePublish:
            log.Println("Publish error:", ensyncErr.Message)
        case ensync.ErrTypeConnection:
            log.Println("Connection error:", ensyncErr.Message)
        default:
            log.Println("Error:", ensyncErr.Message)
        }
    }
}

Error Types:

  • ErrTypeGeneric - Generic errors
  • ErrTypeAuth - Authentication errors
  • ErrTypeConnection - Connection errors
  • ErrTypePublish - Publishing errors
  • ErrTypeSubscription - Subscription errors
  • ErrTypeTimeout - Timeout errors
  • ErrTypeReplay - Replay errors
  • ErrTypeDefer - Defer errors
  • ErrTypeDiscard - Discard errors
  • ErrTypePause - Pause errors
  • ErrTypeContinue - Continue errors
  • ErrTypeValidation - Validation errors

Complete Examples

Event Producer

package main

import (
    "log"
    ensync "github.com/EnSync-engine/Go-SDK/grpc"
)

func main() {
    engine, err := ensync.NewGRPCEngine("grpc://localhost:50051")
    if err != nil {
        log.Fatal(err)
    }
    defer engine.Close()

    err = engine.CreateClient("your-access-key")
    if err != nil {
        log.Fatal(err)
    }

    eventName := "yourcompany/payment/POS/PAYMENT_SUCCESSFUL"
    recipients := []string{"recipient-public-key-base64"}
    
    payload := map[string]interface{}{
        "transactionId": "123",
        "amount":        100,
        "terminal":      "pos-1",
        "timestamp":     1234567890,
    }

    eventID, err := engine.Publish(eventName, recipients, payload, nil, nil)
    if err != nil {
        log.Fatalf("Publish failed: %v", err)
    }

    log.Printf("Event published: %s", eventID)
}

Event Subscriber

package main

import (
    "log"
    "os"
    "os/signal"
    "syscall"
    
    ensync "github.com/EnSync-engine/Go-SDK/grpc"
)

func main() {
    engine, err := ensync.NewGRPCEngine("grpc://localhost:50051")
    if err != nil {
        log.Fatal(err)
    }
    defer engine.Close()

    err = engine.CreateClient("your-access-key")
    if err != nil {
        log.Fatal(err)
    }

    eventName := "yourcompany/payment/POS/PAYMENT_SUCCESSFUL"
    subscription, err := engine.Subscribe(eventName, &ensync.SubscribeOptions{
        AutoAck: false, // Manual acknowledgment
    })
    if err != nil {
        log.Fatal(err)
    }

    subscription.AddHandler(func(event *ensync.EventPayload) error {
        log.Printf("Event ID: %s", event.Idem)
        log.Printf("Event Block: %d", event.Block)
        log.Printf("Event Data: %+v", event.Payload)
        
        // Process the event
        if err := processPayment(event.Payload); err != nil {
            log.Printf("Processing failed: %v", err)
            // Defer for retry
            subscription.Defer(event.Idem, 5000, "Processing failed")
            return err
        }
        
        // Acknowledge successful processing
        return subscription.Ack(event.Idem, event.Block)
    })

    log.Println("Listening for events... Press Ctrl+C to exit")

    // Graceful shutdown
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
    <-sigChan

    log.Println("Shutting down...")
    subscription.Unsubscribe()
}

func processPayment(payload map[string]interface{}) error {
    // Process payment logic
    return nil
}

Configuration Options

Common Configuration Options

Both gRPC and WebSocket engines support common configuration options:

import (
    "context"
    "time"
    
    "github.com/EnSync-engine/Go-SDK/common"
    ensync "github.com/EnSync-engine/Go-SDK/grpc"
)

ctx := context.Background()
engine, err := ensync.NewGRPCEngine(ctx, "grpc://localhost:50051",
    // Circuit breaker: 5 failures, 10s base reset, 60s max reset
    common.WithCircuitBreaker(5, 10*time.Second, 60*time.Second),
    
    // Retry: 3 attempts, 1s initial backoff, 10s max backoff, 10% jitter
    common.WithRetryConfig(3, time.Second, 10*time.Second, 0.1),
    
    // Custom timeouts
    common.WithTimeoutOptions(
        common.WithOperationTimeout(15*time.Second),
        common.WithGracefulShutdownTimeout(5*time.Second),
    ),

    // Custom logger
    common.WithLogger(customLogger),
)

WebSocket-Specific Options

WebSocket engines have additional configuration options:

import (
    "context"
    "time"
    
    "github.com/EnSync-engine/Go-SDK/common"
    ensync "github.com/EnSync-engine/Go-SDK/websocket"
)

ctx := context.Background()
engine, err := ensync.NewWebSocketEngine(ctx, "ws://localhost:8082",
    // WebSocket-specific options would go here
    // (currently using embedded common config)
)

Client Authentication Options

When creating a client, you can pass additional options:

err = engine.CreateClient("your-access-key",
    common.WithAppSecretKey("your-app-secret-key"),
    common.WithClientID("custom-client-id"),
)

Best Practices

Connection Management

// Use environment variables for credentials
import "os"

accessKey := os.Getenv("ENSYNC_ACCESS_KEY")
engine, _ := ensync.NewGRPCEngine(os.Getenv("ENSYNC_URL"))
engine.CreateClient(accessKey)

// Always close connections
defer engine.Close()

Event Design

// Use hierarchical event names
eventName := "domain/entity/action"

// Example: "inventory/product/created"
// Example: "payment/transaction/completed"

Error Handling

subscription.AddHandler(func(event *ensync.EventPayload) error {
    if err := processEvent(event); err != nil {
        log.Printf("Processing error: %v", err)
        
        // Defer for retry
        subscription.Defer(event.Idem, 5000, err.Error())
        return err
    }
    
    return subscription.Ack(event.Idem, event.Block)
})

Graceful Shutdown

sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)

go func() {
    <-sigChan
    log.Println("Shutting down...")
    subscription.Unsubscribe()
    engine.Close()
    os.Exit(0)
}()

Generating gRPC Code

If you need to regenerate the gRPC code from the proto file:

# Install protoc plugins
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

# Generate code
chmod +x generate.sh
./generate.sh

Project Structure

Go-SDK/
├── common/                # Shared utilities and types
│   ├── base_engine.go     # Base engine functionality
│   ├── circuit_breaker.go # Circuit breaker implementation
│   ├── crypto.go          # Encryption/decryption utilities
│   ├── errors.go          # Error types and handling
│   ├── logger.go          # Logging utilities
│   ├── options.go         # Configuration options
│   ├── retry.go           # Retry logic
│   ├── subscription.go    # Subscription management
│   └── types.go           # Core types and interfaces
├── grpc/                  # gRPC transport implementation
│   ├── engine.go          # gRPC engine
│   └── options.go         # gRPC-specific options
├── websocket/             # WebSocket transport implementation
│   ├── engine.go          # WebSocket engine
│   └── options.go         # WebSocket-specific options
├── proto/                 # Protocol buffer definitions
│   ├── ensync.proto       # Protocol definitions
│   ├── ensync.pb.go       # Generated Go code
│   └── ensync_grpc.pb.go  # Generated gRPC code
├── examples/              # Example applications
│   ├── grpc_publisher/    # gRPC publisher example
│   ├── grpc_subscriber/   # gRPC subscriber example
│   └── websocket_example/ # WebSocket example
├── example_test.go        # Example usage tests
├── go.mod                 # Go module definition
├── go.sum                 # Go checksum file
├── Makefile               # Build automation
├── generate.sh            # gRPC code generation
└── README.md              # This file

Design Decisions

Interfaces Over Concrete Types

The SDK uses interfaces (Engine, Subscription) to allow easy mocking for testing and future extensibility.

Goroutines and Channels

Event handling uses goroutines for concurrent processing, with proper synchronization using mutexes and channels.

Context Support

All blocking operations support context.Context for cancellation and timeout control.

Error Handling

Go-idiomatic error handling with typed errors instead of try-catch blocks.

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

Development Workflow

  1. Fork the repository
  2. Create a feature branch: git checkout -b feature/new-feature
  3. Make your changes and add tests
  4. Run tests: make test
  5. Submit a pull request

Releases

We use automated semantic versioning:

# Bump version and update changelog
make version-patch   # Bug fixes
make version-minor   # New features  
make version-major   # Breaking changes

# Commit and push to trigger automated release
git add . && git commit -m "chore: bump version to v1.2.3"
git push origin main

See VERSIONING.md for detailed release instructions.

License

ISC License

Links

Support

For issues and questions:

About

No description, website, or topics provided.

Resources

License

Contributing

Stars

Watchers

Forks

Packages

No packages published