From 40ef785aac1d8e51adcb7fc0f8fe722e8529830a Mon Sep 17 00:00:00 2001 From: xcsoft Date: Sat, 1 Nov 2025 02:05:23 +0800 Subject: [PATCH] =?UTF-8?q?docs:=20=E8=AF=B4=E6=98=8E=20Redis=20=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E6=98=A0=E5=B0=84=E8=87=B3=20SQLite=20=E7=9A=84?= =?UTF-8?q?=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 30 ++ README.zh_CN.md | 28 ++ config.yaml | 5 + main.go | 5 + process/persist/persist.go | 688 +++++++++++++++++++++++++++++++++++++ 5 files changed, 756 insertions(+) create mode 100644 process/persist/persist.go diff --git a/README.md b/README.md index 7011977..b6aa310 100644 --- a/README.md +++ b/README.md @@ -40,6 +40,36 @@ UV and PV data are stored in the following keys: | pagePv | ZSet | bsz:page_pv:md5(host) / md5(path) | | pageUv | HyperLogLog | bsz:site_uv:md5(host):md5(path) | +## SQLite persistence + +Busuanzi can periodically snapshot Redis data into a SQLite database in order to keep +long term history or recover from Redis failures. The following configuration options +control the behaviour (all values can also be provided via environment variables): + +| key | type | default | description | +| --- | --- | --- | --- | +| `Persistence.Enable` | bool | `false` | Enable background SQLite snapshots. | +| `Persistence.DBPath` | string | `data/busuanzi.db` | Path to the SQLite database file. | +| `Persistence.Interval` | number | `300` | Snapshot interval in seconds. | +| `Persistence.RestoreOnStart` | bool | `false` | Restore the most recent snapshot during startup. | + +Snapshots include PV, UV and HyperLogLog states. Restoring a snapshot replaces the +corresponding Redis keys using the stored values and TTL information. + +### Snapshot payload mapping + +Each Redis data type is normalised into the SQLite `snapshots` table as follows: + +| Redis key | Redis type | SQLite fields | Notes | +| --- | --- | --- | --- | +| `site_pv` | String counter | `count` | The plain integer PV value. | +| `site_uv` | HyperLogLog | `count`, `payload` | `payload` stores the raw bytes returned by `DUMP`, encoded as hexadecimal. | +| `page_pv` | ZSet | `count`, `payload` | `payload` is a JSON array of `{path_unique,count}` pairs so that the ranking can be restored. | +| `page_uv` | HyperLogLog | `count`, `payload` | Same as `site_uv`. | + +The `ttl_ms` column stores the remaining key lifetime in milliseconds (or `0` for keys without an expiration) so that the Redis +TTL semantics are preserved when restoring a snapshot. + ## Data Migration - You can use the [busuanzi-sync](https://github.com/soxft/busuanzi-sync) tool to sync data from the [original busuanzi](http://busuanzi.ibruce.info) to the self-hosted busuanzi. diff --git a/README.zh_CN.md b/README.zh_CN.md index f80d6da..0b3e61e 100644 --- a/README.zh_CN.md +++ b/README.zh_CN.md @@ -32,6 +32,34 @@ | pagePv | ZSet | bsz:page_pv:md5(host) / md5(path) | | pageUv | HyperLogLog | bsz:site_uv:md5(host):md5(path) | +## SQLite 持久化 + +Busuanzi 可以按照指定的时间间隔,将 Redis 中的 PV、UV 与 HyperLogLog 数据自动快照到 +SQLite 数据库文件中,便于长期保存与快速恢复。所有配置项均支持使用环境变量覆盖: + +| 配置项 | 类型 | 默认值 | 说明 | +| --- | --- | --- | --- | +| `Persistence.Enable` | bool | `false` | 是否开启后台快照任务 | +| `Persistence.DBPath` | string | `data/busuanzi.db` | SQLite 数据库文件路径 | +| `Persistence.Interval` | number | `300` | 快照间隔(秒) | +| `Persistence.RestoreOnStart` | bool | `false` | 启动时是否自动恢复最近一次快照 | + +快照会保存每个统计键当前的数值、序列化的 HyperLogLog 数据以及剩余 TTL。恢复快照时 +会用存储的数据覆盖 Redis 中对应的键。 + +### 快照数据映射 + +不同的 Redis 数据类型会按下表映射到 SQLite `snapshots` 表中: + +| Redis 键 | Redis 类型 | SQLite 字段 | 说明 | +| --- | --- | --- | --- | +| `site_pv` | 字符串计数器 | `count` | 直接保存站点 PV 的整数值。 | +| `site_uv` | HyperLogLog | `count`, `payload` | `payload` 中保存 `DUMP` 返回的原始二进制,并以十六进制编码。 | +| `page_pv` | ZSet | `count`, `payload` | `payload` 是 `{path_unique,count}` 组成的 JSON 数组,用于恢复排名明细。 | +| `page_uv` | HyperLogLog | `count`, `payload` | 与 `site_uv` 相同的序列化方式。 | + +`ttl_ms` 字段会记录剩余的过期时间(毫秒,永不过期时为 `0`),以便在恢复时继续保持 Redis 的 TTL 语义。 + ## 其他 diff --git a/config.yaml b/config.yaml index cb5fade..40a8153 100644 --- a/config.yaml +++ b/config.yaml @@ -13,6 +13,11 @@ Redis: MaxActive: 100 # 最大连接数 MinIdle: 25 # 最小空闲连接数 MaxRetries: 3 # 最大重试次数 +Persistence: + Enable: false # 是否开启持久化 + DBPath: data/busuanzi.db # SQLite 数据库存储路径 + Interval: 300 # 自动持久化间隔(秒) + RestoreOnStart: false # 启动时是否自动从最近的快照恢复数据 Bsz: Expire: 0 # 统计数据过期时间 单位秒, 请输入整数 (无任何访问, 超过这个时间后, 统计数据将被清空, 0为不过期) Secret: "bsz" # JWT签名密钥 // 请设置为任意长度的随机值 diff --git a/main.go b/main.go index f9daf2c..05b014d 100644 --- a/main.go +++ b/main.go @@ -1,8 +1,11 @@ package main import ( + "context" + "github.com/soxft/busuanzi/config" "github.com/soxft/busuanzi/core" + "github.com/soxft/busuanzi/process/persist" "github.com/soxft/busuanzi/process/redisutil" "github.com/soxft/busuanzi/process/webutil" ) @@ -13,5 +16,7 @@ func main() { core.InitExpire() + persist.Init(context.Background()) + webutil.Init() } diff --git a/process/persist/persist.go b/process/persist/persist.go new file mode 100644 index 0000000..a17fe44 --- /dev/null +++ b/process/persist/persist.go @@ -0,0 +1,688 @@ +package persist + +import ( + "bytes" + "context" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "errors" + "fmt" + "log" + "os" + "os/exec" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/redis/go-redis/v9" + "github.com/soxft/busuanzi/process/redisutil" + "github.com/spf13/viper" +) + +const ( + categorySitePV = "site_pv" + categorySiteUV = "site_uv" + categoryPagePV = "page_pv" + categoryPageUV = "page_uv" + payloadCounter = "counter" + payloadHyperLog = "hyperloglog" + payloadZSet = "sorted_set" +) + +var ( + store *sqliteStore + storeOnce sync.Once +) + +type sqliteStore struct { + path string + mu sync.Mutex +} + +func newStore(path string) (*sqliteStore, error) { + if err := ensureDir(path); err != nil { + return nil, err + } + + s := &sqliteStore{path: path} + if err := s.exec("PRAGMA journal_mode=WAL;", "PRAGMA foreign_keys = ON;"); err != nil { + return nil, err + } + + stmts := []string{ + `CREATE TABLE IF NOT EXISTS snapshots ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + key TEXT NOT NULL, + category TEXT NOT NULL, + site_unique TEXT NOT NULL, + path_unique TEXT, + count INTEGER NOT NULL, + payload TEXT, + payload_type TEXT NOT NULL, + ttl_ms INTEGER NOT NULL, + checksum TEXT NOT NULL, + created_at INTEGER NOT NULL + );`, + `CREATE TABLE IF NOT EXISTS snapshot_cursors ( + key TEXT PRIMARY KEY, + checksum TEXT NOT NULL, + count INTEGER NOT NULL, + payload_type TEXT NOT NULL, + updated_at INTEGER NOT NULL + );`, + `CREATE INDEX IF NOT EXISTS idx_snapshots_key_created ON snapshots(key, created_at);`, + } + + if err := s.exec(stmts...); err != nil { + return nil, err + } + + return s, nil +} + +func (s *sqliteStore) exec(statements ...string) error { + var script strings.Builder + for _, stmt := range statements { + trimmed := strings.TrimSpace(stmt) + if trimmed == "" { + continue + } + script.WriteString(trimmed) + if !strings.HasSuffix(trimmed, ";") { + script.WriteString(";") + } + script.WriteByte('\n') + } + + if script.Len() == 0 { + return nil + } + + s.mu.Lock() + defer s.mu.Unlock() + + cmd := exec.Command("sqlite3", s.path) + cmd.Stdin = strings.NewReader(script.String()) + var stderr bytes.Buffer + cmd.Stderr = &stderr + if err := cmd.Run(); err != nil { + return fmt.Errorf("sqlite exec: %w: %s", err, strings.TrimSpace(stderr.String())) + } + return nil +} + +func (s *sqliteStore) queryJSON(query string) ([]byte, error) { + s.mu.Lock() + defer s.mu.Unlock() + + cmd := exec.Command("sqlite3", "-json", s.path, query) + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stderr + if err := cmd.Run(); err != nil { + return nil, fmt.Errorf("sqlite query: %w: %s", err, strings.TrimSpace(stderr.String())) + } + return stdout.Bytes(), nil +} + +func ensureDir(path string) error { + dir := filepath.Dir(path) + if dir == "." || dir == "" { + return nil + } + return os.MkdirAll(dir, 0o755) +} + +// Init configures the persistence layer and starts the background snapshot routine. +func Init(ctx context.Context) { + if !viper.GetBool("persistence.enable") { + return + } + + storeOnce.Do(func() { + dbPath := viper.GetString("persistence.dbPath") + if dbPath == "" { + log.Printf("[ERROR] persistence.dbPath is empty") + return + } + + s, err := newStore(dbPath) + if err != nil { + log.Printf("[ERROR] persistence init failed: %v", err) + return + } + + store = s + log.Printf("[INFO] persistence sqlite initialised at %s", dbPath) + + if viper.GetBool("persistence.restoreOnStart") { + if err := RestoreLatest(ctx); err != nil { + log.Printf("[WARN] persistence restore failed: %v", err) + } + } + + go runScheduler(ctx) + }) +} + +func runScheduler(ctx context.Context) { + intervalSeconds := viper.GetInt("persistence.interval") + if intervalSeconds <= 0 { + intervalSeconds = 300 + } + + ticker := time.NewTicker(time.Duration(intervalSeconds) * time.Second) + defer ticker.Stop() + + log.Printf("[INFO] persistence scheduler started, interval=%ds", intervalSeconds) + for { + select { + case <-ctx.Done(): + log.Printf("[INFO] persistence scheduler stopped") + return + case <-ticker.C: + if err := PersistOnce(ctx); err != nil { + log.Printf("[WARN] persistence snapshot failed: %v", err) + } + } + } +} + +// snapshotRecord 对 Redis 中的多种数据结构做统一抽象: +// - 字符串计数器(`site_pv`) 直接写入 Count 字段; +// - HyperLogLog(`site_uv`、`page_uv`) 通过 DUMP 的二进制结果写入 Payload; +// - 排行类 ZSet(`page_pv`) 会将成员明细转成 JSON 后保存在 Payload。 +// +// Payload 会在写入 SQLite 时转换为十六进制文本,恢复时再还原为原始字节序列。 +type snapshotRecord struct { + Key string + Category string + SiteUnique string + PathUnique string + HasPath bool + Count int64 + Payload []byte + PayloadType string + TTL int64 + Checksum string + CreatedAt time.Time +} + +type snapshotCursor struct { + Checksum string +} + +// PersistOnce collects Redis data and writes incremental snapshots to SQLite. +func PersistOnce(ctx context.Context) error { + if store == nil { + return errors.New("persistence store not initialised") + } + + client := redisutil.RDB + if client == nil { + return errors.New("redis client not initialised") + } + + cursorMap, err := loadCursorMap(ctx) + if err != nil { + return err + } + + records, err := collectSnapshots(ctx, client) + if err != nil { + return err + } + + if len(records) == 0 { + return nil + } + + var script []string + script = append(script, "BEGIN") + inserted := 0 + + for _, rec := range records { + if cur, ok := cursorMap[rec.Key]; ok && cur.Checksum == rec.Checksum { + continue + } + + script = append(script, buildInsertSnapshotSQL(rec)) + script = append(script, buildUpsertCursorSQL(rec)) + cursorMap[rec.Key] = snapshotCursor{Checksum: rec.Checksum} + inserted++ + } + + if inserted == 0 { + return nil + } + + script = append(script, "COMMIT") + + if err := store.exec(script...); err != nil { + return err + } + + log.Printf("[INFO] persistence stored %d snapshot(s)", inserted) + return nil +} + +func loadCursorMap(ctx context.Context) (map[string]snapshotCursor, error) { + result := make(map[string]snapshotCursor) + if store == nil { + return result, nil + } + + data, err := store.queryJSON("SELECT key, checksum FROM snapshot_cursors") + if err != nil { + return nil, err + } + if len(bytes.TrimSpace(data)) == 0 { + return result, nil + } + + var rows []struct { + Key string `json:"key"` + Checksum string `json:"checksum"` + } + if err := json.Unmarshal(data, &rows); err != nil { + return nil, err + } + for _, row := range rows { + result[row.Key] = snapshotCursor{Checksum: row.Checksum} + } + return result, nil +} + +func buildInsertSnapshotSQL(rec snapshotRecord) string { + payloadExpr := "NULL" + if len(rec.Payload) > 0 { + payloadExpr = fmt.Sprintf("'%s'", escapeSQL(hex.EncodeToString(rec.Payload))) + } + + pathExpr := "NULL" + if rec.HasPath { + pathExpr = fmt.Sprintf("'%s'", escapeSQL(rec.PathUnique)) + } + + return fmt.Sprintf( + "INSERT INTO snapshots (key, category, site_unique, path_unique, count, payload, payload_type, ttl_ms, checksum, created_at) VALUES ('%s','%s','%s',%s,%d,%s,'%s',%d,'%s',%d)", + escapeSQL(rec.Key), + escapeSQL(rec.Category), + escapeSQL(rec.SiteUnique), + pathExpr, + rec.Count, + payloadExpr, + escapeSQL(rec.PayloadType), + rec.TTL, + escapeSQL(rec.Checksum), + rec.CreatedAt.Unix(), + ) +} + +func buildUpsertCursorSQL(rec snapshotRecord) string { + return fmt.Sprintf( + "INSERT INTO snapshot_cursors (key, checksum, count, payload_type, updated_at) VALUES ('%s','%s',%d,'%s',%d) ON CONFLICT(key) DO UPDATE SET checksum=excluded.checksum, count=excluded.count, payload_type=excluded.payload_type, updated_at=excluded.updated_at", + escapeSQL(rec.Key), + escapeSQL(rec.Checksum), + rec.Count, + escapeSQL(rec.PayloadType), + rec.CreatedAt.Unix(), + ) +} + +func escapeSQL(value string) string { + return strings.ReplaceAll(value, "'", "''") +} + +func collectSnapshots(ctx context.Context, client *redis.Client) ([]snapshotRecord, error) { + prefix := viper.GetString("redis.prefix") + var records []snapshotRecord + + collectors := []struct { + pattern string + handler func(context.Context, *redis.Client, string) (*snapshotRecord, error) + }{ + {fmt.Sprintf("%s:%s:*", prefix, categorySitePV), buildSitePVRecord}, + {fmt.Sprintf("%s:%s:*", prefix, categorySiteUV), buildSiteUVRecord}, + {fmt.Sprintf("%s:%s:*", prefix, categoryPagePV), buildPagePVRecord}, + {fmt.Sprintf("%s:%s:*", prefix, categoryPageUV), buildPageUVRecord}, + } + + for _, collector := range collectors { + if err := scanKeys(ctx, client, collector.pattern, func(key string) error { + rec, err := collector.handler(ctx, client, key) + if err != nil { + if errors.Is(err, redis.Nil) { + return nil + } + log.Printf("[WARN] persistence skip key %s: %v", key, err) + return nil + } + if rec != nil { + records = append(records, *rec) + } + return nil + }); err != nil { + return nil, err + } + } + + return records, nil +} + +func scanKeys(ctx context.Context, client *redis.Client, pattern string, fn func(string) error) error { + var cursor uint64 + for { + keys, nextCursor, err := client.Scan(ctx, cursor, pattern, 500).Result() + if err != nil { + if errors.Is(err, redis.Nil) { + return nil + } + return err + } + + for _, key := range keys { + if err := fn(key); err != nil { + return err + } + } + + cursor = nextCursor + if cursor == 0 { + break + } + } + return nil +} + +func buildSitePVRecord(ctx context.Context, client *redis.Client, key string) (*snapshotRecord, error) { + value, err := client.Get(ctx, key).Int64() + if err != nil { + return nil, err + } + + ttl, err := client.PTTL(ctx, key).Result() + if err != nil { + return nil, err + } + + return &snapshotRecord{ + Key: key, + Category: categorySitePV, + SiteUnique: parseTailSegment(key), + Count: value, + PayloadType: payloadCounter, + TTL: ttlToMillis(ttl), + Checksum: fmt.Sprintf("%d", value), + CreatedAt: time.Now().UTC(), + }, nil +} + +func buildSiteUVRecord(ctx context.Context, client *redis.Client, key string) (*snapshotRecord, error) { + count, err := client.PFCount(ctx, key).Result() + if err != nil { + return nil, err + } + + raw, err := client.Dump(ctx, key).Result() + if err != nil { + return nil, err + } + + ttl, err := client.PTTL(ctx, key).Result() + if err != nil { + return nil, err + } + + payload := []byte(raw) + return &snapshotRecord{ + Key: key, + Category: categorySiteUV, + SiteUnique: parseTailSegment(key), + Count: count, + Payload: payload, + PayloadType: payloadHyperLog, + TTL: ttlToMillis(ttl), + Checksum: sha256Hex(payload), + CreatedAt: time.Now().UTC(), + }, nil +} + +func buildPageUVRecord(ctx context.Context, client *redis.Client, key string) (*snapshotRecord, error) { + count, err := client.PFCount(ctx, key).Result() + if err != nil { + return nil, err + } + + raw, err := client.Dump(ctx, key).Result() + if err != nil { + return nil, err + } + + ttl, err := client.PTTL(ctx, key).Result() + if err != nil { + return nil, err + } + + siteUnique, pathUnique := parseSiteAndPath(key) + payload := []byte(raw) + return &snapshotRecord{ + Key: key, + Category: categoryPageUV, + SiteUnique: siteUnique, + PathUnique: pathUnique, + HasPath: pathUnique != "", + Count: count, + Payload: payload, + PayloadType: payloadHyperLog, + TTL: ttlToMillis(ttl), + Checksum: sha256Hex(payload), + CreatedAt: time.Now().UTC(), + }, nil +} + +type pagePVEntry struct { + Path string `json:"path_unique"` + PV int64 `json:"count"` +} + +func buildPagePVRecord(ctx context.Context, client *redis.Client, key string) (*snapshotRecord, error) { + zset, err := client.ZRangeWithScores(ctx, key, 0, -1).Result() + if err != nil { + return nil, err + } + + ttl, err := client.PTTL(ctx, key).Result() + if err != nil { + return nil, err + } + + entries := make([]pagePVEntry, 0, len(zset)) + var total int64 + for _, item := range zset { + member := fmt.Sprint(item.Member) + pv := int64(item.Score) + entries = append(entries, pagePVEntry{Path: member, PV: pv}) + total += pv + } + + payload, err := json.Marshal(entries) + if err != nil { + return nil, err + } + + return &snapshotRecord{ + Key: key, + Category: categoryPagePV, + SiteUnique: parseTailSegment(key), + Count: total, + Payload: payload, + PayloadType: payloadZSet, + TTL: ttlToMillis(ttl), + Checksum: sha256Hex(payload), + CreatedAt: time.Now().UTC(), + }, nil +} + +func ttlToMillis(ttl time.Duration) int64 { + return ttl.Milliseconds() +} + +func sha256Hex(data []byte) string { + h := sha256.Sum256(data) + return hex.EncodeToString(h[:]) +} + +func parseTailSegment(key string) string { + parts := strings.Split(key, ":") + if len(parts) == 0 { + return "" + } + return parts[len(parts)-1] +} + +func parseSiteAndPath(key string) (string, string) { + parts := strings.Split(key, ":") + if len(parts) < 4 { + if len(parts) >= 3 { + return parts[len(parts)-1], "" + } + return "", "" + } + site := parts[len(parts)-2] + path := parts[len(parts)-1] + return site, path +} + +// RestoreLatest loads the most recent snapshot for each key back into Redis. +func RestoreLatest(ctx context.Context) error { + if store == nil { + return errors.New("persistence store not initialised") + } + + client := redisutil.RDB + if client == nil { + return errors.New("redis client not initialised") + } + + query := `SELECT s.key, s.category, s.site_unique, s.path_unique, s.count, s.payload, s.payload_type, s.ttl_ms + FROM snapshots s + INNER JOIN ( + SELECT key, MAX(created_at) AS created_at + FROM snapshots + GROUP BY key + ) latest ON latest.key = s.key AND latest.created_at = s.created_at` + + data, err := store.queryJSON(query) + if err != nil { + return err + } + if len(bytes.TrimSpace(data)) == 0 { + return nil + } + + var rows []struct { + Key string `json:"key"` + Category string `json:"category"` + SiteUnique string `json:"site_unique"` + PathUnique *string `json:"path_unique"` + Count int64 `json:"count"` + Payload string `json:"payload"` + PayloadTyp string `json:"payload_type"` + TTL int64 `json:"ttl_ms"` + } + + if err := json.Unmarshal(data, &rows); err != nil { + return err + } + + restored := 0 + for _, row := range rows { + payloadBytes, err := decodeBlob(row.Payload) + if err != nil { + log.Printf("[WARN] persistence decode payload failed for key %s: %v", row.Key, err) + continue + } + + rec := snapshotRecord{ + Key: row.Key, + Category: row.Category, + SiteUnique: row.SiteUnique, + Count: row.Count, + Payload: payloadBytes, + PayloadType: row.PayloadTyp, + TTL: row.TTL, + } + if row.PathUnique != nil { + rec.PathUnique = *row.PathUnique + rec.HasPath = true + } + + if err := restoreRecord(ctx, client, rec); err != nil { + log.Printf("[WARN] persistence restore key %s failed: %v", rec.Key, err) + continue + } + restored++ + } + + if restored > 0 { + log.Printf("[INFO] persistence restored %d key(s) from sqlite", restored) + } + return nil +} + +func decodeBlob(value string) ([]byte, error) { + value = strings.TrimSpace(value) + if value == "" { + return nil, nil + } + return hex.DecodeString(value) +} + +func restoreRecord(ctx context.Context, client *redis.Client, rec snapshotRecord) error { + ttl := durationFromMillis(rec.TTL) + + switch rec.Category { + case categorySitePV: + return client.Set(ctx, rec.Key, rec.Count, ttl).Err() + case categoryPagePV: + if err := client.Del(ctx, rec.Key).Err(); err != nil { + return err + } + if len(rec.Payload) > 0 { + var entries []pagePVEntry + if err := json.Unmarshal(rec.Payload, &entries); err != nil { + return err + } + if len(entries) > 0 { + members := make([]redis.Z, 0, len(entries)) + for _, entry := range entries { + members = append(members, redis.Z{Member: entry.Path, Score: float64(entry.PV)}) + } + if err := client.ZAdd(ctx, rec.Key, members...).Err(); err != nil { + return err + } + } + } + if ttl > 0 { + return client.PExpire(ctx, rec.Key, ttl).Err() + } + return nil + case categorySiteUV, categoryPageUV: + if len(rec.Payload) == 0 { + return errors.New("empty payload for hyperloglog restore") + } + return client.RestoreReplace(ctx, rec.Key, ttl, rec.Payload).Err() + default: + return fmt.Errorf("unknown snapshot category: %s", rec.Category) + } +} + +func durationFromMillis(ms int64) time.Duration { + if ms <= 0 { + return 0 + } + return time.Duration(ms) * time.Millisecond +}