From e628b76c1659c0948e8bb70499ddf18f1ab499a4 Mon Sep 17 00:00:00 2001 From: Donkey Date: Sat, 1 Nov 2025 01:52:21 +1100 Subject: [PATCH] fix (docs): resolve changelog conflict markers --- .env.example | 1 + CHANGELOG.md | 10 +-- README.md | 8 ++- docs/index.html | 1 + install.sh | 2 + internal/config/env_test.go | 28 +++++++- internal/config/queue.go | 22 +++++++ main.go | 62 +++++++++++------- queue/manager.go | 127 ++++++++++++++++++++++++------------ queue/manager_test.go | 77 +++++++++++++++++++++- storage/files.go | 14 ++-- storage/files_test.go | 4 +- 12 files changed, 271 insertions(+), 85 deletions(-) create mode 100644 internal/config/queue.go diff --git a/.env.example b/.env.example index 6175a9a..cd79533 100644 --- a/.env.example +++ b/.env.example @@ -7,6 +7,7 @@ SMTP_HEALTH_ADDR=:8080 SMTP_HEALTH_PORT= SMTP_HEALTH_DISABLE=false SMTP_QUEUE_PATH=./data/spool +SMTP_QUEUE_WORKERS= # Access control SMTP_ALLOW_NETWORKS=127.0.0.1/32 diff --git a/CHANGELOG.md b/CHANGELOG.md index 5515399..846fa68 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,11 +29,11 @@ - Website: Add dedicated contact section with email and GitHub links. - Docs: Consolidate feature list in README (single "Features" section). - Website: Mirror unified feature list on the marketing page. -<<<<<<< HEAD -- Brand: Refresh GopherPost logo and favicon with updated gopher-and-envelope concept; refine site header hover styling. -======= -- Brand: Refresh GopherPost logo and favicon with updated gopher-and-envelope concept; add site styling for ringed logo hover state. ->>>>>>> origin/design/brand/logo-refresh +- SMTP: Send a 554 5.7.1 rejection prior to closing unauthorized sessions so operators see explicit failures when hosts are misconfigured. +- Storage: Roll back partially persisted messages when any recipient write fails to keep the spool aligned with the delivery queue. 
+- Queue: Introduce a configurable worker pool with idempotent shutdown semantics and tests that verify real concurrency instead of timing heuristics. +- Config: Document and surface the new `SMTP_QUEUE_WORKERS` environment variable across README, `.env.example`, install tooling, and the marketing site. +- Brand: Refresh GopherPost logo and favicon with updated gopher-and-envelope concept; refine site header hover styling and add ringed-logo hover treatment. ## v0.4.0 - Added subscription-based audit fan-out so `/healthz` can stream live debug logs when `SMTP_DEBUG=true`. diff --git a/README.md b/README.md index 967e05b..d8e5b8f 100644 --- a/README.md +++ b/README.md @@ -9,6 +9,7 @@ Current release: `v0.4.0` - Enforces allow-listed access by host/IP before any banner is sent, adding a coarse ingress control layer for the unauthenticated listener. - Persists accepted messages to disk with per-recipient hashing so stored artefacts are private yet available for later inspection or reprocessing. - Performs outbound delivery via MX resolution, randomised equal-priority retries, opportunistic STARTTLS, and jittered exponential backoff managed by the in-memory queue. +- Scales queue throughput with a configurable pool of concurrent delivery workers so busy deployments keep pace with inbound traffic. - Optionally signs outbound mail with DKIM when selector, key, and domain values are supplied through environment variables. - Provides built-in observability: a health server exposes readiness, `/metrics` instrumentation, and an optional live audit log stream; audit logging can be toggled at runtime and fanned out to subscribers. - Loads configuration entirely from environment variables (with `.env` support) covering ports, banner text, TLS/DKIM assets, and queue storage paths, simplifying containerised or systemd deployments. 
@@ -45,9 +46,10 @@ SMTP_HOSTNAME # Hostname advertised in SMTP banners and HELO/EHLO (default syste SMTP_BANNER # Custom greeting appended to the initial 220 response (default GopherPost ready). SMTP_DEBUG # Enable verbose audit logging when `true` (default `false`). SMTP_HEALTH_ADDR # Listen address for the health server (default :8080). -SMTP_HEALTH_PORT # Override only the port component of the health address (e.g. 9090). -SMTP_HEALTH_DISABLE # Disable the health endpoint when `true` (default `false`). -SMTP_QUEUE_PATH # Directory used to persist inbound messages (default ./data/spool). +SMTP_HEALTH_PORT # Override only the port component of the health address (e.g. 9090). +SMTP_HEALTH_DISABLE # Disable the health endpoint when `true` (default `false`). +SMTP_QUEUE_PATH # Directory used to persist inbound messages (default ./data/spool). +SMTP_QUEUE_WORKERS # Number of concurrent delivery workers processing the outbound queue (default logical CPU count). ``` #### Access control diff --git a/docs/index.html b/docs/index.html index 4e9bafc..6951ada 100644 --- a/docs/index.html +++ b/docs/index.html @@ -118,6 +118,7 @@

