From 98aa9d50eebeb367bbe90262ded061eb8c61ec07 Mon Sep 17 00:00:00 2001 From: Ischca Date: Sat, 3 Jan 2026 17:48:11 +0900 Subject: [PATCH] =?UTF-8?q?=E6=8E=A5=E7=B6=9A=E3=82=A8=E3=83=A9=E3=83=BC?= =?UTF-8?q?=E6=99=82=E3=81=AE=E8=87=AA=E5=8B=95=E5=86=8D=E6=8E=A5=E7=B6=9A?= =?UTF-8?q?=E6=A9=9F=E8=83=BD=E3=82=92=E8=BF=BD=E5=8A=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - IsConnectionError(): 接続エラー(closed, EOF, timeout等)を検出 - Reconnect(): Redis接続プールを再作成 - 接続エラー時に最大3回リトライ(指数バックオフ) - エラー時のreturnをcontinueに変更し、残りのキー処理を継続 - 再接続時のスレッドセーフ性を確保するためmutexを追加 ネットワーク断やシステムスリープ後に接続エラーが発生しても、 スキャナーが完全に停止せず処理を継続できるようになります。 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- cmd/app/index.go | 2 +- cmd/app/inventory.go | 2 +- src/adapter/rservice.go | 61 ++++++++++++++++++++++++++++++++++++----- src/scanner/scanner.go | 37 +++++++++++++++++++++++-- 4 files changed, 90 insertions(+), 12 deletions(-) diff --git a/cmd/app/index.go b/cmd/app/index.go index 3925791..6dea383 100644 --- a/cmd/app/index.go +++ b/cmd/app/index.go @@ -28,7 +28,7 @@ var indexCmd = &cobra.Command{ } redisScanner := scanner.NewScanner( - adapter.NewRedisService(clientSource), + adapter.NewRedisService(clientSource, args[0]), adapter.NewPrettyProgressWriter(os.Stdout), consoleLogger, ) diff --git a/cmd/app/inventory.go b/cmd/app/inventory.go index 0389991..7d8e2d1 100644 --- a/cmd/app/inventory.go +++ b/cmd/app/inventory.go @@ -28,7 +28,7 @@ var scanCmd = &cobra.Command{ } redisScanner := scanner.NewScanner( - adapter.NewRedisService(clientSource), + adapter.NewRedisService(clientSource, args[0]), adapter.NewPrettyProgressWriter(os.Stdout), consoleLogger, ) diff --git a/src/adapter/rservice.go b/src/adapter/rservice.go index f8514ee..919fda6 100644 --- a/src/adapter/rservice.go +++ b/src/adapter/rservice.go @@ -2,6 +2,8 @@ package adapter import ( "context" + "strings" + "sync" "time" "github.com/mediocregopher/radix/v4" @@ -15,19 +17,56 @@ type ScanOptions struct { } // NewRedisService creates RedisService -func NewRedisService(client radix.Client) RedisService { - return RedisService{ - client: client, +func NewRedisService(client radix.Client, redisURL string) *RedisService { + return &RedisService{ + client: client, + redisURL: redisURL, } } // RedisService implementation for iteration over redis type RedisService struct { - client radix.Client + client radix.Client + redisURL string + mu sync.RWMutex +} + +// IsConnectionError checks if the error is a connection-related error +func IsConnectionError(err error) bool { + if err == nil { + return false + } + errStr := err.Error() + return strings.Contains(errStr, "closed") || + strings.Contains(errStr, "EOF") || + strings.Contains(errStr, "connection refused") || + strings.Contains(errStr, "connection reset") || + strings.Contains(errStr, "broken pipe") || + strings.Contains(errStr, "timeout") +} + +// Reconnect attempts to create a new Redis connection +func (s *RedisService) Reconnect() error { + s.mu.Lock() + defer s.mu.Unlock() + + // Close existing client if possible + if s.client != nil { + s.client.Close() + } + + // Create new client + newClient, err := (radix.PoolConfig{}).New(context.Background(), "tcp", s.redisURL) + if err != nil { + return err + } + + s.client = newClient + return nil } // ScanKeys scans keys asynchroniously and sends them to the returned channel -func (s RedisService) ScanKeys(ctx context.Context, options ScanOptions) <-chan string { +func (s *RedisService) ScanKeys(ctx context.Context, options ScanOptions) <-chan string { resultChan := make(chan string) scanOpts := radix.ScannerConfig{ @@ -42,7 +81,9 @@ func (s RedisService) ScanKeys(ctx context.Context, options ScanOptions) <-chan go func() { defer close(resultChan) var key string + s.mu.RLock() radixScanner := scanOpts.New(s.client) + s.mu.RUnlock() for radixScanner.Next(ctx, &key) { resultChan <- key if options.Throttle > 0 { @@ -55,7 +96,10 @@ func (s RedisService) ScanKeys(ctx context.Context, options ScanOptions) <-chan } // GetKeysCount returns number of keys in the current database -func (s RedisService) GetKeysCount(ctx context.Context) (int64, error) { +func (s *RedisService) GetKeysCount(ctx context.Context) (int64, error) { + s.mu.RLock() + defer s.mu.RUnlock() + var keysCount int64 err := s.client.Do(context.Background(), radix.Cmd(&keysCount, "DBSIZE")) if err != nil { @@ -66,7 +110,10 @@ func (s RedisService) GetKeysCount(ctx context.Context) (int64, error) { } // GetMemoryUsage returns memory usage of given key -func (s RedisService) GetMemoryUsage(ctx context.Context, key string) (int64, error) { +func (s *RedisService) GetMemoryUsage(ctx context.Context, key string) (int64, error) { + s.mu.RLock() + defer s.mu.RUnlock() + var res int64 err := s.client.Do(context.Background(), radix.Cmd(&res, "MEMORY", "USAGE", key)) if err != nil { diff --git a/src/scanner/scanner.go b/src/scanner/scanner.go index 019c30d..d63c3b4 100644 --- a/src/scanner/scanner.go +++ b/src/scanner/scanner.go @@ -4,6 +4,7 @@ import ( "context" "runtime" "sync" + "time" "github.com/rs/zerolog" "github.com/spinute/redis-inventory/src/adapter" @@ -15,6 +16,7 @@ type RedisServiceInterface interface { ScanKeys(ctx context.Context, options adapter.ScanOptions) <-chan string GetKeysCount(ctx context.Context) (int64, error) GetMemoryUsage(ctx context.Context, key string) (int64, error) + Reconnect() error } // RedisScanner scans redis keys and puts them in a trie @@ -53,10 +55,39 @@ func (s *RedisScanner) Scan(options adapter.ScanOptions, result *trie.Trie) { for key := range keys { s.scanProgress.Increment() - res, err := s.redisService.GetMemoryUsage(context.Background(), key) + + var res int64 + var err error + maxRetries := 3 + + for attempt := 0; attempt < maxRetries; attempt++ { + res, err = s.redisService.GetMemoryUsage(context.Background(), key) + if err == nil { + break + } + + // Check if it's a connection error + if adapter.IsConnectionError(err) { + s.logger.Warn().Err(err).Msgf("Connection error on key %s (attempt %d/%d), reconnecting...", key, attempt+1, maxRetries) + + // Wait before reconnecting + time.Sleep(time.Second * time.Duration(attempt+1)) + + // Attempt to reconnect + if reconnErr := s.redisService.Reconnect(); reconnErr != nil { + s.logger.Error().Err(reconnErr).Msg("Failed to reconnect") + continue + } + s.logger.Info().Msg("Successfully reconnected to Redis") + } else { + // Non-connection error, don't retry + break + } + } + if err != nil { - s.logger.Error().Err(err).Msgf("Error dumping key %s", key) - return + s.logger.Warn().Err(err).Msgf("Error dumping key %s after %d attempts, skipping", key, maxRetries) + continue } result.Add(