uniyakcom/yakio

yakio

English | 简体中文

Go Version Go Reference Go Report Card License: MIT Lint Test Fuzz

yakio is a high-performance, embeddable TCP server core library. Built on an event-driven reactor engine with a zero-copy write path, it provides a lean, efficient TCP service framework designed as the foundation for custom network protocol layers.



Installation

go get github.com/uniyakcom/yakio

Public API Quick Reference

| Category | API | Description |
|---|---|---|
| Server | yakio.TCP(addr, handler, opts...) | Create a TCP server engine |
| Client | yakio.Connect(addr, handler, opts...) | Establish a TCP connection |
| | yakio.ConnectContext(ctx, addr, handler, opts...) | Dial with context |
| Engine | eng.Run() | Start the engine (blocking) |
| | eng.Stop(ctx) | Graceful shutdown |
| | eng.Ready() | Listen-ready signal (<-eng.Ready()) |
| | eng.Addr() | Actual bound address |
| | eng.Get(id) (Conn, bool) | O(1) connection lookup by ID |
| | eng.Range(fn) | Iterate all active connections |
| | eng.Conns() int64 | Current active connection count |
| | eng.Drain() | Stop accepting new connections |
| | eng.Submit(fn) bool | Submit task to global worker pool |
| | eng.SubmitCtx(ctx, fn) bool | Submit with pre-flight ctx cancellation check |
| | eng.Shutting() bool | Whether shutdown has been initiated |
| | eng.ListenerFd() | Get listener fd |
| | eng.Metrics() *Metrics | Engine metrics (with Snapshot()) |
| | eng.TopNConns(n) | Top-N IPs by connection count (requires PerIP) |
| | eng.TopNMsgRates(n) | Top-N IPs by message volume (requires PerIPMsg) |
| Conn | c.ID() uint64 | Unique connection ID |
| | c.Write([]byte) error | Write data |
| | c.Close() error | Close the connection |
| | c.RemoteAddr() / LocalAddr() | Remote / local address |
| | c.SetDeadline(t) | Per-conn timeout |
| | c.PauseRead() / c.ResumeRead() | Backpressure control |
| | c.SetContext(v) / c.Context() | Bind / read user data |
| | c.Fd() int | Underlying fd (stdnet returns -1) |
| | c.TLSState() | TLS negotiation state |
| | c.TLSClientHello() | Client's ClientHello info |
| | c.TLSConn() | Underlying *tls.Conn |
| Codec | codec.LengthField(n) | Length-prefix framing (1/2/4 bytes) |
| | codec.JSON[T]() | Generic JSON codec |
| | codec.RESP() | RESP2/3 request frame decoder |
| | codec.Line() / codec.Delimiter(sep) | Line / custom delimiter |
| | codec.Fixed(n) | Fixed-length framing |
| | yakproto.New(factory) | Protobuf codec (sub-module) |
| TLS | yakio.NewSessionCache(cap) | TLS session ticket cache |
| Debug | yakio.EnableDebug() / DisableDebug() | Toggle diagnostic logs |

Features

  • Triple backend: epoll/kqueue reactor, io_uring (Linux 5.6+, multi-ring, SEND_ZC, TLS bridge, kernel-driven idle timeout), auto-fallback to stdnet on other platforms
  • Zero-copy writes: reactor uses writev(2) + ring.Buffer scatter-gather; VectorWriter.WriteMany eliminates intermediate copies for header+body responses
  • kTLS offload: optional kernel TLS on Linux 4.13+ (AES-128/256-GCM TX+RX bidirectional), supported on go1.21+, auto-fallback on unsupported versions
  • Complete protocol framework: built-in codec package supports length-prefix, line/delimiter, fixed-size, RESP2/3, generic JSON; codec/proto sub-module for Protobuf — zero external dependencies in the main module
  • TLS 1.2/1.3 native integration: TLS13Only() enforces version; Session Cache enables 1-RTT resumption; TLSConn() exposes the full handshake interface; both reactor and io_uring backends support TLS with full Codec framing in TLS mode
  • Configurable eventLoop count: Loops(n) controls reactor sub-loop count (defaults to GOMAXPROCS); useful for prefork scenarios to avoid CPU over-subscription
  • Multi-layer protection: global/per-IP connection limits, message rate limiting (per-conn + IP-aggregated), write timeout, Slowloris defense
  • Fine-grained observability: zero-lock atomic metrics snapshots, TopNConns/TopNMsgRates hotspot diagnosis, yaklog structured logging
  • Graceful shutdown & zero-downtime restart: Stop(ctx) drains connections, ShutdownHandler sends protocol-level goodbye frames, fd handover for zero-downtime restart
  • Backpressure & affinity routing: PauseRead/ResumeRead per-conn backpressure; WithAffinityKey pins same-IP connections to one eventLoop
  • Pipeline fair scheduling: Quota(n) prevents a single connection from monopolizing the eventLoop; BatchFlushHandler.OnDataEnd batch-flush hook

Quick Start

TCP Echo Server

package main

import (
    "context"
    "log"
    "os"
    "os/signal"
    "time"

    "github.com/uniyakcom/yakio"
)

func main() {
    eng := yakio.TCP(":8080", yakio.Events{
        Open:  func(c yakio.Conn) error          { log.Println("new:", c.RemoteAddr()); return nil },
        Data:  func(c yakio.Conn, b []byte) error { return c.Write(b) },
        Close: func(c yakio.Conn, err error)     { log.Println("closed:", err) },
    })

    go eng.Run()
    <-eng.Ready()
    log.Println("listening on", eng.Addr())

    quit := make(chan os.Signal, 1)
    signal.Notify(quit, os.Interrupt)
    <-quit

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    eng.Stop(ctx)
}

Frame Protocol Quick Start

import "github.com/uniyakcom/yakio/codec"

// 4-byte big-endian length prefix
eng := yakio.TCP(":8080", handler, yakio.WithCodec(codec.LengthField(4)))

// JSON frames (4B length prefix + JSON body)
type Msg struct{ Text string `json:"text"` }
tc := codec.JSON[Msg]()
eng = yakio.TCP(":8081", yakio.Events{
    Data: func(c yakio.Conn, b []byte) error {
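        msg, err := tc.Unmarshal(b)
        if err != nil {
            return err // a non-nil return closes the connection
        }
        resp, err := tc.MarshalFrame(Msg{Text: "pong:" + msg.Text})
        if err != nil {
            return err
        }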
        return c.Write(resp)
    },
}, yakio.WithCodec(tc))
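
To make the framing concrete, here is a minimal standard-library sketch of the wire layout described above: a 4-byte big-endian length followed by the payload. The names encodeFrame, decodeFrame, and errIncomplete are illustrative only and are not part of yakio; the real codec.LengthField may differ in details such as maximum-size checks.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// errIncomplete is a local stand-in meaning "keep buffering" (hypothetical name).
var errIncomplete = errors.New("incomplete frame")

// encodeFrame prepends a 4-byte big-endian length to payload.
func encodeFrame(payload []byte) []byte {
	out := make([]byte, 4+len(payload))
	binary.BigEndian.PutUint32(out, uint32(len(payload)))
	copy(out[4:], payload)
	return out
}

// decodeFrame extracts one frame from buf and returns the payload plus the
// unconsumed remainder. It returns errIncomplete when more bytes are needed.
func decodeFrame(buf []byte) (payload, rest []byte, err error) {
	if len(buf) < 4 {
		return nil, buf, errIncomplete
	}
	n := binary.BigEndian.Uint32(buf)
	if uint64(len(buf)-4) < uint64(n) {
		return nil, buf, errIncomplete
	}
	end := 4 + int(n)
	return buf[4:end], buf[end:], nil
}

func main() {
	// Two frames back to back, as they might arrive in a single TCP read.
	stream := append(encodeFrame([]byte("ping")), encodeFrame([]byte("pong"))...)
	for len(stream) > 0 {
		p, rest, err := decodeFrame(stream)
		if err != nil {
			break // wait for more bytes
		}
		fmt.Printf("%s\n", p)
		stream = rest
	}
}
```

The decoder returns the remainder instead of consuming a reader, which keeps it easy to call repeatedly on an accumulating buffer.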

Handler Interface

type Handler interface {
    OnOpen(c Conn) error              // New connection; return non-nil to reject and close
    OnData(c Conn, data []byte) error // Data received (after Codec decode); return non-nil to close
    OnClose(c Conn, err error)        // Connection closed; err == nil means normal close
}

Functional alternative (Events):

yakio.Events{
    Open:  func(c yakio.Conn) error          { ... },
    Data:  func(c yakio.Conn, b []byte) error { ... },
    Close: func(c yakio.Conn, err error)     { ... },
}

Optional extension interfaces:

| Interface | Method | Trigger |
|---|---|---|
| BatchFlushHandler | OnDataEnd(c Conn) | Reactor: after each batch of read events, for deferred flush (application-layer Nagle alternative) |
| ShutdownHandler | OnShutdown(c Conn) | During Stop(ctx), called per active connection for protocol-level goodbye frames |

Conn Interface

| Method | Description |
|---|---|
| ID() uint64 | Globally unique ID (monotonically increasing from 1 within the process); usable as map key or hex trace ID |
| Write([]byte) error | Write data (reactor: async; stdnet: sync) |
| Close() error | Close the connection |
| RemoteAddr() net.Addr | Remote address |
| LocalAddr() net.Addr | Local address |
| SetDeadline(time.Time) error | Per-conn deadline (orthogonal to global IdleTimeout) |
| PauseRead() / ResumeRead() | Pause / resume read events for this connection (backpressure) |
| Context() any / SetContext(any) | Bind / read user data (connection-level session state) |
| Fd() int | Underlying file descriptor (stdnet returns -1) |
| NetConn() net.Conn | Underlying net.Conn (stdnet only) |
| TLSState() *tls.ConnectionState | TLS negotiation state (version, cipher suite, certificate chain, etc.) |
| TLSClientHello() *tls.ClientHelloInfo | Client's ClientHello extensions (SNI, supported ALPN, etc.), available in OnOpen |
| TLSConn() *tls.Conn | Underlying *tls.Conn (OCSP stapling, hostname verification, etc.) |

Connection state management (two equivalent approaches):

var sessions sync.Map

// Approach 1: inline in Handler
yakio.Events{
    Open:  func(c yakio.Conn) error    { sessions.Store(c.ID(), c); return nil },
    Close: func(c yakio.Conn, _ error) { sessions.Delete(c.ID()) },
}

// Approach 2: ConnChange global hooks (decouples Handler from connection registry)
yakio.TCP(":8080", handler,
    yakio.ConnChange(
        func(c yakio.Conn)         { sessions.Store(c.ID(), c) },
        func(c yakio.Conn, _ error){ sessions.Delete(c.ID()) },
    ),
)

// O(1) targeted push (no Range iteration)
if conn, ok := eng.Get(targetID); ok {
    conn.Write(msg)
}

By the time the ConnClose hook fires, eng.Get(id) already returns false for that connection.


Protocol Codecs

Built-in Codec Overview

| Codec | Factory | Description |
|---|---|---|
| Length prefix | codec.LengthField(n) | n=1/2/4 byte big-endian; LengthFieldWith(n, order, maxSize) for custom byte order and limit |
| Line delimiter | codec.Line() / codec.LineMax(n) | \n-delimited; LineMax limits max line length |
| Custom delimiter | codec.Delimiter(sep) / codec.DelimiterMax(sep, n) | Any byte as delimiter |
| Fixed-length | codec.Fixed(n) | Fixed-size frames |
| RESP2/3 | codec.RESP() / codec.RESPMax(bulkMax, lineMax) | Redis request frame decoder; Encode is pass-through (responses constructed by app) |
| JSON (generic) | codec.JSON[T]() / codec.NDJSON[T]() | 4B length-prefix+JSON / \n-delimited; JSONWith[T](inner) for custom inner codec |
| Protobuf | yakproto.New(factory) | Standalone sub-module, see below |

JSON Message Codec

type Msg struct{ Text string `json:"text"` }
tc := codec.JSON[Msg]()

eng := yakio.TCP(":8080", yakio.Events{
    Data: func(c yakio.Conn, b []byte) error {
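        msg, err := tc.Unmarshal(b) // decode (frame header stripped)
        if err != nil {
            return err // a non-nil return closes the connection
        }
        resp, err := tc.MarshalFrame(Msg{Text: "pong:" + msg.Text}) // encode (with frame header)
        if err != nil {
            return err
        }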
        return c.Write(resp)
    },
}, yakio.WithCodec(tc))

| Method | Use Case |
|---|---|
| tc.Marshal(msg) + c.Write(data) | WithCodec(tc): engine auto-frames |
| tc.MarshalFrame(msg) + c.Write(frame) | Codec = nil: manual frame control |

Variants:

tc := codec.NDJSON[Msg]()                                                          // newline-delimited (NDJSON)
tc := codec.JSONWith[Msg](codec.LengthFieldWith(2, binary.LittleEndian, 1<<20))   // custom frame

Protobuf Sub-module

go get github.com/uniyakcom/yakio/codec/proto  # import only when needed; main module has zero external deps

import yakproto "github.com/uniyakcom/yakio/codec/proto"

tc := yakproto.New(func() *pb.Req { return &pb.Req{} })

eng := yakio.TCP(":9090", yakio.Events{
    Data: func(c yakio.Conn, b []byte) error {
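        req, err := tc.Unmarshal(b)
        if err != nil {
            return err // a non-nil return closes the connection
        }
        resp, err := tc.Marshal(process(req))
        if err != nil {
            return err
        }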
        return c.Write(resp)
    },
}, yakio.WithCodec(tc))

| Method | Description |
|---|---|
| Unmarshal([]byte) (T, error) | Frame content → T |
| Marshal(T) ([]byte, error) | T → proto (without frame header) |
| MarshalFrame(T) ([]byte, error) | T → proto + frame header |
| Size(T) int | Serialized byte count (without frame header) |

RESP Protocol

eng := yakio.TCP(":6379", yakio.Events{
    Data: func(c yakio.Conn, frame []byte) error {
        return handleRESP(c, frame) // frame is a complete RESP request (array or inline command)
    },
}, yakio.WithCodec(codec.RESP()))
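
Because the codec's Encode is pass-through, the application builds RESP replies itself. The following is a small standard-library sketch of the RESP wire format for a simple string, a bulk string, and a command array. The helper names (simpleString, bulkString, command) are illustrative and not part of yakio, and handleRESP above remains an application-defined function.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// simpleString encodes a RESP simple string such as "+OK\r\n".
func simpleString(s string) []byte {
	return []byte("+" + s + "\r\n")
}

// bulkString encodes a RESP bulk string: "$<len>\r\n<data>\r\n".
func bulkString(s string) []byte {
	return []byte("$" + strconv.Itoa(len(s)) + "\r\n" + s + "\r\n")
}

// command encodes a request the way Redis clients send it:
// an array ("*<count>\r\n") of bulk strings.
func command(args ...string) []byte {
	var b strings.Builder
	b.WriteString("*" + strconv.Itoa(len(args)) + "\r\n")
	for _, a := range args {
		b.Write(bulkString(a))
	}
	return []byte(b.String())
}

func main() {
	fmt.Printf("%q\n", command("GET", "key"))
	fmt.Printf("%q\n", simpleString("OK"))
	fmt.Printf("%q\n", bulkString("value"))
}
```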

TLS

cert, _ := tls.LoadX509KeyPair("cert.pem", "key.pem")
eng := yakio.TCP(":8443", handler,
    yakio.TLS(&tls.Config{Certificates: []tls.Certificate{cert}}),
)

Accessing handshake info (available in OnOpen):

st  := c.TLSState()       // negotiated: version, cipher suite, ALPN, peer certificates
ch  := c.TLSClientHello() // client extensions: SNI, supported versions, ALPN
raw := c.TLSConn()        // underlying *tls.Conn: OCSPResponse(), VerifyHostname(), etc.

Session cache (1-RTT resumption):

cache := yakio.NewSessionCache(4096) // 4096 tickets, TTL 12h, no background goroutines

cli, _ := yakio.Connect("example.com:443", handler,
    yakio.TLS(&tls.Config{ServerName: "example.com"}),
    yakio.TLSSessionCache(cache),
)
stats := cache.Stats() // HitRate(), Size()

TLS-related options:

| Option | Description | Default |
|---|---|---|
| TLS(cfg) | Enable TLS (automatically enforces minimum TLS 1.2) | |
| TLS13Only() | Enforce TLS 1.3 | |
| Hello(fn) | ClientHello callback (SNI routing, dynamic certificate selection) | |
| TLSSessionCache(c) | Inject session ticket cache | |
| KernelTLS() | kTLS TX+RX offload (Linux 4.13+, AES-128/256-GCM) | |
| TLSHandshakeTimeout(d) | Handshake timeout; takes min(d, IdleTimeout) | 10s |
| TLSHandshakeConcurrency(n) | Concurrent handshake limit (0 = auto: MaxConns/4, [256, 8192]); applies to reactor and io_uring | auto |
| MaxTLSBridgeBuf(n) | TLS bridge read buffer cap (prevents OOM during handler blocking) | 8 MB |

TCP Client

// Basic usage
cli, err := yakio.Connect("127.0.0.1:8080", yakio.Events{
    Data:  func(c yakio.Conn, b []byte) error { fmt.Printf("<- %s\n", b); return nil },
    Close: func(c yakio.Conn, err error)     { fmt.Println("closed", err) },
})
defer cli.Close()
cli.Write([]byte("hello"))

// Dial with context
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
cli, err = yakio.ConnectContext(ctx, addr, handler)

// Auto-reconnect (exponential backoff: base×2^n + jitter)
cli, err = yakio.Connect(addr, handler,
    yakio.WithReconnect(-1, 500*time.Millisecond), // -1 = infinite retries
    yakio.ReconnectMaxDelay(30*time.Second),
)

// TLS client
cli, err = yakio.Connect("example.com:443", handler,
    yakio.TLS(&tls.Config{ServerName: "example.com"}),
)

Client methods: Write([]byte) error, Close() error, Conn() Conn, Metrics() *Metrics

| Client-only option | Description | Default |
|---|---|---|
| DialTimeout(d) | Dial timeout | OS default |
| WithReconnect(maxRetry, baseDelay) | Auto-reconnect; maxRetry=-1 infinite, 0 disabled | disabled |
| ReconnectMaxDelay(d) | Reconnect backoff cap | 30s |
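
The reconnect schedule (base×2^n + jitter, capped by ReconnectMaxDelay) can be sketched with the standard library as follows. This is an illustration, not yakio's implementation: the function name backoffDelay is hypothetical, and both the jitter range and the point at which the cap is applied are assumptions, since the README does not specify them.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// backoffDelay returns base*2^attempt plus jitter, never exceeding maxDelay.
// Doubling stops as soon as the delay reaches maxDelay, which avoids overflow
// for large attempt counts.
func backoffDelay(base, maxDelay time.Duration, attempt int, jitter time.Duration) time.Duration {
	d := base
	for i := 0; i < attempt && d < maxDelay; i++ {
		d *= 2
	}
	d += jitter
	if d > maxDelay {
		d = maxDelay
	}
	return d
}

func main() {
	base, maxDelay := 500*time.Millisecond, 30*time.Second
	for attempt := 0; attempt < 8; attempt++ {
		// Assumed jitter range: [0, base). Random jitter spreads out
		// reconnect storms when many clients lose the server at once.
		jitter := time.Duration(rand.Int63n(int64(base)))
		fmt.Println(attempt, backoffDelay(base, maxDelay, attempt, jitter))
	}
}
```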

Each Client has its own *Metrics instance (via cli.Metrics()), independent from global eng.Metrics().


Engine Lifecycle

eng := yakio.TCP(":8080", handler, opts...)

go eng.Run()         // start listening (blocking; call in a goroutine)
<-eng.Ready()        // wait for listen-ready (use eng.Addr() for actual port when port 0)
eng.Conns()          // current active connection count
eng.Get(id)          // O(1) connection lookup by ID
eng.Range(fn)        // iterate all active connections (fn returns false to stop early)
eng.Drain()          // stop accepting new connections (existing connections unaffected)
eng.Stop(ctx)        // graceful shutdown: drain or DrainTimeout, then force-close
eng.Shutting()       // whether shutdown has been initiated
eng.ListenerFd()     // get listener fd (for graceful restart)

Broadcast & targeted push:

// Broadcast
eng.Range(func(c yakio.Conn) bool { c.Write(msg); return true })

// O(1) targeted push
if conn, ok := eng.Get(targetID); ok { conn.Write(msg) }

Semantic guarantee: by the time OnClose fires, eng.Get(id) already returns false for that connection.

Worker pool (Submit / SubmitCtx):

Offload heavy work from the eventLoop to a dedicated goroutine pool (default GOMAXPROCS×4 workers):

conn.PauseRead()
ok := eng.Submit(func() {
    defer conn.ResumeRead()
    conn.Write(db.Query(key))
})
// ok == false: engine shut down, or queue full (when WithSubmitQueueSize is set)

SubmitCtx(ctx, fn) checks ctx cancellation before executing fn, useful for skipping stale requests.
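
The idea of skipping stale work can be sketched in a few lines of standard-library Go. The wrapper below is an assumption about the general pattern, not yakio's code: submitCtx and runWithCtx are hypothetical names, the submit parameter stands in for eng.Submit, and the point at which yakio itself performs the check may differ.

```go
package main

import (
	"context"
	"fmt"
)

// submitCtx wraps fn so that it is skipped when ctx has already been
// cancelled or has expired by the time a worker picks the task up.
func submitCtx(ctx context.Context, submit func(func()) bool, fn func()) bool {
	return submit(func() {
		if ctx.Err() != nil {
			return // request is stale; do not waste a worker on it
		}
		fn()
	})
}

// runWithCtx reports whether the task body ran. It uses an inline
// "pool" that executes tasks immediately, which is enough for a demo.
func runWithCtx(cancelFirst bool) (ran bool) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if cancelFirst {
		cancel()
	}
	inline := func(task func()) bool { task(); return true }
	submitCtx(ctx, inline, func() { ran = true })
	return ran
}

func main() {
	fmt.Println("live request ran:", runWithCtx(false))
	fmt.Println("cancelled request ran:", runWithCtx(true))
}
```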


Configuration Options Reference

Connection & Concurrency

| Option | Description | Default |
|---|---|---|
| Workers(n) | Max concurrent OnData calls (global semaphore; 0 = unlimited) | unlimited |
| WorkerIdleTimeout(d) | Idle worker goroutine lifetime before reaping | 10s |
| MaxConns(n) | Global max connections (0 = unlimited) | unlimited |
| PerIP(n) | Max connections per IP (also enables TopNConns hotspot data) | unlimited |
| Backlog(n) | Listen backlog queue depth | 128 |
| Reuse() | SO_REUSEPORT (Linux 3.9+) | false |
| Loops(n) | Reactor sub-loop count (0 = GOMAXPROCS); useful for prefork to avoid CPU over-subscription | GOMAXPROCS |
| WithAffinityKey(fn) | Affinity routing: same-IP connections pinned to one eventLoop | round-robin |
| Quota(n) | Max frames per eventLoop iteration per connection (requires WithCodec) | unlimited |

Timeouts

| Option | Description | Default |
|---|---|---|
| Idle(d) | Idle connection timeout (use NoIdle() to disable) | 75s |
| NoIdle() | Explicitly disable idle timeout (long-lived connections) | |
| KeepAlive(d) | TCP keepalive interval (0 = OS default) | OS default |
| WriteTimeout(d) | Write timeout; closes connection on expiry; 0 = disabled | disabled |
| FirstReadTimeout(d) | First-read timeout (Slowloris mitigation); takes min(d, IdleTimeout); supported on all three backends | 30s |
| TLSHandshakeTimeout(d) | TLS handshake timeout; takes min(d, IdleTimeout) | 10s |
| DrainTimeout(d) | Stop(ctx) drain timeout; takes min(d, ctx.Deadline) | 5s |
| IdleScanInterval(d) | Reactor idle scan interval (IdleTimeout/SetDeadline/WriteTimeout detection granularity) | 100ms |

Buffers

| Option | Description | Default |
|---|---|---|
| ReadBuf(n) | Read buffer size (bytes) | 8192 |
| MaxWrite(n) | Per-connection write buffer cap (exceeding 256 MiB causes Run() error) | 8 MB |
| WithMaxReadBuf(n) | Per-connection read buffer accumulation cap for codec pipelines (prevents slow-consumer OOM; exceeding 256 MiB causes Run() error) | 8 MB |
| SndBuf(n) | SO_SNDBUF (exceeding 128 MiB causes Run() error) | OS default |
| RcvBuf(n) | SO_RCVBUF (exceeding 128 MiB causes Run() error) | OS default |
| MaxTLSBridgeBuf(n) | Reactor TLS bridge read buffer cap (prevents OOM during handler blocking) | 8 MB |

Rate Limiting

| Option | Description | Default |
|---|---|---|
| PerMsg(n) | Max messages/sec per connection; exceeding it closes the connection with ErrTooManyMsgs | unlimited |
| PerIPMsg(n) | Max messages/sec aggregated per IP; also enables TopNMsgRates hotspot data | unlimited |
| PerIPRateTTL(d) | Per-IP rate limiter entry TTL (effective when PerIPMsg is enabled) | 5min |

TLS

| Option | Description | Default |
|---|---|---|
| TLS(cfg) | Enable TLS (auto-enforces minimum TLS 1.2) | |
| TLS13Only() | Enforce TLS 1.3 | |
| Hello(fn) | ClientHello callback | |
| TLSSessionCache(c) | Inject session ticket cache | |
| KernelTLS() | kTLS offload (Linux 4.13+) | |
| TLSHandshakeConcurrency(n) | Concurrent handshake limit (0 = auto) | auto |
| TLSHandshakeTimeout(d) | Handshake timeout | 10s |
| MaxTLSBridgeBuf(n) | Bridge buffer cap | 8 MB |

Reactor-specific

| Option | Description | Default |
|---|---|---|
| ConnTableFastSize(n) | Connection table fast-index size; fd < n → O(1) atomic array, else sharded map fallback; align with ulimit -n for high concurrency | 65536 |
| WithEventChannelSize(n) | Reactor internal event channel depth (wakeupRead / pendingCodec) | 2048 |
| StdNet() / Reactor() | Force stdnet / reactor backend | auto |

io_uring Backend

| Option | Description | Default |
|---|---|---|
| Uring(opts ...UringOpts) | Enable io_uring backend (Linux 5.6+, auto-fallback to reactor if unsupported) | disabled |
| ZeroCopy(cfg ...BufPoolCfg) | Enable kernel-managed receive buffer pool (PROVIDE_BUFFERS + BUFFER_SELECT recv); single-ring and multi-ring (per-worker pool). Uses single-shot recv to prevent buffer pool exhaustion under high concurrency. | disabled |

UringOpts fields:

| Field | Description | Default |
|---|---|---|
| Entries | SQ/CQ ring size (power of 2) | 2048 |
| NumRings | Worker ring count; 0 = auto (GOMAXPROCS/3, capped 2–8) | 0 |
| SQPoll | Enable SQPOLL kernel thread (requires CAP_SYS_NICE) | false |
| SQThreadIdle | SQPOLL idle timeout | 1s |
| FixedBuffers | Register fixed buffers for read I/O | 0 |
| DirectFd | Register listen fd to fixed file table | false |
| SendZC | Enable SEND_ZC zero-copy send (kernel 6.0+) | false |
| SendZCThreshold | Minimum bytes to trigger SEND_ZC (avoids overhead on small writes) | 4096 |

eng := yakio.TCP(":7000", handler,
    yakio.Uring(yakio.UringOpts{NumRings: 4, SendZC: true}),
)

Runtime probe:

probe := yakio.ProbeUring()
fmt.Println(probe.Supported, probe.SendZC)

High-concurrency tuning (ulimit -n > 65536):

yakio.TCP(":7000", handler,
    yakio.MaxConns(200_000),
    yakio.ConnTableFastSize(262144), // align with ulimit -n, eliminate sync.Map fallback
)

Worker Pool

| Option | Description | Default |
|---|---|---|
| WithSubmitWorkers(n) | Submit pool worker count | GOMAXPROCS×4 |
| WithSubmitQueueSize(n) | Queue cap; Submit returns false and increments Metrics.DroppedTasks when full | unlimited |

Hooks & Observability

| Option | Description |
|---|---|
| ConnOpen(fn) | Global connection-open hook (fires after OnOpen succeeds) |
| ConnClose(fn) | Global connection-close hook (fires after OnClose completes) |
| ConnChange(open, close) | Shorthand for ConnOpen + ConnClose |
| Panic(fn) | Handler panic callback (default: recover + close connection) |
| WithLogger(l) | Inject *yaklog.Logger (Info: lifecycle events; Warn: write timeout/panic; Debug: diagnostics) |
| WithAdaptiveGC(fn) | Adaptive GC: periodically samples MetricsSnapshot, returns new GOGC percentage (-1 = skip) |
| AdaptiveGCInterval(d) | WithAdaptiveGC sampling interval (default 5s) |

Other

| Option | Description |
|---|---|
| WithCodec(c) | Set frame codec (nil = raw bytes) |
| WithListenerFd(fd) | Inherit listener fd (graceful restart; -1 = no inherit) |

Client-only

| Option | Description | Default |
|---|---|---|
| DialTimeout(d) | Dial timeout | OS default |
| WithReconnect(maxRetry, baseDelay) | Auto-reconnect; -1 = infinite, 0 = disabled | disabled |
| ReconnectMaxDelay(d) | Reconnect backoff cap | 30s |

Metrics

s := eng.Metrics().Snapshot()
fmt.Println(s.TotalConns, s.ActiveConns, s.BytesRead, s.Uptime)

| Field | Precision | Description |
|---|---|---|
| TotalConns | Exact | Total accepted connections |
| ActiveConns | Exact | Current active connections |
| ClosedConns | Exact | Total closed connections |
| RejectedConns | Exact | Connections rejected by throttling |
| BytesRead / BytesWritten | Approximate | Total bytes read/written (percpu.Counter, efficient multi-core writes) |
| MessagesIn / MessagesOut | Approximate | Total messages received/sent |
| ReadErrors / WriteErrors | Exact | Read/write error count |
| PanicCount | Exact | Handler panic count |
| RateLimitedConns | Exact | Connections closed by rate limiting |
| DroppedTasks | Exact | Tasks dropped by Submit when queue is full |
| WriteTimeoutConns | Exact | Connections closed by WriteTimeout |
| FirstReadTimeoutConns | Exact | Connections closed by FirstReadTimeout (Slowloris defense) |
| Uptime | | Engine uptime |

Exact vs Approximate: connection/error counts use atomic.Int64 (exact, suitable for throttling decisions); byte/message counts use percpu.Counter (distributed writes, approximate reads, sufficient for monitoring).
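
The trade-off between the two counter styles can be illustrated with a sharded counter built from the standard library. This is a sketch of the general technique only: Go exposes no per-CPU API, so the shard is chosen by a caller-supplied hint, and the types here (shardedCounter, paddedCounter) are hypothetical and may differ substantially from yakutil's percpu.Counter.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

const numShards = 8

// paddedCounter spaces shards 64 bytes apart to reduce false sharing.
type paddedCounter struct {
	v atomic.Int64
	_ [56]byte
}

// shardedCounter spreads writes across shards so that concurrent writers
// rarely contend on the same memory location.
type shardedCounter struct {
	shards [numShards]paddedCounter
}

// Add increments the shard selected by hint (for example a worker index).
func (c *shardedCounter) Add(hint uint, n int64) {
	c.shards[hint%numShards].v.Add(n)
}

// Load sums all shards. While writers are active the sum is not an atomic
// snapshot, which is why such a read is called approximate.
func (c *shardedCounter) Load() int64 {
	var sum int64
	for i := range c.shards {
		sum += c.shards[i].v.Load()
	}
	return sum
}

// countConcurrently has each worker add 1 perWorker times, then reads the
// total once all workers have finished (at which point the sum is exact).
func countConcurrently(workers, perWorker int) int64 {
	var c shardedCounter
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func(id uint) {
			defer wg.Done()
			for i := 0; i < perWorker; i++ {
				c.Add(id, 1)
			}
		}(uint(w))
	}
	wg.Wait()
	return c.Load()
}

func main() {
	fmt.Println(countConcurrently(4, 1000))
}
```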

Hotspot Diagnosis

// Top-N IPs by connection count (requires PerIP(n) to be enabled)
for _, e := range eng.TopNConns(5) {
    fmt.Printf("ip=%-20s conns=%d\n", e.IP, e.Conns)
}

// Top-N IPs by message volume (requires PerIPMsg(n) to be enabled)
for _, e := range eng.TopNMsgRates(5) {
    fmt.Printf("ip=%-20s hits=%d\n", e.IP, e.Hits)
}
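
For readers who want the same kind of ranking over their own per-IP data, a Top-N selection is straightforward with the standard library. The sketch below is not yakio's implementation; ipCount and topN are hypothetical names, and the tie-breaking rule (alphabetical by IP) is an assumption made to keep the output deterministic.

```go
package main

import (
	"fmt"
	"sort"
)

// ipCount pairs an IP with its connection count (illustrative type).
type ipCount struct {
	IP    string
	Conns int
}

// topN returns the n entries with the highest counts, highest first.
// Ties are broken by IP so the result is deterministic.
func topN(counts map[string]int, n int) []ipCount {
	out := make([]ipCount, 0, len(counts))
	for ip, c := range counts {
		out = append(out, ipCount{IP: ip, Conns: c})
	}
	sort.Slice(out, func(i, j int) bool {
		if out[i].Conns != out[j].Conns {
			return out[i].Conns > out[j].Conns
		}
		return out[i].IP < out[j].IP
	})
	if n < 0 {
		n = 0
	}
	if n < len(out) {
		out = out[:n]
	}
	return out
}

func main() {
	counts := map[string]int{"10.0.0.1": 3, "10.0.0.2": 12, "10.0.0.3": 7}
	for _, e := range topN(counts, 2) {
		fmt.Printf("ip=%-20s conns=%d\n", e.IP, e.Conns)
	}
}
```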

Protection Mechanisms

Message Rate Limiting (PerMsg / PerIPMsg)

eng := yakio.TCP(":8080", handler,
    yakio.WithCodec(codec.LengthField(4)), // recommended with Codec (rate counts frames)
    yakio.PerMsg(500),                     // max 500 frames/sec per connection
    yakio.PerIPMsg(2000),                  // max 2000 frames/sec aggregated per IP
)

The two layers are independent; exceeding either limit closes the connection with ErrTooManyMsgs and increments Metrics.RateLimitedConns.

Without Codec: rate counts read() calls; TCP coalescing may undercount — use with Codec. PerIPMsg shares a count window across all connections from the same IP; combined with PerMsg it defends against both single-connection flooding and multi-connection aggregate attacks.
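
A per-connection messages-per-second limit can be sketched as a fixed one-second window counter. This is only an illustration of the counting idea: the README does not say which algorithm yakio uses, so the fixed window, the msgLimiter type, and the simulate helper are all assumptions.

```go
package main

import (
	"fmt"
	"time"
)

// msgLimiter allows at most limit messages per one-second window.
// It is not safe for concurrent use; the sketch assumes one owner
// goroutine per connection.
type msgLimiter struct {
	limit       int
	windowStart time.Time
	count       int
}

// allow records one message at time now and reports whether it is
// still within the limit for the current window.
func (l *msgLimiter) allow(now time.Time) bool {
	if now.Sub(l.windowStart) >= time.Second {
		l.windowStart = now // start a fresh window
		l.count = 0
	}
	l.count++
	return l.count <= l.limit
}

// simulate sends msgsPerSecond messages at the start of each of the given
// seconds and returns how many were allowed in total.
func simulate(limit, msgsPerSecond, seconds int) int {
	l := &msgLimiter{limit: limit}
	base := time.Unix(1_700_000_000, 0)
	allowed := 0
	for s := 0; s < seconds; s++ {
		now := base.Add(time.Duration(s) * time.Second)
		for i := 0; i < msgsPerSecond; i++ {
			if l.allow(now) {
				allowed++
			}
		}
	}
	return allowed
}

func main() {
	// 600 frames/sec against a limit of 500: 100 frames per second exceed it.
	fmt.Println(simulate(500, 600, 2))
}
```

In a real server the first rejected message would close the connection rather than merely being counted; the simulation keeps going only to show the window reset.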

Write Timeout Protection (WriteTimeout)

eng := yakio.TCP(":8080", handler,
    yakio.WriteTimeout(10 * time.Second),
    yakio.MaxWrite(4 << 20), // write buffer cap 4 MB
)

| Backend | Behavior |
|---|---|
| stdnet | Sets system-level write deadline before each Write |
| reactor/TLS | Refreshes write deadline before each tlsc.Write |
| reactor/non-TLS | Timer starts on first outBuf accumulation; idle scan every IdleScanInterval; closes on timeout |

Timeout triggers OnClose(c, ErrWriteTimeout) and increments Metrics.WriteTimeoutConns.

Slowloris Defense (FirstReadTimeout)

FirstReadTimeout(d) (default 30s) limits the maximum wait time after connection establishment before the first read. This prevents attackers from holding connections open indefinitely without sending data. At runtime, the effective value is min(FirstReadTimeout, IdleTimeout). All three backends (reactor / io_uring / stdnet) enforce this timeout.
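
The same defense can be demonstrated on a plain net.Conn with a read deadline. The sketch below uses only the standard library and is not yakio's code: effectiveFirstRead, readFirst, and silentPeerTimesOut are hypothetical names, and treating a non-positive idle timeout as "disabled" is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"os"
	"time"
)

// effectiveFirstRead returns min(first, idle); a non-positive idle value
// is treated as "no idle timeout" (an assumption of this sketch).
func effectiveFirstRead(first, idle time.Duration) time.Duration {
	if idle > 0 && idle < first {
		return idle
	}
	return first
}

// readFirst waits for the first bytes from c, failing with a timeout if
// the peer stays silent for longer than the effective first-read timeout.
func readFirst(c net.Conn, first, idle time.Duration, buf []byte) (int, error) {
	deadline := time.Now().Add(effectiveFirstRead(first, idle))
	if err := c.SetReadDeadline(deadline); err != nil {
		return 0, err
	}
	n, err := c.Read(buf)
	if err != nil {
		return n, err
	}
	// First data arrived: clear the deadline for subsequent reads.
	return n, c.SetReadDeadline(time.Time{})
}

// silentPeerTimesOut connects two in-memory endpoints, sends nothing,
// and reports whether the first read failed with a deadline error.
func silentPeerTimesOut() bool {
	server, client := net.Pipe()
	defer server.Close()
	defer client.Close()
	_, err := readFirst(server, 20*time.Millisecond, time.Second, make([]byte, 16))
	return errors.Is(err, os.ErrDeadlineExceeded)
}

func main() {
	fmt.Println("silent peer timed out:", silentPeerTimesOut())
}
```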


Advanced Capabilities

Backpressure Control

func (h *myHandler) OnData(c yakio.Conn, data []byte) error {
    c.PauseRead() // stop receiving new data from this connection
    eng.Submit(func() {
        defer c.ResumeRead()
        c.Write(heavyCompute(data))
    })
    return nil
}

Reactor: uses the wakeupRead channel to re-arm the read event (solves EPOLLET re-arm); stdnet: wakes the blocked read goroutine via sync.Cond.Broadcast.

Scatter-Gather Writes

if vw, ok := conn.(yakio.VectorWriter); ok {
    vw.WriteMany(header, body) // single writev(2), no intermediate copying
} else {
    conn.Write(append(header, body...))
}

Reactor backend delegates to writev(2); stdnet writes sequentially.
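
The standard library offers a comparable facility in net.Buffers, whose WriteTo uses a vectored write on connections that support it and falls back to sequential writes otherwise. The sketch below shows that stdlib analogue, not yakio's VectorWriter; writeMany, concat, and joined are hypothetical helpers. It also shows a copy-based fallback that avoids writing into the spare capacity of header, which append(header, body...) can do.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"net"
)

// writeMany writes header and body without first concatenating them.
// net.Buffers.WriteTo consumes the slice it is called on, so a fresh
// net.Buffers value is built for every call.
func writeMany(w io.Writer, header, body []byte) (int64, error) {
	bufs := net.Buffers{header, body}
	return bufs.WriteTo(w)
}

// concat is a fallback that copies both parts into a new slice, leaving
// the backing array of header untouched.
func concat(header, body []byte) []byte {
	out := make([]byte, 0, len(header)+len(body))
	out = append(out, header...)
	return append(out, body...)
}

// joined writes both parts into an in-memory buffer and returns the result.
func joined(header, body string) string {
	var sink bytes.Buffer
	if _, err := writeMany(&sink, []byte(header), []byte(body)); err != nil {
		return ""
	}
	return sink.String()
}

func main() {
	fmt.Println(joined("HDR:", "payload"))
	fmt.Println(string(concat([]byte("HDR:"), []byte("payload"))))
}
```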

Pipeline Fair Scheduling

yakio.TCP(addr, handler,
    yakio.WithCodec(respCodec),
    yakio.Quota(200),                  // max 200 frames per eventLoop iteration
    yakio.WithEventChannelSize(2048),  // increase for ultra-high-concurrency (default 2048)
)

Excess frames are pushed to the pendingCodec channel for the next iteration, preventing pipeline clients from monopolizing the eventLoop.

Connection Affinity Routing

yakio.WithAffinityKey(func(remoteAddr string) uint64 {
    host, _, _ := net.SplitHostPort(remoteAddr)
    return fnv1a64(host) // same IP always lands on the same eventLoop → lock-free per-IP state
})

Default uses round-robin.
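
The snippet above calls fnv1a64, which is not defined in this README. One possible definition, using the standard library's hash/fnv, is sketched below together with a key-to-loop mapping. The modulo mapping in loopIndex is an assumption for illustration; how yakio turns the key into an eventLoop index is not documented here.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"net"
)

// fnv1a64 hashes s with 64-bit FNV-1a.
func fnv1a64(s string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(s)) // Write on a hash.Hash never returns an error
	return h.Sum64()
}

// affinityKey derives the key from the host part only, so that every
// connection from one IP maps to the same key regardless of source port.
func affinityKey(remoteAddr string) uint64 {
	host, _, err := net.SplitHostPort(remoteAddr)
	if err != nil {
		host = remoteAddr // no port present; hash the whole string
	}
	return fnv1a64(host)
}

// loopIndex maps a key onto one of loops eventLoops (assumed: modulo).
func loopIndex(key uint64, loops int) int {
	return int(key % uint64(loops))
}

func main() {
	for _, addr := range []string{"10.0.0.1:5000", "10.0.0.1:6000", "10.0.0.2:5000"} {
		fmt.Println(addr, "-> loop", loopIndex(affinityKey(addr), 4))
	}
}
```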

Batch Flush Hook

// Reactor: called once per connection after each batch of read events (application-layer Nagle alternative)
func (h *myHandler) OnDataEnd(c yakio.Conn) { bw.Flush() }

Graceful Shutdown Hook

// Called per active connection during Stop(ctx); send protocol-level goodbye before returning
func (h *myHandler) OnShutdown(c yakio.Conn) { c.Write(serverGoingDownMsg) }

Per-Connection Deadline

func (h *myHandler) OnOpen(c yakio.Conn) error {
    c.SetDeadline(time.Now().Add(30 * time.Second)) // must authenticate within 30s
    return nil
}

func (h *myHandler) OnData(c yakio.Conn, data []byte) error {
    if isAuth(data) { c.SetDeadline(time.Time{}) } // auth success, clear deadline
    return nil
}
// OnClose receives ErrDeadlineExceeded on timeout

Orthogonal to global IdleTimeout — suitable for authentication timeouts, session-level deadlines, etc.

Adaptive GC Tuning

yakio.WithAdaptiveGC(func(m yakio.MetricsSnapshot) int {
    switch {
    case m.ActiveConns > 10_000: return 400 // high load: reduce GC frequency → better throughput
    case m.ActiveConns > 1_000:  return 200
    default:                     return 100 // idle: restore default → save memory
    }
}),
yakio.AdaptiveGCInterval(10 * time.Second),

Returning -1 skips the current round (keeps current GOGC). The tuner goroutine starts with eng.Run() and exits after Stop() — no goroutine leaks.

Graceful Restart

// Parent process: get fd and pass it to child (via env var or ExtraFiles)
fd, _ := eng.ListenerFd()
cmd.ExtraFiles = []*os.File{os.NewFile(uintptr(fd), "listener")}
cmd.Start()
eng.Stop(ctx)

// Child process: inherit fd, skip bind+listen, zero-downtime switchover
eng := yakio.TCP(addr, handler, yakio.WithListenerFd(fd))
eng.Run()

Debugging & Logging

YAKIO_DEBUG=1 ./your-server   # enable internal diagnostic logs (stderr)

yakio.EnableDebug()  // toggle on dynamically
yakio.DisableDebug() // toggle off dynamically

Structured logging (yaklog):

import "github.com/uniyakcom/yaklog"

l := yaklog.New(yaklog.Options{
    Out:   yaklog.Console(os.Stdout),
    Level: yaklog.Info,
}).Label("component", "yakio")

eng := yakio.TCP(":8080", handler, yakio.WithLogger(l))

| Level | Content |
|---|---|
| Info | Engine start, listen address, normal shutdown |
| Warn | Write timeout close, panic recovery, kTLS fallback |
| Error | Severe runtime errors |
| Debug | Internal diagnostics (when YAKIO_DEBUG=1) |

Testing

Makefile shortcuts:

| Target | Command | Description |
|---|---|---|
| make / make all | vet + test | Default: vet then test |
| make test | go test ./... | Full test suite |
| make short | go test -short ./... | Skip slow tests (fast smoke) |
| make race | CGO_ENABLED=1 go test -race ./... | Race detection (required in CI) |
| make race3 | race × 3 rounds | CI stability validation |
| make cover | race + HTML report | Coverage report (opens browser) |
| make vet | go vet ./... | Static analysis |
| make lint [ARGS=--fix] | ./lint.sh | gofmt + vet + golangci-lint + gocyclo (includes cyclomatic complexity at threshold 20; supports --fix, --fmt, --vet) |
| make cyclo [CYCLO=15] | gocyclo | Standalone cyclomatic complexity scan with custom threshold (CYCLO=15); known event-loop core paths are exempt |
| make bench [ARGS=-g\|ARGS=-u] | ./bench.sh | Benchmarks: record results; -g guard/regression check (auto-generate baseline if missing); -u update baseline; env: BENCHTIME, BENCH_COUNT, BENCH_THRESHOLD |
| make fuzz [TARGET=FuzzFoo TIME=2m] | ./fuzz.sh | Auto-discover all Fuzz targets, crash logs saved |
| make clean | go clean -testcache | Clear test cache |

Raw commands:

go test ./...                               # full test suite
CGO_ENABLED=1 go test -race ./...           # race detection (required in CI)
go test -short ./...                        # skip slow tests
go test -run TestTLS -v ./...               # filter by name with verbose output

# Coverage
CGO_ENABLED=1 go test -race -coverprofile=coverage.out ./...
go tool cover -html=coverage.out

Fuzz testing:

# Seed regression (CI: run all Fuzz targets once)
go test -run '^Fuzz' .

# Continuous fuzzing (single target, local long runs; Go tooling limitation: one target at a time)
go test -fuzz=FuzzLengthFieldDecode -fuzztime=5m .

# Batch: auto-discover all targets, Ctrl+C interrupts current and continues to next
./fuzz.sh               # all targets, 5m each
./fuzz.sh FuzzLineDecode 2m  # single target, custom duration

Performance benchmarks:

go test -bench='^BenchmarkTCP_Echo' -benchmem -count=3 .
go test -bench=. -benchmem ./codec/...

./bench.sh        # record results → bench_<os>_<Nc><Mt>.txt
./bench.sh -u     # update baseline (bench_<os>_<Nc><Mt>_baseline.txt)
./bench.sh -g     # guard: compare vs baseline, exit 1 on regression
# BENCHTIME=5s BENCH_COUNT=5 BENCH_THRESHOLD=15 ./bench.sh -g   # custom params

Benchmarks

The _benchmarks/ directory contains comparative benchmarks against gnet v2 and standard library goroutine-per-conn:

cd _benchmarks && ./run.sh   # all benchmarks, 5s/item × 3 rounds

io_uring vs Reactor (epoll) — yakhttp plaintext HTTP, wrk -t4 -c100 -d10s, HP Z2 G4 (Xeon E-2176G):

| Backend | RPS |
|---|---|
| Reactor (epoll) | ~237,000 |
| io_uring (4 rings + BUFFER_SELECT recv) | ~244,000 (+3%) |
| Prefork 4×uring (4 engines × 2 rings) | ~305,000 |

TLS HTTPS (wrk -t4 -c100 -d10s, Xeon E-2186G 12C):

| Backend | HTTPS RPS |
|---|---|
| Reactor (epoll) | ~155,000 RPS |
| io_uring (4 rings) | ~155,000 RPS |

In TLS scenarios the transport-layer difference is dominated by Go crypto/tls overhead, so the two backends deliver essentially the same throughput.

Root-level regression guard (CI):

make bench ARGS=-u   # update baseline after intentional changes
make bench ARGS=-g   # guard: exit 1 if any benchmark regresses > 10%

See _benchmarks/README.md for details.


Examples

# Multiplayer chat room (NDJSON, nc-compatible)
go run ./_examples/chat
# After connecting, send: {"name":"alice","text":"hello"}

# Protobuf echo
go run ./_examples/proto-echo

| Directory | Description |
|---|---|
| _examples/chat/ | Chat room: Conn.ID() + sync.Map + Engine.Range broadcast + codec.NDJSON |
| _examples/proto-echo/ | Protobuf echo: codec/proto + Engine.Get O(1) targeted write |
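
The chat example accepts newline-delimited JSON, one object per line, which is why it works with nc. As a rough standard-library illustration of that wire format (independent of codec.NDJSON, whose internals are not shown here), the sketch below parses a stream of such lines. The chatMsg type mirrors the sample message shown above, and decodeNDJSON is a hypothetical helper.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"strings"
)

// chatMsg mirrors the sample message {"name":"alice","text":"hello"}.
type chatMsg struct {
	Name string `json:"name"`
	Text string `json:"text"`
}

// decodeNDJSON parses one JSON object per line and skips blank lines.
// It stops at the first malformed line and returns what it has so far.
func decodeNDJSON(input string) ([]chatMsg, error) {
	var out []chatMsg
	sc := bufio.NewScanner(strings.NewReader(input))
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		if line == "" {
			continue
		}
		var m chatMsg
		if err := json.Unmarshal([]byte(line), &m); err != nil {
			return out, err
		}
		out = append(out, m)
	}
	return out, sc.Err()
}

func main() {
	in := "{\"name\":\"alice\",\"text\":\"hello\"}\n{\"name\":\"bob\",\"text\":\"hi\"}\n"
	msgs, err := decodeNDJSON(in)
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	for _, m := range msgs {
		fmt.Printf("%s: %s\n", m.Name, m.Text)
	}
}
```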

Error Reference

All errors implement errors.Is chaining (yakutil.ErrStr) — distinguish them precisely in OnClose callbacks.

Main Package (yakio)

| Constant | Description |
|---|---|
| ErrClosed | Connection or engine already closed |
| ErrStarted | Engine already started; Run cannot be called again |
| ErrOverflow | Write buffer full or data exceeds limit |
| ErrNotSupported | Feature not supported on current platform or configuration |
| ErrTimeout | Operation timed out |
| ErrConnReset | Connection reset by peer |
| ErrTooManyConns | Exceeded global or per-IP max connections |
| ErrTooManyMsgs | Message rate exceeded (PerMsg / PerIPMsg) |
| ErrDeadlineExceeded | SetDeadline expired |
| ErrWriteTimeout | WriteTimeout exceeded |
| ErrTLSHandshakeTimeout | TLS handshake timed out |

Codec Package (codec)

| Constant | Description | Fatal? |
|---|---|---|
| codec.ErrIncomplete | Insufficient data; continue buffering | No |
| codec.ErrOverflow | Frame exceeds max length | Yes (close connection) |
| codec.ErrInvalidProtocol | Invalid protocol format | Yes (close connection) |

Positioning & Dependencies

yakio is a pure TCP layer framework — no HTTP, WebSocket, UDP, or QUIC. Design goals: small footprint, zero unnecessary dependencies, embeddable as a low-level engine for higher-layer frameworks.

Main module runtime dependencies:

| Library | Purpose |
|---|---|
| github.com/uniyakcom/yakutil | bufpool, ring, wpool, ErrStr |
| github.com/uniyakcom/yakjson | codec.JSON[T] |
| github.com/uniyakcom/yaklog | WithLogger structured logging (optional injection) |

codec/proto sub-module (optional): depends on google.golang.org/protobuf; only imported via go get .../codec/proto — main module has zero external dependencies.


License

See LICENSE.
