Skip to content
211 changes: 147 additions & 64 deletions cmd/rxtls/main.go

Large diffs are not rendered by default.

55 changes: 52 additions & 3 deletions internal/certlib/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,15 @@ func processOldFormat(ctlResponse *CTLResponse) ([]CTLogInfo, error) {
// GetLogInfo retrieves the tree size from a CT log.
// Operation: Network bound. Allocates during HTTP fetch and JSON parsing.
func GetLogInfo(ctlog *CTLogInfo) error {
return GetLogInfoWithContext(context.Background(), ctlog)
}

// GetLogInfoWithContext retrieves the tree size from a CT log using the provided context.
// Operation: Network bound. Allocates during HTTP fetch and JSON parsing.
func GetLogInfoWithContext(ctx context.Context, ctlog *CTLogInfo) error {
if ctx == nil {
ctx = context.Background()
}
// Use shared HTTP client
httpClient := client.GetHTTPClient()

Expand All @@ -293,9 +302,21 @@ func GetLogInfo(ctlog *CTLogInfo) error {
var err error
maxRetries := 3
retryDelay := 100 * time.Millisecond
var retryTimer *time.Timer
defer func() {
if retryTimer != nil {
retryTimer.Stop()
}
}()

for attempt := range maxRetries {
resp, err = httpClient.Get(url)
req, reqErr := http.NewRequestWithContext(ctx, "GET", url, nil)
if reqErr != nil {
return fmt.Errorf("error creating request: %w", reqErr)
}
req.Header.Set("User-Agent", "rxtls (+https://github.com/x-stp/rxtls)")

resp, err = httpClient.Do(req)
if err == nil && resp.StatusCode == http.StatusOK {
break
}
Expand All @@ -304,10 +325,27 @@ func GetLogInfo(ctlog *CTLogInfo) error {
resp.Body.Close()
}

// Check if context is cancelled before retrying
if ctx.Err() != nil {
return ctx.Err()
}

if attempt < maxRetries-1 {
log.Printf("Retrying GetLogInfo for %s after error: %v (attempt %d/%d)",
ctlog.URL, err, attempt+1, maxRetries)
time.Sleep(retryDelay)

// Use context-aware sleep
if retryTimer == nil {
retryTimer = time.NewTimer(retryDelay)
} else {
retryTimer.Reset(retryDelay)
}
select {
case <-retryTimer.C:
case <-ctx.Done():
return ctx.Err()
}

retryDelay *= 2 // Exponential backoff
}
}
Expand Down Expand Up @@ -355,6 +393,12 @@ func DownloadEntries(ctx context.Context, ctlog *CTLogInfo, start, end int) (*En
var resp *http.Response
maxRetries := 3
retryDelay := 500 * time.Millisecond
var retryTimer *time.Timer
defer func() {
if retryTimer != nil {
retryTimer.Stop()
}
}()

for attempt := range maxRetries {
resp, err = httpClient.Do(req)
Expand All @@ -376,8 +420,13 @@ func DownloadEntries(ctx context.Context, ctlog *CTLogInfo, start, end int) (*En
ctlog.URL, start, end, err, attempt+1, maxRetries)

// Use context-aware sleep
if retryTimer == nil {
retryTimer = time.NewTimer(retryDelay)
} else {
retryTimer.Reset(retryDelay)
}
select {
case <-time.After(retryDelay):
case <-retryTimer.C:
retryDelay *= 2 // Exponential backoff
case <-ctx.Done():
return nil, ctx.Err()
Expand Down
11 changes: 7 additions & 4 deletions internal/certlib/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,12 +297,15 @@ func NormalizeDomain(domain string) string {
if domain == "" {
return ""
}
if len(domain) > 2 && domain[:2] == "*." {
domain = domain[2:] // Strip leading wildcard
}

// Preserve wildcard labels. We normalize case/dots but do not strip leading "*.".
// This keeps inputs like "*.example.com" stable, while still rejecting clearly invalid labels below.
parts := strings.SplitSeq(domain, ".")
for part := range parts {
if strings.HasPrefix(part, "-") || strings.HasSuffix(part, "-") || strings.HasPrefix(part, "*") {
if strings.HasPrefix(part, "-") || strings.HasSuffix(part, "-") {
return domain // Invalid label structure
}
if strings.HasPrefix(part, "*") && part != "*" {
return domain // Invalid label structure after potential stripping
}
}
Expand Down
43 changes: 42 additions & 1 deletion internal/client/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ type Config struct {
IdleConnTimeout time.Duration
// MaxIdleConns controls the maximum number of idle (keep-alive) connections across all hosts.
MaxIdleConns int
// MaxIdleConnsPerHost is the maximum number of idle (keep-alive) connections to keep per host.
MaxIdleConnsPerHost int
// MaxConnsPerHost controls the maximum number of connections per host, including connections in the dialing,
// active, and idle states. On limit violation, dials will block.
MaxConnsPerHost int
Expand All @@ -100,6 +102,7 @@ func DefaultConfig() *Config {
KeepAliveTimeout: defaultKeepAliveTimeout,
IdleConnTimeout: defaultIdleConnTimeout,
MaxIdleConns: defaultMaxIdleConns,
MaxIdleConnsPerHost: MaxIdleConnsPerHost,
MaxConnsPerHost: defaultMaxConnsPerHost,
RequestTimeout: defaultRequestTimeout,
}
Expand All @@ -120,6 +123,39 @@ func InitHTTPClient(config *Config) {
config = DefaultConfig()
}

// Backfill any zero-valued fields: callers such as ConfigureTurboMode,
// or external libraries invoking this, may pass a partially populated
// Config — don't assume every field is set.
if config.DialTimeout == 0 {
config.DialTimeout = defaultDialTimeout
}
if config.KeepAliveTimeout == 0 {
config.KeepAliveTimeout = defaultKeepAliveTimeout
}
if config.IdleConnTimeout == 0 {
config.IdleConnTimeout = defaultIdleConnTimeout
}
if config.MaxIdleConns == 0 {
config.MaxIdleConns = defaultMaxIdleConns
}
if config.MaxIdleConnsPerHost == 0 {
config.MaxIdleConnsPerHost = MaxIdleConnsPerHost
}
if config.MaxConnsPerHost == 0 {
config.MaxConnsPerHost = defaultMaxConnsPerHost
}
if config.RequestTimeout == 0 {
config.RequestTimeout = defaultRequestTimeout
}

// If we're reinitializing an existing client, close idle connections on the old transport.
// This helps avoid leaking idle keep-alive connections across reconfigs.
if sharedClient != nil {
if oldTransport, ok := sharedClient.Transport.(*http.Transport); ok && oldTransport != nil {
oldTransport.CloseIdleConnections()
}
}

// Configure the transport with timeouts and connection pooling options.
// ForceAttemptHTTP2 is enabled to prefer HTTP/2 if available.
transport := &http.Transport{
Expand All @@ -129,8 +165,12 @@ func InitHTTPClient(config *Config) {
KeepAlive: config.KeepAliveTimeout, // Enables TCP keep-alives.
}).DialContext,
MaxIdleConns: config.MaxIdleConns,
MaxIdleConnsPerHost: config.MaxConnsPerHost,
MaxIdleConnsPerHost: config.MaxIdleConnsPerHost,
MaxConnsPerHost: config.MaxConnsPerHost,
IdleConnTimeout: config.IdleConnTimeout,
TLSHandshakeTimeout: 10 * time.Second,
ResponseHeaderTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
DisableCompression: false, // Enable compression (e.g., gzip) by default.
ForceAttemptHTTP2: true, // Try to use HTTP/2.
}
Expand Down Expand Up @@ -178,6 +218,7 @@ func ConfigureTurboMode() {
KeepAliveTimeout: 120 * time.Second, // Keep connections alive longer.
IdleConnTimeout: 120 * time.Second, // Allow idle connections to persist longer.
MaxIdleConns: 500, // Larger overall idle connection pool.
MaxIdleConnsPerHost: 200, // Larger per-host idle pool.
MaxConnsPerHost: 200, // More connections allowed per host.
RequestTimeout: 30 * time.Second, // Slightly longer request timeout for potentially slower turbo operations.
}
Expand Down
48 changes: 48 additions & 0 deletions internal/client/http_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package client

import (
"net/http"
"testing"
)

// TestInitHTTPClientFillsDefaults verifies that InitHTTPClient replaces
// zero-valued Config fields with the package defaults before building the
// transport. It asserts the exact default constants rather than mere
// non-zero values, so a regression that wires in the wrong default fails.
func TestInitHTTPClientFillsDefaults(t *testing.T) {
	// Reset package state so InitHTTPClient builds a fresh client.
	sharedClient = nil
	clientInitialized = false

	InitHTTPClient(&Config{})
	c := GetHTTPClient()

	tr, ok := c.Transport.(*http.Transport)
	if !ok || tr == nil {
		t.Fatalf("expected *http.Transport, got %T", c.Transport)
	}
	// Each zero field must be backfilled with its matching default constant,
	// not passed through to the transport as zero.
	if tr.MaxIdleConns != defaultMaxIdleConns {
		t.Fatalf("MaxIdleConns = %d, want %d", tr.MaxIdleConns, defaultMaxIdleConns)
	}
	if tr.MaxIdleConnsPerHost != MaxIdleConnsPerHost {
		t.Fatalf("MaxIdleConnsPerHost = %d, want %d", tr.MaxIdleConnsPerHost, MaxIdleConnsPerHost)
	}
	if tr.MaxConnsPerHost != defaultMaxConnsPerHost {
		t.Fatalf("MaxConnsPerHost = %d, want %d", tr.MaxConnsPerHost, defaultMaxConnsPerHost)
	}
}

// TestConfigureTurboModeSetsPerHostIdleConns verifies that ConfigureTurboMode
// applies its turbo connection-pool settings to the shared transport. It
// asserts the exact configured values (200 per-host idle and 200 per-host
// total connections); a plain non-zero check would also pass if the smaller
// defaults leaked through, hiding the regression this test exists to catch.
func TestConfigureTurboModeSetsPerHostIdleConns(t *testing.T) {
	// Reset package state so ConfigureTurboMode reinitializes the client.
	sharedClient = nil
	clientInitialized = false

	ConfigureTurboMode()
	c := GetHTTPClient()

	tr, ok := c.Transport.(*http.Transport)
	if !ok || tr == nil {
		t.Fatalf("expected *http.Transport, got %T", c.Transport)
	}
	// Turbo mode configures MaxIdleConnsPerHost: 200 and MaxConnsPerHost: 200.
	if tr.MaxIdleConnsPerHost != 200 {
		t.Fatalf("MaxIdleConnsPerHost = %d, want 200", tr.MaxIdleConnsPerHost)
	}
	if tr.MaxConnsPerHost != 200 {
		t.Fatalf("MaxConnsPerHost = %d, want 200", tr.MaxConnsPerHost)
	}
}

129 changes: 64 additions & 65 deletions internal/core/domain_extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type DomainExtractor struct {
// setupComplete indicates atomically whether the initial setup phase (STH fetching for all logs) has finished.
// This is used by the Shutdown method to decide whether to rename temporary output files.
setupComplete atomic.Bool
shutdownOnce sync.Once
}

// DomainExtractorConfig holds configuration parameters specific to the domain extraction process.
Expand Down Expand Up @@ -721,84 +722,82 @@ func (de *DomainExtractor) domainExtractorCallback(item *WorkItem) error {
// File operations (flush, close, rename) can block briefly on I/O.
// This method is idempotent; calling it multiple times will not cause issues.
func (de *DomainExtractor) Shutdown() {
// Check if already shutting down or shut down by inspecting the context.
if de.ctx.Err() != nil {
return
}
log.Println("Shutting down Domain Extractor...")
de.cancel() // Signal all operations using this DomainExtractor's context to stop.
de.shutdownOnce.Do(func() {
log.Println("Shutting down Domain Extractor...")
de.cancel() // Signal all operations using this DomainExtractor's context to stop.

if de.scheduler != nil {
// This will cancel worker contexts and wait for them to finish their current items.
de.scheduler.Shutdown()
}

log.Println("Flushing and closing output writers...")
var successCount, errorCount int
// Iterate over all output writers and flush/close them.
de.outputMap.Range(func(key, value interface{}) bool {
if value == nil { // Should not happen with proper map usage.
return true
}
lw, ok := value.(*lockedWriter)
if !ok || lw == nil {
log.Printf("Warning: Invalid type found in outputMap for key %v during shutdown", key)
return true
if de.scheduler != nil {
// This will cancel worker contexts and wait for them to finish their current items.
de.scheduler.Shutdown()
}

// Perform close operations under lock for each writer.
func() {
lw.mu.Lock()
defer lw.mu.Unlock()
log.Println("Flushing and closing output writers...")
var successCount, errorCount int
// Iterate over all output writers and flush/close them.
de.outputMap.Range(func(key, value interface{}) bool {
if value == nil { // Should not happen with proper map usage.
return true
}
lw, ok := value.(*lockedWriter)
if !ok || lw == nil {
log.Printf("Warning: Invalid type found in outputMap for key %v during shutdown", key)
return true
}

var opErrors []string
// Perform close operations under lock for each writer.
func() {
lw.mu.Lock()
defer lw.mu.Unlock()

// Flush the primary buffered writer.
if lw.writer != nil {
if err := lw.writer.Flush(); err != nil {
msg := fmt.Sprintf("Error flushing writer for %s: %v", key.(string), err)
log.Println(msg)
opErrors = append(opErrors, msg)
var opErrors []string

// Flush the primary buffered writer.
if lw.writer != nil {
if err := lw.writer.Flush(); err != nil {
msg := fmt.Sprintf("Error flushing writer for %s: %v", key.(string), err)
log.Println(msg)
opErrors = append(opErrors, msg)
}
}
}
// Close the gzip writer if it exists (this also flushes it).
if lw.gzWriter != nil {
if err := lw.gzWriter.Close(); err != nil {
msg := fmt.Sprintf("Error closing gzip writer for %s: %v", key.(string), err)
log.Println(msg)
opErrors = append(opErrors, msg)
// Close the gzip writer if it exists (this also flushes it).
if lw.gzWriter != nil {
if err := lw.gzWriter.Close(); err != nil {
msg := fmt.Sprintf("Error closing gzip writer for %s: %v", key.(string), err)
log.Println(msg)
opErrors = append(opErrors, msg)
}
}
}
// Close the underlying file.
if lw.file != nil {
if err := lw.file.Close(); err != nil {
msg := fmt.Sprintf("Error closing file for %s: %v", key.(string), err)
log.Println(msg)
opErrors = append(opErrors, msg)
// Close the underlying file.
if lw.file != nil {
if err := lw.file.Close(); err != nil {
msg := fmt.Sprintf("Error closing file for %s: %v", key.(string), err)
log.Println(msg)
opErrors = append(opErrors, msg)
}
}
}

// Rename the temporary file to its final name only if all ops were successful and setup was complete.
if len(opErrors) == 0 && de.setupComplete.Load() && lw.filePath != "" && lw.finalPath != "" {
if err := os.Rename(lw.filePath, lw.finalPath); err != nil {
log.Printf("Error renaming temp file %s to %s: %v", lw.filePath, lw.finalPath, err)
// Rename the temporary file to its final name only if all ops were successful and setup was complete.
if len(opErrors) == 0 && de.setupComplete.Load() && lw.filePath != "" && lw.finalPath != "" {
if err := os.Rename(lw.filePath, lw.finalPath); err != nil {
log.Printf("Error renaming temp file %s to %s: %v", lw.filePath, lw.finalPath, err)
errorCount++
} else {
successCount++
}
} else if len(opErrors) > 0 {
errorCount++
} else {
successCount++
}
} else if len(opErrors) > 0 {
errorCount++
// If there were errors, attempt to remove the temporary file to avoid leaving corrupt data.
if lw.filePath != "" {
if removeErr := os.Remove(lw.filePath); removeErr != nil {
log.Printf("Warning: Failed to remove temporary file %s after errors: %v", lw.filePath, removeErr)
// If there were errors, attempt to remove the temporary file to avoid leaving corrupt data.
if lw.filePath != "" {
if removeErr := os.Remove(lw.filePath); removeErr != nil {
log.Printf("Warning: Failed to remove temporary file %s after errors: %v", lw.filePath, removeErr)
}
}
}
}
}()
return true // Continue iterating over the map.
}()
return true // Continue iterating over the map.
})
log.Printf("Domain Extractor shutdown complete. Finalized %d files with %d errors.", successCount, errorCount)
})
log.Printf("Domain Extractor shutdown complete. Finalized %d files with %d errors.", successCount, errorCount)
}

// GetStats returns a pointer to the DomainExtractorStats struct, allowing callers to read current statistics.
Expand Down
Loading
Loading