Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions cmd/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@ var (
//optional
serviceURL string
useGRPCPub bool // gRPC flag for publish
noDedup bool // Flag to disable deduplication (no timestamp)
)

// PublishPayload matches the expected JSON body on the server
type PublishRequest struct {
ClientID string `json:"client_id"`
Topic string `json:"topic"`
Message string `json:"message"`
Timestamp int64 `json:"timestamp"`
Timestamp *int64 `json:"timestamp,omitempty"` // Optional timestamp for duplicate detection
}

// addDebugPrefix adds debug information prefix to message data
Expand Down Expand Up @@ -194,11 +195,22 @@ var publishCmd = &cobra.Command{
publishData = addDebugPrefix(data, proxyAddr)
}

var timestamp *int64
if noDedup {
// When --no-dedup is set, omit timestamp so proxy uses MsgHash(topic, message)
// instead of MsgHashWithTimestamp. This allows same message to be sent once per session
timestamp = nil
} else {
// Default: include timestamp so proxy uses MsgHashWithTimestamp
ts := time.Now().UnixMilli()
timestamp = &ts
}

reqData := PublishRequest{
ClientID: clientIDToUse,
Topic: pubTopic,
Message: string(publishData), // use modified data with debug prefix if enabled
Timestamp: time.Now().UnixMilli(),
Timestamp: timestamp,
}
reqBytes, err := json.Marshal(reqData)
if err != nil {
Expand Down Expand Up @@ -251,6 +263,7 @@ func init() {
publishCmd.Flags().StringVar(&file, "file", "", "Path of the file to publish (max 10MB)")
publishCmd.Flags().StringVar(&serviceURL, "service-url", "", "Override the default service URL")
publishCmd.Flags().BoolVar(&useGRPCPub, "grpc", false, "Use gRPC for publishing instead of HTTP")
publishCmd.Flags().BoolVar(&noDedup, "no-dedup", false, "Omit timestamp to use MsgHash(topic, message) instead of MsgHashWithTimestamp. Allows same message to be sent once per session (useful for stress testing/benchmarking)")
publishCmd.MarkFlagRequired("topic") //nolint:errcheck
rootCmd.AddCommand(publishCmd)
}
94 changes: 94 additions & 0 deletions e2e/publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"encoding/json"
"fmt"
"os"
"os/exec"
Expand Down Expand Up @@ -163,6 +164,99 @@ func TestPublishCommand(t *testing.T) {
require.Contains(t, strings.ToLower(out), "only one", "Expected error about using only one option")
})

// Test --no-dedup flag
t.Run("publish with --no-dedup=false (default, timestamp included)", func(t *testing.T) {
dedupTopic := fmt.Sprintf("%s-dedup-default", testTopic)
subCtx, subCancel := context.WithCancel(context.Background())
defer subCancel()

subCmd := exec.CommandContext(subCtx, cliBinaryPath, "subscribe", "--topic="+dedupTopic, "--service-url="+serviceURL)
subCmd.Env = os.Environ()
require.NoError(t, subCmd.Start(), "Failed to start subscriber")
time.Sleep(2 * time.Second)

// Publish same message twice - should get different message IDs (timestamp changes)
msg := "Test dedup message"
out1, err := RunCommand(cliBinaryPath, "publish",
"--topic="+dedupTopic,
"--message="+msg,
"--service-url="+serviceURL)
require.NoError(t, err, "First publish failed: %v\nOutput: %s", err, out1)

// Extract JSON response and verify status is "published"
var resp1 map[string]interface{}
jsonStart := strings.Index(out1, "{")
require.Greater(t, jsonStart, -1, "No JSON response found in output")
require.NoError(t, json.Unmarshal([]byte(out1[jsonStart:]), &resp1), "Failed to parse JSON response")
require.Equal(t, "published", resp1["status"], "First publish should have status 'published'")

// Small delay to ensure different timestamp
time.Sleep(100 * time.Millisecond)

out2, err := RunCommand(cliBinaryPath, "publish",
"--topic="+dedupTopic,
"--message="+msg,
"--service-url="+serviceURL)
require.NoError(t, err, "Second publish failed: %v\nOutput: %s", err, out2)

// Extract JSON response and verify status is "published" (different timestamp = different message ID)
var resp2 map[string]interface{}
jsonStart = strings.Index(out2, "{")
require.Greater(t, jsonStart, -1, "No JSON response found in output")
require.NoError(t, json.Unmarshal([]byte(out2[jsonStart:]), &resp2), "Failed to parse JSON response")
require.Equal(t, "published", resp2["status"], "Second publish should have status 'published' (different timestamp)")
require.NotEqual(t, resp1["message_id"], resp2["message_id"], "Message IDs should be different (different timestamps)")

subCancel()
subCmd.Wait()
})

t.Run("publish with --no-dedup=true (timestamp omitted)", func(t *testing.T) {
dedupTopic := fmt.Sprintf("%s-dedup-no-timestamp", testTopic)
subCtx, subCancel := context.WithCancel(context.Background())
defer subCancel()

subCmd := exec.CommandContext(subCtx, cliBinaryPath, "subscribe", "--topic="+dedupTopic, "--service-url="+serviceURL)
subCmd.Env = os.Environ()
require.NoError(t, subCmd.Start(), "Failed to start subscriber")
time.Sleep(2 * time.Second)

// Publish same message twice with --no-dedup - second should be deduplicated
msg := "Test no-dedup message"
out1, err := RunCommand(cliBinaryPath, "publish",
"--topic="+dedupTopic,
"--message="+msg,
"--no-dedup",
"--service-url="+serviceURL)
require.NoError(t, err, "First publish failed: %v\nOutput: %s", err, out1)

// Extract JSON response and verify status is "published"
var resp1 map[string]interface{}
jsonStart := strings.Index(out1, "{")
require.Greater(t, jsonStart, -1, "No JSON response found in output")
require.NoError(t, json.Unmarshal([]byte(out1[jsonStart:]), &resp1), "Failed to parse JSON response")
require.Equal(t, "published", resp1["status"], "First publish should have status 'published'")

// Publish same message again
out2, err := RunCommand(cliBinaryPath, "publish",
"--topic="+dedupTopic,
"--message="+msg,
"--no-dedup",
"--service-url="+serviceURL)
require.NoError(t, err, "Second publish should not error (deduplicated): %v\nOutput: %s", err, out2)

// Extract JSON response and verify status is "deduplicated" (same message hash without timestamp)
var resp2 map[string]interface{}
jsonStart = strings.Index(out2, "{")
require.Greater(t, jsonStart, -1, "No JSON response found in output")
require.NoError(t, json.Unmarshal([]byte(out2[jsonStart:]), &resp2), "Failed to parse JSON response")
require.Equal(t, "deduplicated", resp2["status"], "Second publish should have status 'deduplicated' when --no-dedup is used")
require.Equal(t, resp1["message_id"], resp2["message_id"], "Message IDs should be the same (no timestamp)")

subCancel()
subCmd.Wait()
})

// Cleanup: stop subscriber
cancel()
subCmd.Wait()
Expand Down
Loading