diff --git a/pkg/config/setup/common_settings.go b/pkg/config/setup/common_settings.go index 2dfeb3bb732828..103f1a453f564b 100644 --- a/pkg/config/setup/common_settings.go +++ b/pkg/config/setup/common_settings.go @@ -1845,6 +1845,13 @@ func logsagent(config pkgconfigmodel.Setup) { // Pipeline failover configuration config.BindEnvAndSetDefault("logs_config.pipeline_failover.enabled", false) config.BindEnvAndSetDefault("logs_config.pipeline_failover.router_channel_size", 5) + + // Disk retry: save payloads to disk when the destination is unreachable instead of dropping them. + // Set max_size_bytes to a non-zero value to enable. 0 = disabled (default). + config.BindEnvAndSetDefault("logs_config.disk_retry.max_size_bytes", 0) + config.BindEnvAndSetDefault("logs_config.disk_retry.path", "") + config.BindEnvAndSetDefault("logs_config.disk_retry.max_disk_ratio", 0.80) + config.BindEnvAndSetDefault("logs_config.disk_retry.file_ttl_days", 7) } // vector integration diff --git a/pkg/logs/sender/diskretry/retrier.go b/pkg/logs/sender/diskretry/retrier.go new file mode 100644 index 00000000000000..e7f7b862d0ba0f --- /dev/null +++ b/pkg/logs/sender/diskretry/retrier.go @@ -0,0 +1,397 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +// Package diskretry persists log payloads to disk while destinations are unavailable +// and replays them when connectivity recovers. +package diskretry + +import ( + "errors" + "fmt" + "math" + "os" + "path/filepath" + "sort" + "sync" + "time" + + "github.com/DataDog/datadog-agent/pkg/logs/message" + "github.com/DataDog/datadog-agent/pkg/telemetry" + "github.com/DataDog/datadog-agent/pkg/util/filesystem" + "github.com/DataDog/datadog-agent/pkg/util/log" +) + +const ( + retryFileExtension = ".retry" + // minSizeBytes is the minimum configurable disk budget. Prevents pathologically small limits. + minSizeBytes = 2 * 1024 * 1024 // 2 MB + // replayPollInterval is how often the replay loop checks for files when idle. + replayPollInterval = 1 * time.Second + // replayRetryInterval is how long the replay loop waits when it can't send (destination retrying). + replayRetryInterval = 5 * time.Second + // dirPermissions for the retry directory. + dirPermissions = 0700 +) + +var ( + tlmPayloadsWritten = telemetry.NewCounter("disk_retry", "payloads_written", []string{}, "Payloads written to disk") + tlmPayloadsRead = telemetry.NewCounter("disk_retry", "payloads_read", []string{}, "Payloads read from disk for replay") + tlmWriteErrors = telemetry.NewCounter("disk_retry", "write_errors", []string{}, "Disk write failures") + tlmReadErrors = telemetry.NewCounter("disk_retry", "read_errors", []string{}, "Disk read/deserialization failures") + tlmQueueFull = telemetry.NewCounter("disk_retry", "queue_full", []string{}, "Payloads dropped because disk is at capacity") + tlmUsageBytes = telemetry.NewGauge("disk_retry", "usage_bytes", []string{}, "Current disk usage in bytes") + tlmFileCount = telemetry.NewGauge("disk_retry", "file_count", []string{}, "Number of retry files on disk") +) + +// Retrier is the interface for the disk retry mechanism. +// Implementations: DiskRetryManager (enabled) and noopRetrier (disabled). +type Retrier interface { + // Store writes a payload to disk. Returns nil on success. + // Returns an error if the write fails (disk full, etc.) 
If so, the caller should drop the payload. + Store(payload *message.Payload) error + // StartReplayLoop starts a goroutine that replays retry files using the provided send function. + StartReplayLoop(send SendFunc) + // Stop signals the replay loop to stop and waits for it to exit. + Stop() +} + +// SendFunc is the function signature for attempting to send a payload to a destination. +// It mirrors DestinationSender.NonBlockingSend: returns true if the payload was accepted. +type SendFunc func(payload *message.Payload) bool + +// DiskRetryManager writes payloads to disk when the destination is unreachable, +// and replays them when connectivity recovers. +type DiskRetryManager struct { + storagePath string + maxSizeBytes int64 + maxDiskRatio float64 + fileTTLDays int + + mu sync.Mutex + filenames []string // FIFO ordered list of retry file paths + currentSize int64 // total bytes of retry files on disk + + disk filesystem.Disk + done chan struct{} + wg sync.WaitGroup +} + +// NewDiskRetryManager creates a new DiskRetryManager. The storagePath directory +// is created if it doesn't exist. Existing retry files from previous runs are reloaded. +func NewDiskRetryManager(storagePath string, maxSizeBytes int64, maxDiskRatio float64, fileTTLDays int) (*DiskRetryManager, error) { + if maxSizeBytes < minSizeBytes { + maxSizeBytes = minSizeBytes + } + + if err := os.MkdirAll(storagePath, dirPermissions); err != nil { + return nil, fmt.Errorf("failed to create disk retry directory %s: %w", storagePath, err) + } + + manager := &DiskRetryManager{ + storagePath: storagePath, + maxSizeBytes: maxSizeBytes, + maxDiskRatio: maxDiskRatio, + fileTTLDays: fileTTLDays, + disk: filesystem.NewDisk(), + done: make(chan struct{}), + } + + manager.reloadExistingFiles() + return manager, nil +} + +// Store serializes the payload and writes it atomically to disk. +func (manager *DiskRetryManager) Store(payload *message.Payload) error { + data, err := SerializePayload(payload) + if err != nil { + tlmWriteErrors.Inc() + return fmt.Errorf("serialization failed: %w", err) + } + + manager.mu.Lock() + defer manager.mu.Unlock() + + // Expire old files before checking capacity + manager.expireTTLLocked() + + // Check capacity + if !manager.hasCapacityLocked(int64(len(data))) { + tlmQueueFull.Inc() + return fmt.Errorf("disk retry queue full (%d bytes used of %d max)", manager.currentSize, manager.maxSizeBytes) + } + + // Atomic write: create temp file in same directory, write, close + file, err := os.CreateTemp(manager.storagePath, fmt.Sprintf("%d_*%s", time.Now().UnixNano(), retryFileExtension)) + if err != nil { + tlmWriteErrors.Inc() + return fmt.Errorf("failed to create temp file: %w", err) + } + + if _, err := file.Write(data); err != nil { + file.Close() + os.Remove(file.Name()) + tlmWriteErrors.Inc() + return fmt.Errorf("failed to write retry file: %w", err) + } + + if err := file.Close(); err != nil { + os.Remove(file.Name()) + tlmWriteErrors.Inc() + return fmt.Errorf("failed to close retry file: %w", err) + } + + manager.filenames = append(manager.filenames, file.Name()) + manager.currentSize += int64(len(data)) + manager.updateGauges() + + tlmPayloadsWritten.Inc() + log.Debugf("Disk retry: wrote payload to %s (%d bytes, %d files on disk)", file.Name(), len(data), len(manager.filenames)) + return nil +} + +// StartReplayLoop starts a goroutine that replays retry files using the provided send function. +// The send function should attempt a non-blocking send to the destination. 
+// Call Stop() to terminate the replay loop. +func (manager *DiskRetryManager) StartReplayLoop(send SendFunc) { + manager.wg.Add(1) + go manager.replayLoop(send) +} + +// Stop signals the replay loop to exit and waits for it to finish. +func (manager *DiskRetryManager) Stop() { + close(manager.done) + manager.wg.Wait() +} + +func (manager *DiskRetryManager) replayLoop(send SendFunc) { + defer manager.wg.Done() + + for { + select { + case <-manager.done: + return + default: + } + + payload, filePath := manager.readOldest() + if payload == nil { + // No files to replay; poll periodically + select { + case <-manager.done: + return + case <-time.After(replayPollInterval): + } + continue + } + + // Try to send. If the destination is retrying (returns false), + // back off and retry the same file. + if send(payload) { + // Success -> remove the file + manager.mu.Lock() + manager.removeFileLocked(filePath) + manager.mu.Unlock() + tlmPayloadsRead.Inc() + log.Debugf("Disk retry: replayed and removed %s", filePath) + } else { + // Destination not ready -> put the file back and wait + manager.mu.Lock() + // Re-insert at the front (it was the oldest) + manager.filenames = append([]string{filePath}, manager.filenames...) + manager.mu.Unlock() + + select { + case <-manager.done: + return + case <-time.After(replayRetryInterval): + } + } + } +} + +// readOldest reads and deserializes the oldest retry file. +// Returns (nil, "") if no files are available. +// The file is removed from the filenames list but NOT deleted from disk yet -- +// the caller must call removeFileLocked on success or re-insert on failure. +func (manager *DiskRetryManager) readOldest() (*message.Payload, string) { + manager.mu.Lock() + if len(manager.filenames) == 0 { + manager.mu.Unlock() + return nil, "" + } + filePath := manager.filenames[0] + manager.filenames = manager.filenames[1:] + manager.mu.Unlock() + + data, err := os.ReadFile(filePath) + if err != nil { + log.Warnf("Disk retry: failed to read %s: %v", filePath, err) + tlmReadErrors.Inc() + // File is unreadable -> remove it from disk + manager.mu.Lock() + manager.removeFileLocked(filePath) + manager.mu.Unlock() + return nil, "" + } + + payload, err := DeserializePayload(data) + if err != nil { + log.Warnf("Disk retry: failed to deserialize %s: %v", filePath, err) + tlmReadErrors.Inc() + // Corrupted file -> remove it + manager.mu.Lock() + manager.removeFileLocked(filePath) + manager.mu.Unlock() + return nil, "" + } + + return payload, filePath +} + +// removeFileLocked deletes a file from disk and updates the size counter. +// Must be called with manager.mu held. +func (manager *DiskRetryManager) removeFileLocked(filePath string) { + size, err := filesystem.GetFileSize(filePath) + if err == nil { + manager.currentSize -= size + if manager.currentSize < 0 { + manager.currentSize = 0 + } + } + if err := os.Remove(filePath); err != nil && !os.IsNotExist(err) { + log.Warnf("Disk retry: failed to remove %s: %v", filePath, err) + } + manager.updateGauges() +} + +// hasCapacityLocked checks whether there is room for a file of the given size. +// Must be called with manager.mu held. 
+func (manager *DiskRetryManager) hasCapacityLocked(fileSize int64) bool { + // Check configured limit + if manager.currentSize+fileSize > manager.maxSizeBytes { + return false + } + + // Check filesystem capacity + usage, err := manager.disk.GetUsage(manager.storagePath) + if err != nil { + log.Warnf("Disk retry: failed to check disk usage: %v", err) + return false + } + diskReserved := float64(usage.Total) * (1.0 - manager.maxDiskRatio) + available := int64(usage.Available) - int64(math.Ceil(diskReserved)) + return available >= fileSize +} + +// expireTTLLocked removes files older than fileTTLDays. Must be called with manager.mu held. +func (manager *DiskRetryManager) expireTTLLocked() { + if manager.fileTTLDays <= 0 { + return + } + cutoff := time.Now().Add(-time.Duration(manager.fileTTLDays) * 24 * time.Hour) + var kept []string + for _, f := range manager.filenames { + modTime, err := filesystem.GetFileModTime(f) + if err != nil { + // Can't stat -> remove it + manager.removeFileLocked(f) + continue + } + if modTime.Before(cutoff) { + log.Debugf("Disk retry: expiring old file %s (modified %s)", f, modTime) + manager.removeFileLocked(f) + } else { + kept = append(kept, f) + } + } + manager.filenames = kept +} + +// reloadExistingFiles scans the storage directory for retry files from previous runs. +// Files older than fileTTLDays are removed instead of being reloaded. +func (manager *DiskRetryManager) reloadExistingFiles() { + entries, err := os.ReadDir(manager.storagePath) + if err != nil { + log.Warnf("Disk retry: failed to scan directory %s: %v", manager.storagePath, err) + return + } + + type fileInfo struct { + path string + modTime time.Time + size int64 + } + var files []fileInfo + + var ttlCutoff time.Time + if manager.fileTTLDays > 0 { + ttlCutoff = time.Now().Add(-time.Duration(manager.fileTTLDays) * 24 * time.Hour) + } + + expired := 0 + for _, entry := range entries { + if entry.IsDir() || filepath.Ext(entry.Name()) != retryFileExtension { + continue + } + info, err := entry.Info() + if err != nil { + continue + } + // Apply TTL: remove files older than the configured limit + if !ttlCutoff.IsZero() && info.ModTime().Before(ttlCutoff) { + filePath := filepath.Join(manager.storagePath, entry.Name()) + os.Remove(filePath) + expired++ + continue + } + files = append(files, fileInfo{ + path: filepath.Join(manager.storagePath, entry.Name()), + modTime: info.ModTime(), + size: info.Size(), + }) + } + + if expired > 0 { + log.Infof("Disk retry: removed %d expired files from %s", expired, manager.storagePath) + } + + // Sort by modification time (oldest first = FIFO) + sort.Slice(files, func(i, j int) bool { + return files[i].modTime.Before(files[j].modTime) + }) + + manager.mu.Lock() + defer manager.mu.Unlock() + for _, f := range files { + manager.filenames = append(manager.filenames, f.path) + manager.currentSize += f.size + } + manager.updateGauges() + + if len(files) > 0 { + log.Infof("Disk retry: reloaded %d files (%d bytes) from %s", len(files), manager.currentSize, manager.storagePath) + } +} + +func (manager *DiskRetryManager) updateGauges() { + tlmUsageBytes.Set(float64(manager.currentSize)) + tlmFileCount.Set(float64(len(manager.filenames))) +} + +// noopRetrier is used when disk retry is disabled (max_size_bytes == 0). +type noopRetrier struct{} + +// NewNoopRetrier returns a Retrier that always fails to store, causing the caller to drop the payload. 
+func NewNoopRetrier() Retrier { + return &noopRetrier{} +} + +func (n *noopRetrier) Store(_ *message.Payload) error { + return errors.New("disk retry is disabled") +} + +func (n *noopRetrier) StartReplayLoop(_ SendFunc) {} + +func (n *noopRetrier) Stop() {} diff --git a/pkg/logs/sender/diskretry/retrier_test.go b/pkg/logs/sender/diskretry/retrier_test.go new file mode 100644 index 00000000000000..e3db93e880fdc9 --- /dev/null +++ b/pkg/logs/sender/diskretry/retrier_test.go @@ -0,0 +1,294 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package diskretry + +import ( + "os" + "path/filepath" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/DataDog/datadog-agent/pkg/logs/message" +) + +func testPayload(data string, count int) *message.Payload { + return makeTestPayload([]byte(data), "gzip", len(data), count) +} + +func TestNewDiskRetryManager(t *testing.T) { + dir := t.TempDir() + m, err := NewDiskRetryManager(dir, 10*1024*1024, 0.8, 7) + require.NoError(t, err) + defer m.Stop() + + assert.Equal(t, dir, m.storagePath) + assert.Equal(t, int64(10*1024*1024), m.maxSizeBytes) +} + +func TestNewDiskRetryManagerEnforcesMinSize(t *testing.T) { + dir := t.TempDir() + m, err := NewDiskRetryManager(dir, 100, 0.8, 7) // 100 bytes < minSizeBytes + require.NoError(t, err) + defer m.Stop() + + assert.Equal(t, int64(minSizeBytes), m.maxSizeBytes) +} + +func TestNewDiskRetryManagerCreatesDirectory(t *testing.T) { + dir := filepath.Join(t.TempDir(), "subdir", "retry") + m, err := NewDiskRetryManager(dir, 10*1024*1024, 0.8, 7) + require.NoError(t, err) + defer m.Stop() + + info, err := os.Stat(dir) + require.NoError(t, err) + assert.True(t, info.IsDir()) +} + +func TestStoreWritesFile(t *testing.T) { + dir := t.TempDir() + m, err := NewDiskRetryManager(dir, 10*1024*1024, 0.8, 7) + require.NoError(t, err) + defer m.Stop() + + payload := testPayload("test-data", 3) + err = m.Store(payload) + require.NoError(t, err) + + m.mu.Lock() + assert.Equal(t, 1, len(m.filenames)) + assert.True(t, m.currentSize > 0) + m.mu.Unlock() + + // Verify file exists on disk + entries, err := os.ReadDir(dir) + require.NoError(t, err) + retryFiles := 0 + for _, e := range entries { + if filepath.Ext(e.Name()) == retryFileExtension { + retryFiles++ + } + } + assert.Equal(t, 1, retryFiles) +} + +func TestStoreMultiplePayloads(t *testing.T) { + dir := t.TempDir() + m, err := NewDiskRetryManager(dir, 10*1024*1024, 0.8, 7) + require.NoError(t, err) + defer m.Stop() + + for i := 0; i < 5; i++ { + err = m.Store(testPayload("payload-data", 1)) + require.NoError(t, err) + } + + m.mu.Lock() + assert.Equal(t, 5, len(m.filenames)) + m.mu.Unlock() +} + +func TestStoreRejectsWhenAtCapacity(t *testing.T) { + dir := t.TempDir() + // Small capacity -- use minSizeBytes since that's the minimum enforced + m, err := NewDiskRetryManager(dir, minSizeBytes, 0.8, 7) + require.NoError(t, err) + defer m.Stop() + + // Write a payload that's close to the capacity + bigData := make([]byte, minSizeBytes-100) // leaves very little room + payload := makeTestPayload(bigData, "gzip", len(bigData), 1) + err = m.Store(payload) + require.NoError(t, err) + + // Next write should fail (exceeds maxSizeBytes) + err = m.Store(payload) + assert.Error(t, err) + assert.Contains(t, 
err.Error(), "disk retry queue full") +} + +func TestReloadExistingFiles(t *testing.T) { + dir := t.TempDir() + + // Write some retry files manually + payload := testPayload("existing-data", 2) + for i := 0; i < 3; i++ { + data, err := SerializePayload(payload) + require.NoError(t, err) + err = os.WriteFile(filepath.Join(dir, filepath.Base(t.TempDir())+retryFileExtension), data, 0600) + require.NoError(t, err) + } + + // Create manager -- should reload existing files + m, err := NewDiskRetryManager(dir, 10*1024*1024, 0.8, 7) + require.NoError(t, err) + defer m.Stop() + + m.mu.Lock() + assert.Equal(t, 3, len(m.filenames)) + assert.True(t, m.currentSize > 0) + m.mu.Unlock() +} + +func TestReplayLoopSendsAndDeletes(t *testing.T) { + dir := t.TempDir() + m, err := NewDiskRetryManager(dir, 10*1024*1024, 0.8, 7) + require.NoError(t, err) + + // Store a payload + err = m.Store(testPayload("replay-me", 1)) + require.NoError(t, err) + + // Start replay with a send function that always succeeds + var received atomic.Int32 + m.StartReplayLoop(func(_ *message.Payload) bool { + received.Add(1) + return true + }) + + // Wait for replay + assert.Eventually(t, func() bool { + return received.Load() >= 1 + }, 5*time.Second, 100*time.Millisecond) + + m.Stop() + + // File should be deleted + m.mu.Lock() + assert.Equal(t, 0, len(m.filenames)) + m.mu.Unlock() + + entries, _ := os.ReadDir(dir) + retryFiles := 0 + for _, e := range entries { + if filepath.Ext(e.Name()) == retryFileExtension { + retryFiles++ + } + } + assert.Equal(t, 0, retryFiles) +} + +func TestReplayLoopRetriesOnSendFailure(t *testing.T) { + dir := t.TempDir() + m, err := NewDiskRetryManager(dir, 10*1024*1024, 0.8, 7) + require.NoError(t, err) + + err = m.Store(testPayload("retry-me", 1)) + require.NoError(t, err) + + // Send function that fails 3 times then succeeds + var attempts atomic.Int32 + m.StartReplayLoop(func(_ *message.Payload) bool { + n := attempts.Add(1) + return n > 3 + }) + + assert.Eventually(t, func() bool { + return attempts.Load() > 3 + }, 30*time.Second, 100*time.Millisecond) + + m.Stop() + + // File should be deleted after successful send + m.mu.Lock() + assert.Equal(t, 0, len(m.filenames)) + m.mu.Unlock() +} + +func TestReplayLoopFIFOOrder(t *testing.T) { + dir := t.TempDir() + m, err := NewDiskRetryManager(dir, 10*1024*1024, 0.8, 7) + require.NoError(t, err) + + // Store 3 payloads with identifiable data + for i := 0; i < 3; i++ { + err = m.Store(testPayload(string(rune('A'+i)), 1)) + require.NoError(t, err) + time.Sleep(10 * time.Millisecond) // ensure distinct timestamps + } + + var mu sync.Mutex + var order []string + m.StartReplayLoop(func(payload *message.Payload) bool { + mu.Lock() + order = append(order, string(payload.Encoded)) + mu.Unlock() + return true + }) + + assert.Eventually(t, func() bool { + mu.Lock() + defer mu.Unlock() + return len(order) >= 3 + }, 5*time.Second, 100*time.Millisecond) + + m.Stop() + + mu.Lock() + assert.Equal(t, []string{"A", "B", "C"}, order) + mu.Unlock() +} + +func TestStopTerminatesReplayLoop(t *testing.T) { + dir := t.TempDir() + m, err := NewDiskRetryManager(dir, 10*1024*1024, 0.8, 7) + require.NoError(t, err) + + m.StartReplayLoop(func(_ *message.Payload) bool { return true }) + + // Stop should return promptly even if no files to replay + done := make(chan struct{}) + go func() { + m.Stop() + close(done) + }() + + select { + case <-done: + // OK + case <-time.After(5 * time.Second): + t.Fatal("Stop did not return in time") + } +} + +func TestNoopRetrierAlwaysFails(t 
*testing.T) {
+	r := NewNoopRetrier()
+	err := r.Store(testPayload("data", 1))
+	assert.Error(t, err)
+	// Stop is safe to call
+	r.Stop()
+}
+
+func TestExpireTTL(t *testing.T) {
+	dir := t.TempDir()
+	m, err := NewDiskRetryManager(dir, 10*1024*1024, 0.8, 0) // 0 TTL = no expiry
+	require.NoError(t, err)
+	defer m.Stop()
+
+	err = m.Store(testPayload("data", 1))
+	require.NoError(t, err)
+
+	// With TTL 0, expiry doesn't run
+	m.mu.Lock()
+	m.expireTTLLocked()
+	assert.Equal(t, 1, len(m.filenames))
+	m.mu.Unlock()
+
+	// Now set TTL and backdate the file
+	m.fileTTLDays = 1
+	// Touch the file to the past
+	oldTime := time.Now().Add(-48 * time.Hour)
+	m.mu.Lock()
+	os.Chtimes(m.filenames[0], oldTime, oldTime)
+	m.expireTTLLocked()
+	assert.Equal(t, 0, len(m.filenames))
+	m.mu.Unlock()
+}
diff --git a/pkg/logs/sender/diskretry/serialization.go b/pkg/logs/sender/diskretry/serialization.go
new file mode 100644
index 00000000000000..387639038de7b0
--- /dev/null
+++ b/pkg/logs/sender/diskretry/serialization.go
@@ -0,0 +1,173 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2016-present Datadog, Inc.
+
+package diskretry
+
+import (
+	"encoding/binary"
+	"errors"
+	"fmt"
+
+	"github.com/DataDog/datadog-agent/comp/logs/agent/config"
+	"github.com/DataDog/datadog-agent/pkg/logs/message"
+	"github.com/DataDog/datadog-agent/pkg/logs/sources"
+)
+
+// Binary format per file:
+//
+//	[4 bytes] magic number
+//	[4 bytes] version (uint32 LE)
+//	[4 bytes] encoding string length (uint32 LE)
+//	[N bytes] encoding string
+//	[4 bytes] unencoded size (uint32 LE)
+//	[4 bytes] encoded payload length (uint32 LE)
+//	[N bytes] encoded payload (raw bytes)
+//	[4 bytes] message count (uint32 LE)
+
+const (
+	fileMagic       = uint32(0x44524554) // "DRET" (Disk RETry)
+	formatVersion   = uint32(1)
+	headerSize      = 4 + 4                       // magic + version
+	minFileSize     = headerSize + 4 + 4 + 4 + 4  // + encoding len + unencoded size + encoded len + count
+	maxPayloadSize  = 100 * 1024 * 1024           // 100 MB sanity limit
+	maxMessageCount = 100_000                     // sanity limit for message metadata count
+)
+
+// SerializePayload serializes a message.Payload into the binary disk retry format. 
+func SerializePayload(payload *message.Payload) ([]byte, error) { + encodingBytes := []byte(payload.Encoding) + messageCount := uint32(len(payload.MessageMetas)) + + // Calculate total size + totalSize := headerSize + + 4 + len(encodingBytes) + // encoding string length + encoding + 4 + // unencoded size + 4 + len(payload.Encoded) + // encoded payload length + payload + 4 // message count + + buf := make([]byte, totalSize) + offset := 0 + + // Magic number + binary.LittleEndian.PutUint32(buf[offset:], fileMagic) + offset += 4 + + // Version + binary.LittleEndian.PutUint32(buf[offset:], formatVersion) + offset += 4 + + // Encoding string + binary.LittleEndian.PutUint32(buf[offset:], uint32(len(encodingBytes))) + offset += 4 + copy(buf[offset:], encodingBytes) + offset += len(encodingBytes) + + // Unencoded size + binary.LittleEndian.PutUint32(buf[offset:], uint32(payload.UnencodedSize)) + offset += 4 + + // Encoded payload + binary.LittleEndian.PutUint32(buf[offset:], uint32(len(payload.Encoded))) + offset += 4 + copy(buf[offset:], payload.Encoded) + offset += len(payload.Encoded) + + // Message count + binary.LittleEndian.PutUint32(buf[offset:], messageCount) + + return buf, nil +} + +// diskRetrySource is a shared LogSource used for deserialized payloads. +// It provides the minimum non-nil structure needed so that deserialized payloads +// can flow through the auditor without nil-pointer panics. The auditor skips +// registry updates for payloads with an empty Origin.Identifier. +var diskRetrySource = sources.NewLogSource("disk_retry", &config.LogsConfig{}) + +// DeserializePayload deserializes bytes produced by SerializePayload back into +// a message.Payload. The returned payload has minimal MessageMetadata entries +// (with empty Origin identifiers) so the auditor safely skips them. 
+func DeserializePayload(data []byte) (*message.Payload, error) { + if len(data) < minFileSize { + return nil, fmt.Errorf("file too small: %d bytes", len(data)) + } + + offset := 0 + + // Magic number + magic := binary.LittleEndian.Uint32(data[offset:]) + if magic != fileMagic { + return nil, fmt.Errorf("invalid magic number: 0x%08X", magic) + } + offset += 4 + + // Version + version := binary.LittleEndian.Uint32(data[offset:]) + if version != formatVersion { + return nil, fmt.Errorf("unsupported format version: %d", version) + } + offset += 4 + + // Encoding string + if offset+4 > len(data) { + return nil, errors.New("truncated file: missing encoding length") + } + encodingLen := binary.LittleEndian.Uint32(data[offset:]) + offset += 4 + if offset+int(encodingLen) > len(data) { + return nil, errors.New("truncated file: encoding string") + } + encoding := string(data[offset : offset+int(encodingLen)]) + offset += int(encodingLen) + + // Unencoded size + if offset+4 > len(data) { + return nil, errors.New("truncated file: missing unencoded size") + } + unencodedSize := binary.LittleEndian.Uint32(data[offset:]) + offset += 4 + + // Encoded payload + if offset+4 > len(data) { + return nil, errors.New("truncated file: missing encoded length") + } + encodedLen := binary.LittleEndian.Uint32(data[offset:]) + offset += 4 + if encodedLen > maxPayloadSize { + return nil, fmt.Errorf("encoded payload too large: %d bytes", encodedLen) + } + if offset+int(encodedLen) > len(data) { + return nil, errors.New("truncated file: encoded payload") + } + encoded := make([]byte, encodedLen) + copy(encoded, data[offset:offset+int(encodedLen)]) + offset += int(encodedLen) + + // Message count + if offset+4 > len(data) { + return nil, errors.New("truncated file: missing message count") + } + messageCount := binary.LittleEndian.Uint32(data[offset:]) + if messageCount > maxMessageCount { + return nil, fmt.Errorf("message count too large: %d (max %d)", messageCount, maxMessageCount) + } + + // Build minimal MessageMetadata entries so payload.Count() returns the + // correct value and the auditor doesn't panic on nil Origin pointers. + // The auditor skips updates when Origin.Identifier is empty. + metas := make([]*message.MessageMetadata, messageCount) + for i := range metas { + metas[i] = &message.MessageMetadata{ + Origin: message.NewOrigin(diskRetrySource), + } + } + + return &message.Payload{ + MessageMetas: metas, + Encoded: encoded, + Encoding: encoding, + UnencodedSize: int(unencodedSize), + }, nil +} diff --git a/pkg/logs/sender/diskretry/serialization_test.go b/pkg/logs/sender/diskretry/serialization_test.go new file mode 100644 index 00000000000000..7a8036c4478c20 --- /dev/null +++ b/pkg/logs/sender/diskretry/serialization_test.go @@ -0,0 +1,140 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. 
+ +package diskretry + +import ( + "encoding/binary" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/DataDog/datadog-agent/pkg/logs/message" +) + +func makeTestPayload(encoded []byte, encoding string, unencodedSize int, msgCount int) *message.Payload { + metas := make([]*message.MessageMetadata, msgCount) + for i := range metas { + metas[i] = &message.MessageMetadata{} + } + return &message.Payload{ + MessageMetas: metas, + Encoded: encoded, + Encoding: encoding, + UnencodedSize: unencodedSize, + } +} + +func TestSerializeDeserializeRoundTrip(t *testing.T) { + encoded := []byte("compressed-log-data-here") + payload := makeTestPayload(encoded, "gzip", 1024, 5) + + data, err := SerializePayload(payload) + require.NoError(t, err) + + result, err := DeserializePayload(data) + require.NoError(t, err) + + assert.Equal(t, payload.Encoded, result.Encoded) + assert.Equal(t, payload.Encoding, result.Encoding) + assert.Equal(t, payload.UnencodedSize, result.UnencodedSize) + assert.Equal(t, payload.Count(), result.Count()) +} + +func TestSerializeDeserializeEmptyEncoding(t *testing.T) { + payload := makeTestPayload([]byte("data"), "", 100, 1) + + data, err := SerializePayload(payload) + require.NoError(t, err) + + result, err := DeserializePayload(data) + require.NoError(t, err) + + assert.Equal(t, "", result.Encoding) + assert.Equal(t, []byte("data"), result.Encoded) +} + +func TestSerializeDeserializeEmptyPayload(t *testing.T) { + payload := makeTestPayload([]byte{}, "deflate", 0, 0) + + data, err := SerializePayload(payload) + require.NoError(t, err) + + result, err := DeserializePayload(data) + require.NoError(t, err) + + assert.Equal(t, []byte{}, result.Encoded) + assert.Equal(t, int64(0), result.Count()) +} + +func TestSerializeDeserializeLargePayload(t *testing.T) { + // 1 MB payload + encoded := make([]byte, 1024*1024) + for i := range encoded { + encoded[i] = byte(i % 256) + } + payload := makeTestPayload(encoded, "zstd", 2*1024*1024, 100) + + data, err := SerializePayload(payload) + require.NoError(t, err) + + result, err := DeserializePayload(data) + require.NoError(t, err) + + assert.Equal(t, encoded, result.Encoded) + assert.Equal(t, int64(100), result.Count()) + assert.Equal(t, 2*1024*1024, result.UnencodedSize) +} + +func TestDeserializeInvalidMagic(t *testing.T) { + data := make([]byte, 32) + binary.LittleEndian.PutUint32(data[0:], 0xDEADBEEF) + _, err := DeserializePayload(data) + assert.ErrorContains(t, err, "invalid magic number") +} + +func TestDeserializeUnsupportedVersion(t *testing.T) { + data := make([]byte, 32) + binary.LittleEndian.PutUint32(data[0:], fileMagic) + binary.LittleEndian.PutUint32(data[4:], 99) + _, err := DeserializePayload(data) + assert.ErrorContains(t, err, "unsupported format version") +} + +func TestDeserializeTruncatedFile(t *testing.T) { + // Too small + _, err := DeserializePayload([]byte{1, 2, 3}) + assert.ErrorContains(t, err, "file too small") +} + +func TestDeserializeTruncatedEncodedPayload(t *testing.T) { + payload := makeTestPayload([]byte("data"), "gzip", 100, 1) + data, err := SerializePayload(payload) + require.NoError(t, err) + + // Truncate the file mid-payload + _, err = DeserializePayload(data[:len(data)-5]) + assert.Error(t, err) +} + +func TestDeserializedPayloadHasValidOrigin(t *testing.T) { + payload := makeTestPayload([]byte("data"), "gzip", 100, 3) + + data, err := SerializePayload(payload) + require.NoError(t, err) + + result, err := DeserializePayload(data) + 
require.NoError(t, err) + + // Verify each meta has a non-nil Origin with empty Identifier + // (so the auditor skips registry updates without panicking) + for _, meta := range result.MessageMetas { + require.NotNil(t, meta.Origin) + assert.Empty(t, meta.Origin.Identifier) + require.NotNil(t, meta.Origin.LogSource) + require.NotNil(t, meta.Origin.LogSource.Config) + } +} diff --git a/pkg/logs/sender/go.mod b/pkg/logs/sender/go.mod index f39bf33fb54e2b..4b7e26a2925e27 100644 --- a/pkg/logs/sender/go.mod +++ b/pkg/logs/sender/go.mod @@ -14,6 +14,7 @@ require ( github.com/DataDog/datadog-agent/pkg/logs/status/statusinterface v0.61.0 github.com/DataDog/datadog-agent/pkg/telemetry v0.64.1 github.com/DataDog/datadog-agent/pkg/util/compression v0.56.0-rc.3 + github.com/DataDog/datadog-agent/pkg/util/filesystem v0.61.0 github.com/DataDog/datadog-agent/pkg/util/log v0.73.0-rc.5 github.com/benbjohnson/clock v1.3.5 github.com/stretchr/testify v1.11.1 @@ -49,7 +50,6 @@ require ( github.com/DataDog/datadog-agent/pkg/util/backoff v0.61.0 // indirect github.com/DataDog/datadog-agent/pkg/util/defaultpaths v0.64.0-devel // indirect github.com/DataDog/datadog-agent/pkg/util/executable v0.61.0 // indirect - github.com/DataDog/datadog-agent/pkg/util/filesystem v0.61.0 // indirect github.com/DataDog/datadog-agent/pkg/util/flavor v0.0.0-00010101000000-000000000000 // indirect github.com/DataDog/datadog-agent/pkg/util/fxutil v0.73.0-rc.5 // indirect github.com/DataDog/datadog-agent/pkg/util/http v0.61.0 // indirect diff --git a/pkg/logs/sender/http/http_sender.go b/pkg/logs/sender/http/http_sender.go index 54bbd48aa160b5..05b81395bfb95a 100644 --- a/pkg/logs/sender/http/http_sender.go +++ b/pkg/logs/sender/http/http_sender.go @@ -66,6 +66,7 @@ func NewHTTPSender( queueCount, workersPerQueue, pipelineMonitor, + nil, // retrier: created at the pipeline level, nil defaults to noop ) } diff --git a/pkg/logs/sender/sender.go b/pkg/logs/sender/sender.go index 5afba9d09e7a17..39e7225c41a167 100644 --- a/pkg/logs/sender/sender.go +++ b/pkg/logs/sender/sender.go @@ -7,12 +7,14 @@ package sender import ( "fmt" + "path/filepath" "sync" pkgconfigmodel "github.com/DataDog/datadog-agent/pkg/config/model" "github.com/DataDog/datadog-agent/pkg/logs/client" "github.com/DataDog/datadog-agent/pkg/logs/message" "github.com/DataDog/datadog-agent/pkg/logs/metrics" + "github.com/DataDog/datadog-agent/pkg/logs/sender/diskretry" "github.com/DataDog/datadog-agent/pkg/util/log" "go.uber.org/atomic" @@ -57,6 +59,7 @@ type PipelineComponent interface { type Sender struct { workers []*worker queues []chan *message.Payload + retrier diskretry.Retrier pipelineMonitor metrics.PipelineMonitor idx *atomic.Uint32 @@ -120,7 +123,12 @@ func NewSender( queueCount int, workersPerQueue int, pipelineMonitor metrics.PipelineMonitor, + retrier diskretry.Retrier, ) *Sender { + if retrier == nil { + retrier = newRetrierFromConfig(config) + } + var workers []*worker if queueCount <= 0 { queueCount = DefaultQueuesCount @@ -145,6 +153,7 @@ func NewSender( serverlessMeta, pipelineMonitor, workerID, + retrier, ) workers = append(workers, worker) } @@ -152,6 +161,7 @@ func NewSender( return &Sender{ workers: workers, + retrier: retrier, pipelineMonitor: pipelineMonitor, queues: queues, idx: &atomic.Uint32{}, @@ -169,16 +179,26 @@ func (s *Sender) PipelineMonitor() metrics.PipelineMonitor { return s.pipelineMonitor } -// Start starts all sender workers. +// Start starts all sender workers and the disk retry replay loop. 
func (s *Sender) Start() { for _, worker := range s.workers { worker.start() } + // Disk retry replay loop -> replayed payloads are fed back through the worker input channel. + s.retrier.StartReplayLoop(func(payload *message.Payload) bool { + select { + case s.queues[0] <- payload: + return true + default: + return false + } + }) } -// Stop stops all sender workers +// Stop stops all sender workers and the disk retry replay loop. func (s *Sender) Stop() { log.Debug("sender mux stopping") + s.retrier.Stop() for _, s := range s.workers { s.stop() } @@ -186,3 +206,29 @@ func (s *Sender) Stop() { close(q) } } + +// newRetrierFromConfig creates a disk retrier based on agent configuration. +// Returns a noopRetrier when disk retry is disabled (max_size_bytes == 0). +func newRetrierFromConfig(cfg pkgconfigmodel.Reader) diskretry.Retrier { + maxSizeBytes := cfg.GetInt64("logs_config.disk_retry.max_size_bytes") + if maxSizeBytes <= 0 { + return diskretry.NewNoopRetrier() + } + + storagePath := cfg.GetString("logs_config.disk_retry.path") + if storagePath == "" { + storagePath = filepath.Join(cfg.GetString("logs_config.run_path"), "logs-retry") + } + maxDiskRatio := cfg.GetFloat64("logs_config.disk_retry.max_disk_ratio") + fileTTLDays := cfg.GetInt("logs_config.disk_retry.file_ttl_days") + + retrier, err := diskretry.NewDiskRetryManager(storagePath, maxSizeBytes, maxDiskRatio, fileTTLDays) + if err != nil { + log.Errorf("Disk retry: failed to initialize, falling back to noop: %v", err) + return diskretry.NewNoopRetrier() + } + + log.Infof("Disk retry: enabled with max_size_bytes=%d, path=%s, max_disk_ratio=%.2f, file_ttl_days=%d", + maxSizeBytes, storagePath, maxDiskRatio, fileTTLDays) + return retrier +} diff --git a/pkg/logs/sender/sender_test.go b/pkg/logs/sender/sender_test.go index 6901f6947e42cd..ae77635561de68 100644 --- a/pkg/logs/sender/sender_test.go +++ b/pkg/logs/sender/sender_test.go @@ -56,6 +56,7 @@ func TestNewSenderWorkerDistribution(t *testing.T) { tc.queuesCount, tc.workersPerQueue, pipelineMonitor, + nil, // retrier: defaults to noop ) assert.Equal(t, tc.expectedWorkers, len(sender.workers)) diff --git a/pkg/logs/sender/tcp/tcp_sender.go b/pkg/logs/sender/tcp/tcp_sender.go index 4920483de46b77..9838b40d7bc6cc 100644 --- a/pkg/logs/sender/tcp/tcp_sender.go +++ b/pkg/logs/sender/tcp/tcp_sender.go @@ -44,6 +44,7 @@ func NewTCPSender( queueCount, workersPerQueue, pipelineMonitor, + nil, // retrier: created at the pipeline level, nil defaults to noop ) } diff --git a/pkg/logs/sender/worker.go b/pkg/logs/sender/worker.go index af286120e447ef..ea868a94f80e91 100644 --- a/pkg/logs/sender/worker.go +++ b/pkg/logs/sender/worker.go @@ -14,6 +14,7 @@ import ( "github.com/DataDog/datadog-agent/pkg/logs/client" "github.com/DataDog/datadog-agent/pkg/logs/message" "github.com/DataDog/datadog-agent/pkg/logs/metrics" + "github.com/DataDog/datadog-agent/pkg/logs/sender/diskretry" "github.com/DataDog/datadog-agent/pkg/telemetry" "github.com/DataDog/datadog-agent/pkg/util/log" ) @@ -43,6 +44,7 @@ type worker struct { flushWg *sync.WaitGroup sink Sink workerID string + retrier diskretry.Retrier pipelineMonitor metrics.PipelineMonitor utilization metrics.UtilizationMonitor @@ -57,6 +59,7 @@ func newWorker( serverlessMeta ServerlessMeta, pipelineMonitor metrics.PipelineMonitor, workerID string, + retrier diskretry.Retrier, ) *worker { var senderDoneChan chan *sync.WaitGroup var flushWg *sync.WaitGroup @@ -76,6 +79,7 @@ func newWorker( done: make(chan struct{}), finished: make(chan struct{}), workerID: 
workerID,
+		retrier:          retrier,
 
 		// Telemetry
 		pipelineMonitor: pipelineMonitor,
@@ -117,6 +121,7 @@ func (s *worker) run() {
 	senderDoneWg := &sync.WaitGroup{}
 
 	sent := false
+	storedToDisk := false
 	for !sent {
 		for _, destSender := range reliableDestinations {
 			// Drop non-MRF payloads to MRF destinations
@@ -139,44 +144,57 @@
 		}
 
 		if !sent {
-			// Throttle the poll loop while waiting for a send to succeed
-			// This will only happen when all reliable destinations
-			// are blocked so logs have no where to go.
+			// All reliable destinations are retrying (network outage).
+			// Try to save to disk instead of blocking the pipeline.
+			if err := s.retrier.Store(payload); err == nil {
+				sent = true
+				storedToDisk = true
+				break
+			}
+			// Disk write failed or disk retry is disabled; throttle and keep waiting for a destination.
 			time.Sleep(100 * time.Millisecond)
 		}
 	}
 
-	for i, destSender := range reliableDestinations {
-		// Drop non-MRF payloads to MRF destinations
-		if destSender.destination.IsMRF() && !payload.IsMRF() {
-			log.Debugf("Dropping non-MRF payload to MRF destination: %s", destSender.destination.Target())
-			sent = true
-			continue
-		}
-		// If an endpoint is stuck in the previous step, try to buffer the payloads if we have room to mitigate
-		// loss on intermittent failures.
-		if !destSender.lastSendSucceeded {
-			if !destSender.NonBlockingSend(payload) {
-				tlmPayloadsDropped.Inc("true", strconv.Itoa(i))
-				tlmMessagesDropped.Add(float64(payload.Count()), "true", strconv.Itoa(i))
+	// Skip remaining send attempts if the payload was persisted to disk.
+	// It will be replayed later; sending it now would cause duplicates.
+	if !storedToDisk {
+		for i, destSender := range reliableDestinations {
+			// Drop non-MRF payloads to MRF destinations
+			if destSender.destination.IsMRF() && !payload.IsMRF() {
+				log.Debugf("Dropping non-MRF payload to MRF destination: %s", destSender.destination.Target())
+				sent = true
+				continue
+			}
+			// If an endpoint is stuck in the previous step, try to buffer the payloads if we have room to mitigate
+			// loss on intermittent failures.
+			if !destSender.lastSendSucceeded {
+				if !destSender.NonBlockingSend(payload) {
+					// Buffer is full; attempt to save to disk instead of dropping. 
+ if err := s.retrier.Store(payload); err != nil { + // Disk write failed too; payload is truly lost + tlmPayloadsDropped.Inc("true", strconv.Itoa(i)) + tlmMessagesDropped.Add(float64(payload.Count()), "true", strconv.Itoa(i)) + } + } } } - } - // Attempt to send to unreliable destinations - for i, destSender := range unreliableDestinations { - // Drop non-MRF payloads to MRF destinations - if destSender.destination.IsMRF() && !payload.IsMRF() { - log.Debugf("Dropping non-MRF payload to MRF destination: %s", destSender.destination.Target()) - sent = true - continue - } - if !destSender.NonBlockingSend(payload) { - tlmPayloadsDropped.Inc("false", strconv.Itoa(i)) - tlmMessagesDropped.Add(float64(payload.Count()), "false", strconv.Itoa(i)) - if s.senderDoneChan != nil { - senderDoneWg.Add(1) - s.senderDoneChan <- senderDoneWg + // Attempt to send to unreliable destinations + for i, destSender := range unreliableDestinations { + // Drop non-MRF payloads to MRF destinations + if destSender.destination.IsMRF() && !payload.IsMRF() { + log.Debugf("Dropping non-MRF payload to MRF destination: %s", destSender.destination.Target()) + sent = true + continue + } + if !destSender.NonBlockingSend(payload) { + tlmPayloadsDropped.Inc("false", strconv.Itoa(i)) + tlmMessagesDropped.Add(float64(payload.Count()), "false", strconv.Itoa(i)) + if s.senderDoneChan != nil { + senderDoneWg.Add(1) + s.senderDoneChan <- senderDoneWg + } } } } diff --git a/pkg/logs/sender/worker_test.go b/pkg/logs/sender/worker_test.go index aee0549d50fc22..a3583d14354bcb 100644 --- a/pkg/logs/sender/worker_test.go +++ b/pkg/logs/sender/worker_test.go @@ -18,6 +18,7 @@ import ( "github.com/DataDog/datadog-agent/pkg/logs/client/tcp" "github.com/DataDog/datadog-agent/pkg/logs/message" "github.com/DataDog/datadog-agent/pkg/logs/metrics" + "github.com/DataDog/datadog-agent/pkg/logs/sender/diskretry" "github.com/DataDog/datadog-agent/pkg/logs/sources" "github.com/DataDog/datadog-agent/pkg/logs/status/statusinterface" ) @@ -64,7 +65,7 @@ func TestSender(t *testing.T) { } cfg := configmock.New(t) - worker := newWorker(cfg, input, auditor, destinationFactory, 0, NewMockServerlessMeta(false), metrics.NewNoopPipelineMonitor(""), "test") + worker := newWorker(cfg, input, auditor, destinationFactory, 0, NewMockServerlessMeta(false), metrics.NewNoopPipelineMonitor(""), "test", diskretry.NewNoopRetrier()) worker.start() expectedMessage := newMessage([]byte("fake line"), source, "") @@ -95,7 +96,7 @@ func TestSenderSingleDestination(t *testing.T) { return client.NewDestinations([]client.Destination{server.Destination}, nil) } - worker := newWorker(cfg, input, auditor, destinationFactory, 10, NewMockServerlessMeta(false), metrics.NewNoopPipelineMonitor(""), "test") + worker := newWorker(cfg, input, auditor, destinationFactory, 10, NewMockServerlessMeta(false), metrics.NewNoopPipelineMonitor(""), "test", diskretry.NewNoopRetrier()) worker.start() input <- &message.Payload{} @@ -128,7 +129,7 @@ func TestSenderDualReliableDestination(t *testing.T) { return client.NewDestinations([]client.Destination{server1.Destination, server2.Destination}, nil) } - worker := newWorker(cfg, input, auditor, destinationFactory, 10, NewMockServerlessMeta(false), metrics.NewNoopPipelineMonitor(""), "test") + worker := newWorker(cfg, input, auditor, destinationFactory, 10, NewMockServerlessMeta(false), metrics.NewNoopPipelineMonitor(""), "test", diskretry.NewNoopRetrier()) worker.start() input <- &message.Payload{} @@ -166,7 +167,7 @@ func 
TestSenderUnreliableAdditionalDestination(t *testing.T) { return client.NewDestinations([]client.Destination{server1.Destination}, []client.Destination{server2.Destination}) } - worker := newWorker(cfg, input, auditor, destinationFactory, 10, NewMockServerlessMeta(false), metrics.NewNoopPipelineMonitor(""), "test") + worker := newWorker(cfg, input, auditor, destinationFactory, 10, NewMockServerlessMeta(false), metrics.NewNoopPipelineMonitor(""), "test", diskretry.NewNoopRetrier()) worker.start() input <- &message.Payload{} @@ -202,7 +203,7 @@ func TestSenderUnreliableStopsWhenMainFails(t *testing.T) { return client.NewDestinations([]client.Destination{reliableServer.Destination}, []client.Destination{unreliableServer.Destination}) } - worker := newWorker(cfg, input, auditor, destinationFactory, 10, NewMockServerlessMeta(false), metrics.NewNoopPipelineMonitor(""), "test") + worker := newWorker(cfg, input, auditor, destinationFactory, 10, NewMockServerlessMeta(false), metrics.NewNoopPipelineMonitor(""), "test", diskretry.NewNoopRetrier()) worker.start() input <- &message.Payload{} @@ -254,7 +255,7 @@ func TestSenderReliableContinuseWhenOneFails(t *testing.T) { return client.NewDestinations([]client.Destination{reliableServer1.Destination, reliableServer2.Destination}, nil) } - worker := newWorker(cfg, input, auditor, destinationFactory, 10, NewMockServerlessMeta(false), metrics.NewNoopPipelineMonitor(""), "test") + worker := newWorker(cfg, input, auditor, destinationFactory, 10, NewMockServerlessMeta(false), metrics.NewNoopPipelineMonitor(""), "test", diskretry.NewNoopRetrier()) worker.start() input <- &message.Payload{} @@ -303,7 +304,7 @@ func TestSenderReliableWhenOneFailsAndRecovers(t *testing.T) { return client.NewDestinations([]client.Destination{reliableServer1.Destination, reliableServer2.Destination}, nil) } - worker := newWorker(cfg, input, auditor, destinationFactory, 10, NewMockServerlessMeta(false), metrics.NewNoopPipelineMonitor(""), "test") + worker := newWorker(cfg, input, auditor, destinationFactory, 10, NewMockServerlessMeta(false), metrics.NewNoopPipelineMonitor(""), "test", diskretry.NewNoopRetrier()) worker.start() input <- &message.Payload{} @@ -369,7 +370,7 @@ func TestMRFPayloads(t *testing.T) { return client.NewDestinations([]client.Destination{reliableServer1.Destination}, nil) } - worker := newWorker(cfg, input, auditor, destinationFactory, 10, NewMockServerlessMeta(false), metrics.NewNoopPipelineMonitor(""), "test") + worker := newWorker(cfg, input, auditor, destinationFactory, 10, NewMockServerlessMeta(false), metrics.NewNoopPipelineMonitor(""), "test", diskretry.NewNoopRetrier()) worker.start() input <- &message.Payload{} diff --git a/releasenotes/notes/journal-payloads-to-disk-5b355f28a7998deb.yaml b/releasenotes/notes/journal-payloads-to-disk-5b355f28a7998deb.yaml new file mode 100644 index 00000000000000..77967898d6ea61 --- /dev/null +++ b/releasenotes/notes/journal-payloads-to-disk-5b355f28a7998deb.yaml @@ -0,0 +1,19 @@ +# Each section from every release note are combined when the +# CHANGELOG.rst is rendered. So the text needs to be worded so that +# it does not depend on any information only available in another +# section. This may mean repeating some details, but each section +# must be readable independently of the other. +# +# Each section note must be formatted as reStructuredText. +--- +features: + - | + Add opt-in disk retry for logs. 
During network outages, the logs sender now
    persists to disk payloads that would otherwise be dropped, and replays
    them in FIFO order when connectivity recovers. Enable by setting
    ``logs_config.disk_retry.max_size_bytes`` to a non-zero value in
    ``datadog.yaml``. Additional configuration:
    ``logs_config.disk_retry.path`` (default: a ``logs-retry`` directory
    under ``logs_config.run_path``),
    ``logs_config.disk_retry.max_disk_ratio`` (default: ``0.80``),
    ``logs_config.disk_retry.file_ttl_days`` (default: ``7``).
    When disabled (default), behavior is identical to previous releases.
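
A few self-contained Go sketches of the mechanisms above follow, for reviewers; all of them use stand-in values where noted.

Taken together, the new package boils down to a three-call API: Store while destinations are down, StartReplayLoop to drain, Stop on shutdown. A minimal lifecycle sketch using only the types introduced in this diff -- the path and sizes here are illustrative; in the agent, newRetrierFromConfig builds the manager from ``logs_config.disk_retry.*`` and Sender.Start supplies the send function:

```go
package main

import (
	"fmt"

	"github.com/DataDog/datadog-agent/pkg/logs/message"
	"github.com/DataDog/datadog-agent/pkg/logs/sender/diskretry"
)

func main() {
	// 16 MB budget, keep 20% of the volume free, expire files after 7 days.
	// The path is illustrative; the agent defaults to <run_path>/logs-retry.
	retrier, err := diskretry.NewDiskRetryManager("/tmp/logs-retry", 16*1024*1024, 0.80, 7)
	if err != nil {
		panic(err)
	}

	// While the destination is down: persist instead of dropping.
	payload := &message.Payload{Encoded: []byte("compressed-bytes"), Encoding: "gzip"}
	if err := retrier.Store(payload); err != nil {
		fmt.Println("dropping payload:", err) // disk full or write failure
	}

	// On recovery: the send func mirrors DestinationSender.NonBlockingSend.
	retrier.StartReplayLoop(func(p *message.Payload) bool {
		fmt.Printf("replaying %d bytes\n", len(p.Encoded))
		return true // accepted -> the retry file is deleted
	})

	retrier.Stop()
}
```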
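Store's write path leans on os.CreateTemp's pattern syntax: the ``*`` is replaced with a random suffix, so files come out as ``<unix-nanos>_<random>.retry`` -- unique and roughly time-ordered. A standalone sketch of the same pattern (directory and contents arbitrary). Note that the file carries its final ``.retry`` extension from creation, so correctness rests on the manager's mutex and single-process ownership of the directory rather than on a write-then-rename step:

```go
package main

import (
	"fmt"
	"os"
	"time"
)

func main() {
	dir, _ := os.MkdirTemp("", "retry-demo")
	defer os.RemoveAll(dir)

	// Same pattern as Store: "<unix-nanos>_<random>.retry", created directly
	// in the target directory so the write never crosses filesystems.
	f, err := os.CreateTemp(dir, fmt.Sprintf("%d_*%s", time.Now().UnixNano(), ".retry"))
	if err != nil {
		panic(err)
	}
	if _, err := f.Write([]byte("serialized payload")); err != nil {
		f.Close()
		os.Remove(f.Name()) // clean up the partial file, as Store does
		panic(err)
	}
	if err := f.Close(); err != nil {
		os.Remove(f.Name())
		panic(err)
	}
	fmt.Println("wrote", f.Name())
}
```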
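hasCapacityLocked enforces two independent limits: the configured byte budget, and a requirement that ``(1 - max_disk_ratio)`` of the whole volume stays free. The same arithmetic with stand-in numbers in place of filesystem.Disk.GetUsage:

```go
package main

import (
	"fmt"
	"math"
)

// hasCapacity mirrors hasCapacityLocked: a write must fit the configured
// budget AND leave (1 - maxDiskRatio) of the whole volume untouched.
func hasCapacity(currentSize, fileSize, maxSizeBytes int64, total, available uint64, maxDiskRatio float64) bool {
	if currentSize+fileSize > maxSizeBytes {
		return false // over the configured byte budget
	}
	reserved := float64(total) * (1.0 - maxDiskRatio)
	usable := int64(available) - int64(math.Ceil(reserved))
	return usable >= fileSize
}

func main() {
	const GB = 1024 * 1024 * 1024
	// 100 GB volume with 30 GB free and ratio 0.80: 20 GB must stay free,
	// so at most 10 GB of the free space is usable by the retry queue.
	fmt.Println(hasCapacity(0, 5*GB, 50*GB, 100*GB, 30*GB, 0.80))  // true
	fmt.Println(hasCapacity(0, 15*GB, 50*GB, 100*GB, 30*GB, 0.80)) // false: only 10 GB usable
}
```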
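reloadExistingFiles in miniature: scan for ``.retry`` files, delete anything older than the TTL cutoff, and sort the survivors oldest-first so replay stays FIFO across agent restarts. File names and the 2-day TTL below are arbitrary:

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"sort"
	"time"
)

func main() {
	dir, _ := os.MkdirTemp("", "retry-demo")
	defer os.RemoveAll(dir)

	// Three retry files with staggered ages; the first is older than the TTL.
	cutoff := time.Now().Add(-2 * 24 * time.Hour) // fileTTLDays = 2
	for i, age := range []time.Duration{72 * time.Hour, 2 * time.Hour, time.Hour} {
		path := filepath.Join(dir, fmt.Sprintf("%d.retry", i))
		os.WriteFile(path, []byte("payload"), 0o600)
		mod := time.Now().Add(-age)
		os.Chtimes(path, mod, mod)
	}

	type file struct {
		path string
		mod  time.Time
	}
	var kept []file
	entries, _ := os.ReadDir(dir)
	for _, e := range entries {
		info, err := e.Info()
		if err != nil || filepath.Ext(e.Name()) != ".retry" {
			continue
		}
		if info.ModTime().Before(cutoff) {
			os.Remove(filepath.Join(dir, e.Name())) // expired: delete instead of reloading
			continue
		}
		kept = append(kept, file{filepath.Join(dir, e.Name()), info.ModTime()})
	}
	sort.Slice(kept, func(i, j int) bool { return kept[i].mod.Before(kept[j].mod) }) // oldest first
	for _, f := range kept {
		fmt.Println(filepath.Base(f.path)) // 1.retry, then 2.retry
	}
}
```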
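The send function that Sender.Start hands to StartReplayLoop is nothing more than a non-blocking channel send; stripped to its skeleton with local stand-in types:

```go
package main

import "fmt"

type payload struct{ encoded []byte }

// feed mirrors the closure in Sender.Start: offer a replayed payload to the
// first worker queue without ever blocking. A false return makes the replay
// loop keep the file on disk and retry it after replayRetryInterval.
func feed(queue chan *payload) func(*payload) bool {
	return func(p *payload) bool {
		select {
		case queue <- p:
			return true
		default:
			return false // queue full: back off, the file stays on disk
		}
	}
}

func main() {
	queue := make(chan *payload, 1)
	send := feed(queue)
	fmt.Println(send(&payload{[]byte("a")})) // true: buffered
	fmt.Println(send(&payload{[]byte("b")})) // false: queue is full
}
```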
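The control flow added to worker.run, reduced to a sketch: a successful Store both ends the wait loop and, via storedToDisk, suppresses the later resend passes so the payload is never delivered twice. All types here are stand-ins, not the agent's:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// retrier is a stand-in for diskretry.Retrier.
type retrier interface{ store(payload string) error }

type okDisk struct{}

func (okDisk) store(string) error { return nil }

type fullDisk struct{}

func (fullDisk) store(string) error { return errors.New("queue full") }

// deliver mirrors the shape of worker.run: poll destinations, and when all of
// them are blocked, try the disk instead of spinning forever.
func deliver(payload string, destinationUp bool, r retrier) string {
	sent, storedToDisk := false, false
	for !sent {
		if destinationUp { // stands in for a successful NonBlockingSend
			sent = true
			break
		}
		if err := r.store(payload); err == nil {
			sent, storedToDisk = true, true // persisted; stop waiting
			break
		}
		time.Sleep(10 * time.Millisecond) // disk also unavailable: throttle and re-poll
	}
	if storedToDisk {
		return "on disk, replayed later" // skip resend passes to avoid duplicates
	}
	return "sent"
}

func main() {
	fmt.Println(deliver("payload", false, okDisk{}))  // destination down -> disk
	fmt.Println(deliver("payload", true, fullDisk{})) // destination up -> direct send
}
```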
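Finally, the on-disk layout documented in serialization.go, exercised as a round-trip using only encoding/binary (no agent imports). It also makes the minimum valid file size concrete: an 8-byte header plus four 4-byte length/count fields, i.e. 24 bytes when both variable-length fields are empty:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

const (
	magic   = uint32(0x44524554) // "DRET", as in serialization.go
	version = uint32(1)
)

// encode mirrors SerializePayload: a fixed header followed by length-prefixed fields.
func encode(encoding string, unencodedSize uint32, encoded []byte, count uint32) []byte {
	buf := make([]byte, 0, 8+4+len(encoding)+4+4+len(encoded)+4)
	u32 := func(v uint32) {
		var b [4]byte
		binary.LittleEndian.PutUint32(b[:], v)
		buf = append(buf, b[:]...)
	}
	u32(magic)
	u32(version)
	u32(uint32(len(encoding)))
	buf = append(buf, encoding...)
	u32(unencodedSize)
	u32(uint32(len(encoded)))
	buf = append(buf, encoded...)
	u32(count)
	return buf
}

func main() {
	data := encode("gzip", 1024, []byte("payload-bytes"), 5)
	fmt.Printf("magic=0x%08X version=%d encodingLen=%d\n",
		binary.LittleEndian.Uint32(data[0:4]),
		binary.LittleEndian.Uint32(data[4:8]),
		binary.LittleEndian.Uint32(data[8:12]))
	fmt.Println("empty-fields minimum:", len(encode("", 0, nil, 0)), "bytes") // 24
	fmt.Println("this file:", len(data), "bytes")
}
```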