Features

  • Enforces host/IP allow-lists before sending the banner for reliable ingress control.
  • Persists accepted messages with per-recipient hashing so stored artefacts stay private yet reviewable.
  • Delivers outbound mail via MX resolution, randomized equal-priority retries, opportunistic STARTTLS, and jittered exponential backoff.
+  • Scales with configurable concurrent delivery workers so the queue keeps up with bursty inbound traffic.
  • Supports DKIM signing when selector, key, and domain values are configured.
  • Provides observability through the health server, `/metrics`, and optional live audit streaming with runtime-configurable logging.
  • Configures entirely via environment variables (with `.env` support) for ports, banners, TLS/DKIM assets, and queue storage paths.
  • diff --git a/install.sh b/install.sh index a119152..ce57429 100755 --- a/install.sh +++ b/install.sh @@ -128,6 +128,7 @@ SMTP_HEALTH_PORT="$(ask SMTP_HEALTH_PORT 'SMTP_HEALTH_PORT' '8877')" SMTP_HEALTH_DISABLE="$(ask SMTP_HEALTH_DISABLE 'SMTP_HEALTH_DISABLE (true/false)' 'false')" SMTP_QUEUE_PATH="$(ask SMTP_QUEUE_PATH 'SMTP_QUEUE_PATH' "${DEFAULT_QUEUE_DIR}")" +SMTP_QUEUE_WORKERS="$(ask SMTP_QUEUE_WORKERS 'SMTP_QUEUE_WORKERS (concurrent workers, blank for auto)' '')" SMTP_ALLOW_NETWORKS="$(ask SMTP_ALLOW_NETWORKS 'SMTP_ALLOW_NETWORKS (CIDR list)' '')" SMTP_ALLOW_HOSTS="$(ask SMTP_ALLOW_HOSTS 'SMTP_ALLOW_HOSTS (comma-separated)' '127.0.0.1')" SMTP_REQUIRE_LOCAL_DOMAIN="$(ask SMTP_REQUIRE_LOCAL_DOMAIN 'SMTP_REQUIRE_LOCAL_DOMAIN (true/false)' 'true')" @@ -171,6 +172,7 @@ env_lines+=("Environment=\"SMTP_DEBUG=$(esc_env_val "$SMTP_DEBUG")\"") env_lines+=("Environment=\"SMTP_HEALTH_PORT=$(esc_env_val "$SMTP_HEALTH_PORT")\"") env_lines+=("Environment=\"SMTP_HEALTH_DISABLE=$(esc_env_val "$SMTP_HEALTH_DISABLE")\"") env_lines+=("Environment=\"SMTP_QUEUE_PATH=$(esc_env_val "$SMTP_QUEUE_PATH")\"") +[[ -n "$SMTP_QUEUE_WORKERS" ]] && env_lines+=("Environment=\"SMTP_QUEUE_WORKERS=$(esc_env_val "$SMTP_QUEUE_WORKERS")\"") env_lines+=("Environment=\"SMTP_ALLOW_NETWORKS=$(esc_env_val "$SMTP_ALLOW_NETWORKS")\"") env_lines+=("Environment=\"SMTP_ALLOW_HOSTS=$(esc_env_val "$SMTP_ALLOW_HOSTS")\"") env_lines+=("Environment=\"SMTP_REQUIRE_LOCAL_DOMAIN=$(esc_env_val "$SMTP_REQUIRE_LOCAL_DOMAIN")\"") diff --git a/internal/config/env_test.go b/internal/config/env_test.go index 51a54e2..1784da3 100644 --- a/internal/config/env_test.go +++ b/internal/config/env_test.go @@ -1,6 +1,9 @@ package config -import "testing" +import ( + "runtime" + "testing" +) func TestBool(t *testing.T) { t.Setenv("BOOL_TRUE", "true") @@ -20,3 +23,26 @@ func TestBool(t *testing.T) { t.Fatalf("unexpected override for unsupported values") } } + +func TestQueueWorkers(t *testing.T) { + t.Setenv("SMTP_QUEUE_WORKERS", 
"") + expectedDefault := runtime.NumCPU() + if got := QueueWorkers(); got != expectedDefault { + t.Fatalf("expected default workers %d, got %d", expectedDefault, got) + } + + t.Setenv("SMTP_QUEUE_WORKERS", "3") + if got := QueueWorkers(); got != 3 { + t.Fatalf("expected configured workers 3, got %d", got) + } + + t.Setenv("SMTP_QUEUE_WORKERS", "-5") + if got := QueueWorkers(); got != expectedDefault { + t.Fatalf("expected fallback to default for negative value, got %d", got) + } + + t.Setenv("SMTP_QUEUE_WORKERS", "noise") + if got := QueueWorkers(); got != expectedDefault { + t.Fatalf("expected fallback to default for invalid value, got %d", got) + } +} diff --git a/internal/config/queue.go b/internal/config/queue.go new file mode 100644 index 0000000..872371a --- /dev/null +++ b/internal/config/queue.go @@ -0,0 +1,22 @@ +package config + +import ( + "os" + "runtime" + "strconv" + "strings" +) + +// QueueWorkers returns the configured number of concurrent delivery workers. +// Defaults to the number of logical CPUs when unset or invalid. 
+func QueueWorkers() int { + value := strings.TrimSpace(os.Getenv("SMTP_QUEUE_WORKERS")) + if value == "" { + return runtime.NumCPU() + } + workers, err := strconv.Atoi(value) + if err != nil || workers < 1 { + return runtime.NumCPU() + } + return workers +} diff --git a/main.go b/main.go index 4ed3bc9..7198cac 100644 --- a/main.go +++ b/main.go @@ -19,22 +19,22 @@ import ( "github.com/joho/godotenv" - health "gopherpost/health" - audit "gopherpost/internal/audit" - "gopherpost/internal/config" - "gopherpost/internal/dkim" - "gopherpost/internal/email" - "gopherpost/internal/metrics" - "gopherpost/internal/version" - "gopherpost/queue" - "gopherpost/storage" - tlsconfig "gopherpost/tlsconfig" + health "gopherpost/health" + audit "gopherpost/internal/audit" + "gopherpost/internal/config" + "gopherpost/internal/dkim" + "gopherpost/internal/email" + "gopherpost/internal/metrics" + "gopherpost/internal/version" + "gopherpost/queue" + "gopherpost/storage" + tlsconfig "gopherpost/tlsconfig" ) const ( defaultSMTPPort = "2525" defaultHealthAddr = ":8080" - defaultBanner = "GopherPost ready" + defaultBanner = "GopherPost ready" maxMessageBytes = 10 << 20 // 10 MiB commandDeadline = 15 * time.Minute ) @@ -42,7 +42,7 @@ const ( func main() { _ = godotenv.Load() audit.RefreshFromEnv() - log.Printf("GopherPost version %s starting", version.Number) + log.Printf("GopherPost version %s starting", version.Number) audit.Log("version %s boot", version.Number) port := defaultSMTPPort @@ -89,11 +89,14 @@ func main() { log.Printf("Health endpoint listening on %s/healthz", healthListener.Addr().String()) } - q := queue.NewManager() + workerCount := config.QueueWorkers() + q := queue.NewManager(queue.WithWorkers(workerCount)) if dir := strings.TrimSpace(os.Getenv("SMTP_QUEUE_PATH")); dir != "" { storage.SetBaseDir(dir) log.Printf("Queue storage path set to %s", dir) } + log.Printf("Queue workers configured: %d", workerCount) + audit.Log("queue workers %d", workerCount) q.Start() defer 
q.Stop() @@ -141,7 +144,18 @@ func handleSession(conn net.Conn, q *queue.Manager, greeting string, hostname st prefixArgs := append([]any{sessionID}, args...) audit.Log("session %s "+format, prefixArgs...) } + send := func(code int, msg string) bool { + t := fmt.Sprintf("%d %s", code, msg) + if err := tp.PrintfLine(t); err != nil { + log.Printf("send error to %s: %v", remote, err) + alog("send error: %v", err) + return false + } + alog("sent %d %s", code, msg) + return true + } if !connAllowed(remoteAddr) { + _ = send(554, "5.7.1 Access denied") audit.Log("session %s rejected remote %s", sessionID, remote) return } @@ -159,17 +173,6 @@ func handleSession(conn net.Conn, q *queue.Manager, greeting string, hostname st return } - send := func(code int, msg string) bool { - t := fmt.Sprintf("%d %s", code, msg) - if err := tp.PrintfLine(t); err != nil { - log.Printf("send error to %s: %v", remote, err) - alog("send error: %v", err) - return false - } - alog("sent %d %s", code, msg) - return true - } - if !send(220, greeting) { return } @@ -319,15 +322,18 @@ func handleSession(conn net.Conn, q *queue.Manager, greeting string, hostname st } payload := queue.NewPayload(messageBytes) var queued []queue.QueuedMessage + var persistedPaths []string var persistErr error for _, rcpt := range to { - if err := storage.SaveMessage(messageID, from, rcpt, messageBytes); err != nil { + path, err := storage.SaveMessage(messageID, from, rcpt, messageBytes) + if err != nil { log.Printf("failed to persist message for %s: %v", rcpt, err) alog("storage error for %s: %v", rcpt, err) persistErr = err break } + persistedPaths = append(persistedPaths, path) queued = append(queued, queue.QueuedMessage{ ID: messageID, From: from, @@ -336,6 +342,12 @@ func handleSession(conn net.Conn, q *queue.Manager, greeting string, hostname st }) } if persistErr != nil { + for _, path := range persistedPaths { + if err := os.Remove(path); err != nil && !errors.Is(err, os.ErrNotExist) { + log.Printf("failed to 
roll back persisted message %s: %v", path, err) + alog("rollback error %s: %v", path, err) + } + } if !send(451, "Requested action aborted: storage failure") { return } diff --git a/queue/manager.go b/queue/manager.go index 146f3d9..b08866a 100644 --- a/queue/manager.go +++ b/queue/manager.go @@ -3,12 +3,13 @@ package queue import ( "log" "math/rand" + "runtime" "sync" "time" - "gopherpost/delivery" - audit "gopherpost/internal/audit" - "gopherpost/internal/metrics" + "gopherpost/delivery" + audit "gopherpost/internal/audit" + "gopherpost/internal/metrics" ) var deliverFunc = delivery.DeliverMessage @@ -19,18 +20,40 @@ func init() { // Manager manages a queue of outgoing messages with retry logic. type Manager struct { - queue []QueuedMessage - mu sync.Mutex - quit chan struct{} - once sync.Once + queue []QueuedMessage + mu sync.Mutex + quit chan struct{} + once sync.Once + stopOnce sync.Once + workers int +} + +// Option configures a Manager. +type Option func(*Manager) + +// WithWorkers overrides the number of concurrent delivery workers. +func WithWorkers(workers int) Option { + return func(m *Manager) { + if workers > 0 { + m.workers = workers + } + } } // NewManager creates a new delivery queue manager. -func NewManager() *Manager { - return &Manager{ - queue: make([]QueuedMessage, 0), - quit: make(chan struct{}), +func NewManager(opts ...Option) *Manager { + m := &Manager{ + queue: make([]QueuedMessage, 0), + quit: make(chan struct{}), + workers: runtime.NumCPU(), + } + for _, opt := range opts { + opt(m) } + if m.workers < 1 { + m.workers = 1 + } + return m } // Enqueue adds a message to the queue. 
@@ -41,8 +64,8 @@ func (m *Manager) Enqueue(msg QueuedMessage) { } m.mu.Lock() defer m.mu.Unlock() - if msg.Attempts == 0 { - msg.NextRetry = time.Now().Add(initialBackoff()) + if msg.Attempts == 0 && msg.NextRetry.IsZero() { + msg.NextRetry = time.Now() } m.queue = append(m.queue, msg) log.Printf("Queued message %s for %s (attempt %d)", msg.ID, msg.To, msg.Attempts) @@ -72,7 +95,9 @@ func (m *Manager) Start() { // Stop shuts down the queue processor. func (m *Manager) Stop() { - close(m.quit) + m.stopOnce.Do(func() { + close(m.quit) + }) } // processQueue attempts to deliver messages that are due. @@ -94,35 +119,55 @@ func (m *Manager) processQueue() { metrics.SetQueueDepth(len(m.queue)) m.mu.Unlock() + if len(due) == 0 { + return + } + + workerCount := m.workers + if workerCount < 1 { + workerCount = 1 + } + sem := make(chan struct{}, workerCount) + var wg sync.WaitGroup + for _, msg := range due { - payload := msg.Payload - if payload == nil { - log.Printf("Skipping message %s for %s: missing payload", msg.ID, msg.To) - audit.Log("queue skip %s -> %s missing payload", msg.ID, msg.To) - continue - } + msg := msg + wg.Add(1) + sem <- struct{}{} + go func() { + defer wg.Done() + defer func() { <-sem }() + + payload := msg.Payload + if payload == nil { + log.Printf("Skipping message %s for %s: missing payload", msg.ID, msg.To) + audit.Log("queue skip %s -> %s missing payload", msg.ID, msg.To) + return + } - err := deliverFunc(msg.From, msg.To, payload.Bytes()) - if err != nil { - msg.Attempts++ - msg.NextRetry = time.Now().Add(backoffDuration(msg.Attempts)) - msg.LastError = err.Error() - log.Printf("Retry %d for %s in %v (message %s): %v", msg.Attempts, msg.To, time.Until(msg.NextRetry), msg.ID, err) - metrics.DeliveryFailures.Add(1) - audit.Log("queue retry %s -> %s attempt %d next %s error %v", msg.ID, msg.To, msg.Attempts, msg.NextRetry.Format(time.RFC3339), err) - - m.mu.Lock() - m.queue = append(m.queue, msg) - metrics.SetQueueDepth(len(m.queue)) - 
m.mu.Unlock() - continue - } + if err := deliverFunc(msg.From, msg.To, payload.Bytes()); err != nil { + msg.Attempts++ + msg.NextRetry = time.Now().Add(backoffDuration(msg.Attempts)) + msg.LastError = err.Error() + log.Printf("Retry %d for %s in %v (message %s): %v", msg.Attempts, msg.To, time.Until(msg.NextRetry), msg.ID, err) + metrics.DeliveryFailures.Add(1) + audit.Log("queue retry %s -> %s attempt %d next %s error %v", msg.ID, msg.To, msg.Attempts, msg.NextRetry.Format(time.RFC3339), err) + + m.mu.Lock() + m.queue = append(m.queue, msg) + metrics.SetQueueDepth(len(m.queue)) + m.mu.Unlock() + return + } - msg.LastError = "" - log.Printf("Delivered message %s to %s", msg.ID, msg.To) - metrics.MessagesDelivered.Add(1) - audit.Log("queue delivered %s -> %s attempts %d", msg.ID, msg.To, msg.Attempts) + msg.LastError = "" + log.Printf("Delivered message %s to %s", msg.ID, msg.To) + metrics.MessagesDelivered.Add(1) + audit.Log("queue delivered %s -> %s attempts %d", msg.ID, msg.To, msg.Attempts) + }() } + + wg.Wait() } // Depth returns the current queue length. 
@@ -141,10 +186,6 @@ func backoffDuration(attempts int) time.Duration { return base + jitter } -func initialBackoff() time.Duration { - return time.Second -} - func min(a, b int) int { if a < b { return a diff --git a/queue/manager_test.go b/queue/manager_test.go index bc59337..1969d97 100644 --- a/queue/manager_test.go +++ b/queue/manager_test.go @@ -2,10 +2,12 @@ package queue import ( "errors" + "fmt" + "sync" "testing" "time" - "gopherpost/internal/metrics" + "gopherpost/internal/metrics" ) func TestManagerProcessQueueSuccess(t *testing.T) { @@ -96,3 +98,76 @@ func TestManagerProcessQueueFailure(t *testing.T) { t.Fatalf("expected LastError to be recorded") } } + +func TestManagerStopIdempotent(t *testing.T) { + m := NewManager() + m.Start() + m.Stop() + // second stop should not panic + m.Stop() +} + +func TestManagerProcessQueueWorkerConcurrency(t *testing.T) { + metrics.ResetForTests() + + originalDeliver := deliverFunc + defer func() { deliverFunc = originalDeliver }() + + var mu sync.Mutex + current := 0 + max := 0 + + deliverFunc = func(from, to string, data []byte) error { + mu.Lock() + current++ + if current > max { + max = current + } + mu.Unlock() + + time.Sleep(10 * time.Millisecond) + + mu.Lock() + current-- + mu.Unlock() + + return nil + } + + makeMessage := func(id int) QueuedMessage { + return QueuedMessage{ + ID: fmt.Sprintf("msg-%d", id), + From: "sender@example.com", + To: fmt.Sprintf("rcpt-%d@example.net", id), + Payload: NewPayload([]byte("body")), + Attempts: 1, + NextRetry: time.Now().Add(-time.Second), + } + } + + measure := func(workers int) int { + m := NewManager(WithWorkers(workers)) + for i := 0; i < 6; i++ { + m.Enqueue(makeMessage(workers*100 + i)) + } + + mu.Lock() + current = 0 + max = 0 + mu.Unlock() + + m.processQueue() + + mu.Lock() + defer mu.Unlock() + return max + } + + if serialMax := measure(1); serialMax != 1 { + t.Fatalf("expected serial worker to max at 1 concurrent delivery, got %d", serialMax) + } + + if parallelMax 
:= measure(4); parallelMax <= 1 { + t.Fatalf("expected parallel workers to exceed 1 concurrent delivery, got %d", parallelMax) + } +} diff --git a/storage/files.go b/storage/files.go index 89d3940..f3230e3 100644 --- a/storage/files.go +++ b/storage/files.go @@ -13,21 +13,25 @@ import ( var baseDir = "./data/spool" -// SaveMessage stores the message on disk for inspection or retry persistence. -func SaveMessage(id string, from string, to string, data []byte) error { +// SaveMessage stores the message on disk for inspection or retry persistence and +// returns the full path to the persisted file. +func SaveMessage(id string, from string, to string, data []byte) (string, error) { safeID, err := sanitizeComponent(id) if err != nil { - return err + return "", err } recipientToken := hashRecipient(to) dir := filepath.Join(baseDir, time.Now().UTC().Format("2006-01-02")) if err := os.MkdirAll(dir, 0o755); err != nil { - return err + return "", err } filename := filepath.Join(dir, fmt.Sprintf("%s_%s.eml", safeID, recipientToken)) payload := append([]byte(nil), data...) - return os.WriteFile(filename, payload, 0o600) + if err := os.WriteFile(filename, payload, 0o600); err != nil { + return "", err + } + return filename, nil } // SetBaseDir allows overriding the storage location (useful for tests or configuration). 
diff --git a/storage/files_test.go b/storage/files_test.go index f52796a..7bfe02f 100644 --- a/storage/files_test.go +++ b/storage/files_test.go @@ -12,7 +12,7 @@ func TestSaveMessage(t *testing.T) { SetBaseDir(tmp) t.Cleanup(func() { SetBaseDir("./data/spool") }) - if err := SaveMessage("abc123", "from@example.com", "recipient@example.com", []byte("body")); err != nil { + if _, err := SaveMessage("abc123", "from@example.com", "recipient@example.com", []byte("body")); err != nil { t.Fatalf("SaveMessage returned error: %v", err) } @@ -52,7 +52,7 @@ func TestSaveMessageSanitizesID(t *testing.T) { SetBaseDir(tmp) t.Cleanup(func() { SetBaseDir("./data/spool") }) - if err := SaveMessage("../bad", "from@example.com", "recipient@example.com", []byte("body")); err == nil { + if _, err := SaveMessage("../bad", "from@example.com", "recipient@example.com", []byte("body")); err == nil { t.Fatalf("expected error for invalid identifier") } }