diff --git a/cmd/publish.go b/cmd/publish.go index 34ea667..73ca564 100644 --- a/cmd/publish.go +++ b/cmd/publish.go @@ -27,6 +27,7 @@ var ( //optional serviceURL string useGRPCPub bool // gRPC flag for publish + noDedup bool // Flag to disable deduplication (no timestamp) ) // PublishPayload matches the expected JSON body on the server @@ -34,7 +35,7 @@ type PublishRequest struct { ClientID string `json:"client_id"` Topic string `json:"topic"` Message string `json:"message"` - Timestamp int64 `json:"timestamp"` + Timestamp *int64 `json:"timestamp,omitempty"` // Optional timestamp for duplicate detection } // addDebugPrefix adds debug information prefix to message data @@ -194,11 +195,22 @@ var publishCmd = &cobra.Command{ publishData = addDebugPrefix(data, proxyAddr) } + var timestamp *int64 + if noDedup { + // When --no-dedup is set, omit timestamp so proxy uses MsgHash(topic, message) + // instead of MsgHashWithTimestamp. This allows same message to be sent once per session + timestamp = nil + } else { + // Default: include timestamp so proxy uses MsgHashWithTimestamp + ts := time.Now().UnixMilli() + timestamp = &ts + } + reqData := PublishRequest{ ClientID: clientIDToUse, Topic: pubTopic, Message: string(publishData), // use modified data with debug prefix if enabled - Timestamp: time.Now().UnixMilli(), + Timestamp: timestamp, } reqBytes, err := json.Marshal(reqData) if err != nil { @@ -251,6 +263,7 @@ func init() { publishCmd.Flags().StringVar(&file, "file", "", "Path of the file to publish (max 10MB)") publishCmd.Flags().StringVar(&serviceURL, "service-url", "", "Override the default service URL") publishCmd.Flags().BoolVar(&useGRPCPub, "grpc", false, "Use gRPC for publishing instead of HTTP") + publishCmd.Flags().BoolVar(&noDedup, "no-dedup", false, "Omit timestamp to use MsgHash(topic, message) instead of MsgHashWithTimestamp. Allows same message to be sent once per session (useful for stress testing/benchmarking)") publishCmd.MarkFlagRequired("topic") //nolint:errcheck rootCmd.AddCommand(publishCmd) } diff --git a/e2e/publish_test.go b/e2e/publish_test.go index bd84f32..a2a74e5 100644 --- a/e2e/publish_test.go +++ b/e2e/publish_test.go @@ -2,6 +2,7 @@ package main import ( "context" + "encoding/json" "fmt" "os" "os/exec" @@ -163,6 +164,99 @@ func TestPublishCommand(t *testing.T) { require.Contains(t, strings.ToLower(out), "only one", "Expected error about using only one option") }) + // Test --no-dedup flag + t.Run("publish with --no-dedup=false (default, timestamp included)", func(t *testing.T) { + dedupTopic := fmt.Sprintf("%s-dedup-default", testTopic) + subCtx, subCancel := context.WithCancel(context.Background()) + defer subCancel() + + subCmd := exec.CommandContext(subCtx, cliBinaryPath, "subscribe", "--topic="+dedupTopic, "--service-url="+serviceURL) + subCmd.Env = os.Environ() + require.NoError(t, subCmd.Start(), "Failed to start subscriber") + time.Sleep(2 * time.Second) + + // Publish same message twice - should get different message IDs (timestamp changes) + msg := "Test dedup message" + out1, err := RunCommand(cliBinaryPath, "publish", + "--topic="+dedupTopic, + "--message="+msg, + "--service-url="+serviceURL) + require.NoError(t, err, "First publish failed: %v\nOutput: %s", err, out1) + + // Extract JSON response and verify status is "published" + var resp1 map[string]interface{} + jsonStart := strings.Index(out1, "{") + require.Greater(t, jsonStart, -1, "No JSON response found in output") + require.NoError(t, json.Unmarshal([]byte(out1[jsonStart:]), &resp1), "Failed to parse JSON response") + require.Equal(t, "published", resp1["status"], "First publish should have status 'published'") + + // Small delay to ensure different timestamp + time.Sleep(100 * time.Millisecond) + + out2, err := RunCommand(cliBinaryPath, "publish", + "--topic="+dedupTopic, + "--message="+msg, + "--service-url="+serviceURL) + require.NoError(t, err, "Second publish failed: %v\nOutput: %s", err, out2) + + // Extract JSON response and verify status is "published" (different timestamp = different message ID) + var resp2 map[string]interface{} + jsonStart = strings.Index(out2, "{") + require.Greater(t, jsonStart, -1, "No JSON response found in output") + require.NoError(t, json.Unmarshal([]byte(out2[jsonStart:]), &resp2), "Failed to parse JSON response") + require.Equal(t, "published", resp2["status"], "Second publish should have status 'published' (different timestamp)") + require.NotEqual(t, resp1["message_id"], resp2["message_id"], "Message IDs should be different (different timestamps)") + + subCancel() + subCmd.Wait() + }) + + t.Run("publish with --no-dedup=true (timestamp omitted)", func(t *testing.T) { + dedupTopic := fmt.Sprintf("%s-dedup-no-timestamp", testTopic) + subCtx, subCancel := context.WithCancel(context.Background()) + defer subCancel() + + subCmd := exec.CommandContext(subCtx, cliBinaryPath, "subscribe", "--topic="+dedupTopic, "--service-url="+serviceURL) + subCmd.Env = os.Environ() + require.NoError(t, subCmd.Start(), "Failed to start subscriber") + time.Sleep(2 * time.Second) + + // Publish same message twice with --no-dedup - second should be deduplicated + msg := "Test no-dedup message" + out1, err := RunCommand(cliBinaryPath, "publish", + "--topic="+dedupTopic, + "--message="+msg, + "--no-dedup", + "--service-url="+serviceURL) + require.NoError(t, err, "First publish failed: %v\nOutput: %s", err, out1) + + // Extract JSON response and verify status is "published" + var resp1 map[string]interface{} + jsonStart := strings.Index(out1, "{") + require.Greater(t, jsonStart, -1, "No JSON response found in output") + require.NoError(t, json.Unmarshal([]byte(out1[jsonStart:]), &resp1), "Failed to parse JSON response") + require.Equal(t, "published", resp1["status"], "First publish should have status 'published'") + + // Publish same message again + out2, err := RunCommand(cliBinaryPath, "publish", + "--topic="+dedupTopic, + "--message="+msg, + "--no-dedup", + "--service-url="+serviceURL) + require.NoError(t, err, "Second publish should not error (deduplicated): %v\nOutput: %s", err, out2) + + // Extract JSON response and verify status is "deduplicated" (same message hash without timestamp) + var resp2 map[string]interface{} + jsonStart = strings.Index(out2, "{") + require.Greater(t, jsonStart, -1, "No JSON response found in output") + require.NoError(t, json.Unmarshal([]byte(out2[jsonStart:]), &resp2), "Failed to parse JSON response") + require.Equal(t, "deduplicated", resp2["status"], "Second publish should have status 'deduplicated' when --no-dedup is used") + require.Equal(t, resp1["message_id"], resp2["message_id"], "Message IDs should be the same (no timestamp)") + + subCancel() + subCmd.Wait() + }) + // Cleanup: stop subscriber cancel() subCmd.Wait()