From 898703335256fa8e50f1e44547811834954c8318 Mon Sep 17 00:00:00 2001 From: swarnabhasinha Date: Tue, 27 Jan 2026 18:33:56 +0530 Subject: [PATCH 1/4] feat: --no-dedup flag for publish command --- cmd/publish.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/cmd/publish.go b/cmd/publish.go index 34ea667..b3aa329 100644 --- a/cmd/publish.go +++ b/cmd/publish.go @@ -27,6 +27,7 @@ var ( //optional serviceURL string useGRPCPub bool // gRPC flag for publish + noDedup bool // Flag to disable deduplication (no timestamp) ) // PublishPayload matches the expected JSON body on the server @@ -34,7 +35,7 @@ type PublishRequest struct { ClientID string `json:"client_id"` Topic string `json:"topic"` Message string `json:"message"` - Timestamp int64 `json:"timestamp"` + Timestamp *int64 `json:"timestamp,omitempty"` // Optional timestamp for duplicate detection } // addDebugPrefix adds debug information prefix to message data @@ -194,11 +195,17 @@ var publishCmd = &cobra.Command{ publishData = addDebugPrefix(data, proxyAddr) } + var timestamp *int64 + if !noDedup { + ts := time.Now().UnixMilli() + timestamp = &ts + } + reqData := PublishRequest{ ClientID: clientIDToUse, Topic: pubTopic, Message: string(publishData), // use modified data with debug prefix if enabled - Timestamp: time.Now().UnixMilli(), + Timestamp: timestamp, } reqBytes, err := json.Marshal(reqData) if err != nil { @@ -251,6 +258,7 @@ func init() { publishCmd.Flags().StringVar(&file, "file", "", "Path of the file to publish (max 10MB)") publishCmd.Flags().StringVar(&serviceURL, "service-url", "", "Override the default service URL") publishCmd.Flags().BoolVar(&useGRPCPub, "grpc", false, "Use gRPC for publishing instead of HTTP") + publishCmd.Flags().BoolVar(&noDedup, "no-dedup", false, "Disable deduplication by omitting timestamp (allows sending identical messages)") publishCmd.MarkFlagRequired("topic") //nolint:errcheck rootCmd.AddCommand(publishCmd) } From 65e68cde51c70604718a05a3dadbb2e700b4568d Mon Sep 17 00:00:00 2001 From: swarnabhasinha Date: Tue, 27 Jan 2026 19:38:36 +0530 Subject: [PATCH 2/4] fix --- e2e/publish_test.go | 71 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/e2e/publish_test.go b/e2e/publish_test.go index bd84f32..5f75a2c 100644 --- a/e2e/publish_test.go +++ b/e2e/publish_test.go @@ -163,6 +163,77 @@ func TestPublishCommand(t *testing.T) { require.Contains(t, strings.ToLower(out), "only one", "Expected error about using only one option") }) + // Test --no-dedup flag + t.Run("publish with --no-dedup=false (default, timestamp included)", func(t *testing.T) { + dedupTopic := fmt.Sprintf("%s-dedup-default", testTopic) + subCtx, subCancel := context.WithCancel(context.Background()) + defer subCancel() + + subCmd := exec.CommandContext(subCtx, cliBinaryPath, "subscribe", "--topic="+dedupTopic, "--service-url="+serviceURL) + subCmd.Env = os.Environ() + require.NoError(t, subCmd.Start(), "Failed to start subscriber") + time.Sleep(2 * time.Second) + + // Publish same message twice - should get different message IDs (timestamp changes) + msg := "Test dedup message" + out1, err := RunCommand(cliBinaryPath, "publish", + "--topic="+dedupTopic, + "--message="+msg, + "--service-url="+serviceURL) + require.NoError(t, err, "First publish failed: %v\nOutput: %s", err, out1) + + // Small delay to ensure different timestamp + time.Sleep(100 * time.Millisecond) + + out2, err := RunCommand(cliBinaryPath, "publish", + "--topic="+dedupTopic, + "--message="+msg, + "--service-url="+serviceURL) + require.NoError(t, err, "Second publish failed: %v\nOutput: %s", err, out2) + + // Both should succeed (different timestamps = different message IDs) + require.Contains(t, out1, "published", "First publish should succeed") + require.Contains(t, out2, "published", "Second publish should succeed (different timestamp)") + + subCancel() + subCmd.Wait() + }) + + t.Run("publish with --no-dedup=true (timestamp omitted)", func(t *testing.T) { + dedupTopic := fmt.Sprintf("%s-dedup-no-timestamp", testTopic) + subCtx, subCancel := context.WithCancel(context.Background()) + defer subCancel() + + subCmd := exec.CommandContext(subCtx, cliBinaryPath, "subscribe", "--topic="+dedupTopic, "--service-url="+serviceURL) + subCmd.Env = os.Environ() + require.NoError(t, subCmd.Start(), "Failed to start subscriber") + time.Sleep(2 * time.Second) + + // Publish same message twice with --no-dedup - second should be deduplicated + msg := "Test no-dedup message" + out1, err := RunCommand(cliBinaryPath, "publish", + "--topic="+dedupTopic, + "--message="+msg, + "--no-dedup", + "--service-url="+serviceURL) + require.NoError(t, err, "First publish failed: %v\nOutput: %s", err, out1) + require.Contains(t, out1, "published", "First publish should succeed") + + // Publish same message again + out2, err := RunCommand(cliBinaryPath, "publish", + "--topic="+dedupTopic, + "--message="+msg, + "--no-dedup", + "--service-url="+serviceURL) + require.NoError(t, err, "Second publish should not error (deduplicated): %v\nOutput: %s", err, out2) + + // Second should be deduplicated (same message hash without timestamp) + require.Contains(t, strings.ToLower(out2), "deduplicated", "Second publish should be deduplicated when --no-dedup is used") + + subCancel() + subCmd.Wait() + }) + // Cleanup: stop subscriber cancel() subCmd.Wait() From e5fa2dc1b60fb1fbe9edf35afca70e28c9da6c6e Mon Sep 17 00:00:00 2001 From: swarnabhasinha Date: Tue, 27 Jan 2026 23:50:12 +0530 Subject: [PATCH 3/4] test: add tests for --no-dedup flag with proper JSON status validation --- e2e/publish_test.go | 35 +++++++++++++++++++++++++++++------ 1 file changed, 29 insertions(+), 6 deletions(-) diff --git a/e2e/publish_test.go b/e2e/publish_test.go index 5f75a2c..a2a74e5 100644 --- a/e2e/publish_test.go +++ b/e2e/publish_test.go @@ -2,6 +2,7 @@ package main import ( "context" + "encoding/json" "fmt" "os" "os/exec" @@ -182,6 +183,13 @@ func TestPublishCommand(t *testing.T) { "--service-url="+serviceURL) require.NoError(t, err, "First publish failed: %v\nOutput: %s", err, out1) + // Extract JSON response and verify status is "published" + var resp1 map[string]interface{} + jsonStart := strings.Index(out1, "{") + require.Greater(t, jsonStart, -1, "No JSON response found in output") + require.NoError(t, json.Unmarshal([]byte(out1[jsonStart:]), &resp1), "Failed to parse JSON response") + require.Equal(t, "published", resp1["status"], "First publish should have status 'published'") + // Small delay to ensure different timestamp time.Sleep(100 * time.Millisecond) @@ -191,9 +199,13 @@ func TestPublishCommand(t *testing.T) { "--service-url="+serviceURL) require.NoError(t, err, "Second publish failed: %v\nOutput: %s", err, out2) - // Both should succeed (different timestamps = different message IDs) - require.Contains(t, out1, "published", "First publish should succeed") - require.Contains(t, out2, "published", "Second publish should succeed (different timestamp)") + // Extract JSON response and verify status is "published" (different timestamp = different message ID) + var resp2 map[string]interface{} + jsonStart = strings.Index(out2, "{") + require.Greater(t, jsonStart, -1, "No JSON response found in output") + require.NoError(t, json.Unmarshal([]byte(out2[jsonStart:]), &resp2), "Failed to parse JSON response") + require.Equal(t, "published", resp2["status"], "Second publish should have status 'published' (different timestamp)") + require.NotEqual(t, resp1["message_id"], resp2["message_id"], "Message IDs should be different (different timestamps)") subCancel() subCmd.Wait() @@ -217,7 +229,13 @@ func TestPublishCommand(t *testing.T) { "--no-dedup", "--service-url="+serviceURL) require.NoError(t, err, "First publish failed: %v\nOutput: %s", err, out1) - require.Contains(t, out1, "published", "First publish should succeed") + + // Extract JSON response and verify status is "published" + var resp1 map[string]interface{} + jsonStart := strings.Index(out1, "{") + require.Greater(t, jsonStart, -1, "No JSON response found in output") + require.NoError(t, json.Unmarshal([]byte(out1[jsonStart:]), &resp1), "Failed to parse JSON response") + require.Equal(t, "published", resp1["status"], "First publish should have status 'published'") // Publish same message again out2, err := RunCommand(cliBinaryPath, "publish", @@ -227,8 +245,13 @@ func TestPublishCommand(t *testing.T) { "--service-url="+serviceURL) require.NoError(t, err, "Second publish should not error (deduplicated): %v\nOutput: %s", err, out2) - // Second should be deduplicated (same message hash without timestamp) - require.Contains(t, strings.ToLower(out2), "deduplicated", "Second publish should be deduplicated when --no-dedup is used") + // Extract JSON response and verify status is "deduplicated" (same message hash without timestamp) + var resp2 map[string]interface{} + jsonStart = strings.Index(out2, "{") + require.Greater(t, jsonStart, -1, "No JSON response found in output") + require.NoError(t, json.Unmarshal([]byte(out2[jsonStart:]), &resp2), "Failed to parse JSON response") + require.Equal(t, "deduplicated", resp2["status"], "Second publish should have status 'deduplicated' when --no-dedup is used") + require.Equal(t, resp1["message_id"], resp2["message_id"], "Message IDs should be the same (no timestamp)") subCancel() subCmd.Wait() From 474f4dcf73ee851549fc55cafb64bcb1e6ba202a Mon Sep 17 00:00:00 2001 From: swarnabhasinha Date: Wed, 28 Jan 2026 00:23:04 +0530 Subject: [PATCH 4/4] fix --- cmd/publish.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/cmd/publish.go b/cmd/publish.go index b3aa329..73ca564 100644 --- a/cmd/publish.go +++ b/cmd/publish.go @@ -196,7 +196,12 @@ var publishCmd = &cobra.Command{ } var timestamp *int64 - if !noDedup { + if noDedup { + // When --no-dedup is set, omit timestamp so proxy uses MsgHash(topic, message) + // instead of MsgHashWithTimestamp. This allows same message to be sent once per session + timestamp = nil + } else { + // Default: include timestamp so proxy uses MsgHashWithTimestamp ts := time.Now().UnixMilli() timestamp = &ts } @@ -258,7 +263,7 @@ func init() { publishCmd.Flags().StringVar(&file, "file", "", "Path of the file to publish (max 10MB)") publishCmd.Flags().StringVar(&serviceURL, "service-url", "", "Override the default service URL") publishCmd.Flags().BoolVar(&useGRPCPub, "grpc", false, "Use gRPC for publishing instead of HTTP") - publishCmd.Flags().BoolVar(&noDedup, "no-dedup", false, "Disable deduplication by omitting timestamp (allows sending identical messages)") + publishCmd.Flags().BoolVar(&noDedup, "no-dedup", false, "Omit timestamp to use MsgHash(topic, message) instead of MsgHashWithTimestamp. Allows same message to be sent once per session (useful for stress testing/benchmarking)") publishCmd.MarkFlagRequired("topic") //nolint:errcheck rootCmd.AddCommand(publishCmd) }