Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/app/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ var indexCmd = &cobra.Command{
}

redisScanner := scanner.NewScanner(
adapter.NewRedisService(clientSource),
adapter.NewRedisService(clientSource, args[0]),
adapter.NewPrettyProgressWriter(os.Stdout),
consoleLogger,
)
Expand Down
2 changes: 1 addition & 1 deletion cmd/app/inventory.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ var scanCmd = &cobra.Command{
}

redisScanner := scanner.NewScanner(
adapter.NewRedisService(clientSource),
adapter.NewRedisService(clientSource, args[0]),
adapter.NewPrettyProgressWriter(os.Stdout),
consoleLogger,
)
Expand Down
61 changes: 54 additions & 7 deletions src/adapter/rservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package adapter

import (
"context"
"strings"
"sync"
"time"

"github.com/mediocregopher/radix/v4"
Expand All @@ -15,19 +17,56 @@ type ScanOptions struct {
}

// NewRedisService creates RedisService
func NewRedisService(client radix.Client) RedisService {
return RedisService{
client: client,
func NewRedisService(client radix.Client, redisURL string) *RedisService {
return &RedisService{
client: client,
redisURL: redisURL,
}
}

// RedisService implementation for iteration over redis
type RedisService struct {
client radix.Client
client radix.Client
redisURL string
mu sync.RWMutex
}

// IsConnectionError checks if the error is a connection-related error
func IsConnectionError(err error) bool {
if err == nil {
return false
}
errStr := err.Error()
return strings.Contains(errStr, "closed") ||
strings.Contains(errStr, "EOF") ||
strings.Contains(errStr, "connection refused") ||
strings.Contains(errStr, "connection reset") ||
strings.Contains(errStr, "broken pipe") ||
strings.Contains(errStr, "timeout")
}

// Reconnect attempts to create a new Redis connection
func (s *RedisService) Reconnect() error {
s.mu.Lock()
defer s.mu.Unlock()

// Close existing client if possible
if s.client != nil {
s.client.Close()
}

// Create new client
newClient, err := (radix.PoolConfig{}).New(context.Background(), "tcp", s.redisURL)
if err != nil {
return err
}

s.client = newClient
return nil
}

// ScanKeys scans keys asynchroniously and sends them to the returned channel
func (s RedisService) ScanKeys(ctx context.Context, options ScanOptions) <-chan string {
func (s *RedisService) ScanKeys(ctx context.Context, options ScanOptions) <-chan string {
resultChan := make(chan string)

scanOpts := radix.ScannerConfig{
Expand All @@ -42,7 +81,9 @@ func (s RedisService) ScanKeys(ctx context.Context, options ScanOptions) <-chan
go func() {
defer close(resultChan)
var key string
s.mu.RLock()
radixScanner := scanOpts.New(s.client)
s.mu.RUnlock()
for radixScanner.Next(ctx, &key) {
resultChan <- key
if options.Throttle > 0 {
Expand All @@ -55,7 +96,10 @@ func (s RedisService) ScanKeys(ctx context.Context, options ScanOptions) <-chan
}

// GetKeysCount returns number of keys in the current database
func (s RedisService) GetKeysCount(ctx context.Context) (int64, error) {
func (s *RedisService) GetKeysCount(ctx context.Context) (int64, error) {
s.mu.RLock()
defer s.mu.RUnlock()

var keysCount int64
err := s.client.Do(context.Background(), radix.Cmd(&keysCount, "DBSIZE"))
if err != nil {
Expand All @@ -66,7 +110,10 @@ func (s RedisService) GetKeysCount(ctx context.Context) (int64, error) {
}

// GetMemoryUsage returns memory usage of given key
func (s RedisService) GetMemoryUsage(ctx context.Context, key string) (int64, error) {
func (s *RedisService) GetMemoryUsage(ctx context.Context, key string) (int64, error) {
s.mu.RLock()
defer s.mu.RUnlock()

var res int64
err := s.client.Do(context.Background(), radix.Cmd(&res, "MEMORY", "USAGE", key))
if err != nil {
Expand Down
37 changes: 34 additions & 3 deletions src/scanner/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"runtime"
"sync"
"time"

"github.com/rs/zerolog"
"github.com/spinute/redis-inventory/src/adapter"
Expand All @@ -15,6 +16,7 @@ type RedisServiceInterface interface {
ScanKeys(ctx context.Context, options adapter.ScanOptions) <-chan string
GetKeysCount(ctx context.Context) (int64, error)
GetMemoryUsage(ctx context.Context, key string) (int64, error)
Reconnect() error
}

// RedisScanner scans redis keys and puts them in a trie
Expand Down Expand Up @@ -53,10 +55,39 @@ func (s *RedisScanner) Scan(options adapter.ScanOptions, result *trie.Trie) {

for key := range keys {
s.scanProgress.Increment()
res, err := s.redisService.GetMemoryUsage(context.Background(), key)

var res int64
var err error
maxRetries := 3

for attempt := 0; attempt < maxRetries; attempt++ {
res, err = s.redisService.GetMemoryUsage(context.Background(), key)
if err == nil {
break
}

// Check if it's a connection error
if adapter.IsConnectionError(err) {
s.logger.Warn().Err(err).Msgf("Connection error on key %s (attempt %d/%d), reconnecting...", key, attempt+1, maxRetries)

// Wait before reconnecting
time.Sleep(time.Second * time.Duration(attempt+1))

// Attempt to reconnect
if reconnErr := s.redisService.Reconnect(); reconnErr != nil {
s.logger.Error().Err(reconnErr).Msg("Failed to reconnect")
continue
}
s.logger.Info().Msg("Successfully reconnected to Redis")
} else {
// Non-connection error, don't retry
break
}
}

if err != nil {
s.logger.Error().Err(err).Msgf("Error dumping key %s", key)
return
s.logger.Warn().Err(err).Msgf("Error dumping key %s after %d attempts, skipping", key, maxRetries)
continue
}

result.Add(
Expand Down