Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,30 @@
// Licensed under the Apache License, Version 2.0.

using Eventuous.Producers;
using static Eventuous.Azure.ServiceBus.Shared.ServiceBusHelper;

namespace Eventuous.Azure.ServiceBus.Producers;

using Shared;

class ServiceBusMessageBuilder {
readonly IEventSerializer _serializer;
readonly string _streamName;
readonly ServiceBusProduceOptions? _options;
readonly IEventSerializer _serializer;
readonly string _streamName;
readonly ServiceBusProduceOptions? _options;
readonly ServiceBusMessageAttributeNames _attributes;
readonly Action<string>? _setActivityMessageType;
readonly Action<string>? _setActivityMessageType;

public ServiceBusMessageBuilder(
IEventSerializer serializer,
string streamName,
IEventSerializer serializer,
string streamName,
ServiceBusMessageAttributeNames attributes,
ServiceBusProduceOptions? options = null,
Action<string>? setActivityMessageType = null
ServiceBusProduceOptions? options = null,
Action<string>? setActivityMessageType = null
) {
_serializer = serializer;
_streamName = streamName;
_options = options;
_attributes = attributes;
_serializer = serializer;
_streamName = streamName;
_options = options;
_attributes = attributes;
_setActivityMessageType = setActivityMessageType;
}

Expand All @@ -42,13 +43,13 @@ internal ServiceBusMessage CreateServiceBusMessage(ProducedMessage message) {
var metadata = message.Metadata;

var serviceBusMessage = new ServiceBusMessage(payload) {
ContentType = contentType,
MessageId = metadata?.GetValueOrDefault(_attributes.MessageId, message.MessageId)?.ToString(),
Subject = metadata?.GetValueOrDefault(_attributes.Subject, _options?.Subject)?.ToString(),
TimeToLive = _options?.TimeToLive ?? TimeSpan.MaxValue,
ContentType = contentType,
MessageId = metadata?.GetValueOrDefault(_attributes.MessageId, message.MessageId)?.ToString(),
Subject = metadata?.GetValueOrDefault(_attributes.Subject, _options?.Subject)?.ToString(),
TimeToLive = _options?.TimeToLive ?? TimeSpan.MaxValue,
CorrelationId = message.Metadata?.GetCorrelationId(),
To = metadata?.GetValueOrDefault(_attributes.To, _options?.To)?.ToString(),
ReplyTo = metadata?.GetValueOrDefault(_attributes.ReplyTo, _options?.ReplyTo)?.ToString()
To = metadata?.GetValueOrDefault(_attributes.To, _options?.To)?.ToString(),
ReplyTo = metadata?.GetValueOrDefault(_attributes.ReplyTo, _options?.ReplyTo)?.ToString()
};

var reservedAttributes = _attributes.ReservedNames();
Expand All @@ -65,6 +66,6 @@ IEnumerable<KeyValuePair<string, object>> GetCustomApplicationProperties(Produce
.Concat(message.AdditionalHeaders ?? [])
.Concat([new(_attributes.MessageType, messageType), new(_attributes.StreamName, _streamName)])
.Where(pair => !reservedAttributes.Contains(pair.Key))
.Where(pair => pair.Value is not null)
.Select(pair => new KeyValuePair<string, object>(pair.Key, pair.Value!));
}
.Where(pair => IsSerialisableByServiceBus(pair.Value))
.Select(pair => new KeyValuePair<string, object>(pair.Key, pair.Value!));
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
using System;
using System.IO;

namespace Eventuous.Azure.ServiceBus.Shared;

public static class ServiceBusHelper
{
public static bool IsSerialisableByServiceBus(object? value) =>
value is not null && (
value is string ||
value is bool ||
value is byte ||
value is sbyte ||
value is short ||
value is ushort ||
value is int ||
value is uint ||
value is long ||
value is ulong ||
value is float ||
value is double ||
value is decimal ||
value is char ||
value is Guid ||
value is DateTime ||
value is DateTimeOffset ||
value is Stream ||
value is Uri ||
value is TimeSpan
);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
using static Eventuous.Azure.ServiceBus.Shared.ServiceBusHelper;
using System.Threading.Tasks;

namespace Eventuous.Tests.Azure.ServiceBus;

public class IsSerialisableByServiceBus {
public static IEnumerable<Func<object?>> PassingTestData() {

Check warning on line 7 in src/Azure/test/Eventuous.Tests.Azure.ServiceBus/IsSerialisableByServiceBus.cs

View workflow job for this annotation

GitHub Actions / Build and test (9.0)

Return a `Func<T>` rather than a `<T>`.

Check warning on line 7 in src/Azure/test/Eventuous.Tests.Azure.ServiceBus/IsSerialisableByServiceBus.cs

View workflow job for this annotation

GitHub Actions / Build and test (8.0)

Return a `Func<T>` rather than a `<T>`.

Check warning on line 7 in src/Azure/test/Eventuous.Tests.Azure.ServiceBus/IsSerialisableByServiceBus.cs

View workflow job for this annotation

GitHub Actions / Build and test (10.0)

Return a `Func<T>` rather than a `<T>`.
yield return () => "string";
yield return () => 123; // int
yield return () => 123L; // long
yield return () => (short)1;
yield return () => (byte)1;
yield return () => 123u; // uint
yield return () => 123UL; // ulong
yield return () => 1.23f; // float
yield return () => 1.23d; // double
yield return () => 12.34m; // decimal
yield return () => true; // bool
yield return () => 'c'; // char
yield return () => Guid.NewGuid();
yield return () => DateTime.UtcNow;
yield return () => DateTimeOffset.UtcNow;
yield return () => TimeSpan.FromMinutes(5);
yield return () => new Uri("https://example.com");
yield return () => new MemoryStream(); // Stream
}

public static IEnumerable<Func<object?>> FailingTestData() {

Check warning on line 28 in src/Azure/test/Eventuous.Tests.Azure.ServiceBus/IsSerialisableByServiceBus.cs

View workflow job for this annotation

GitHub Actions / Build and test (9.0)

Return a `Func<T>` rather than a `<T>`.

Check warning on line 28 in src/Azure/test/Eventuous.Tests.Azure.ServiceBus/IsSerialisableByServiceBus.cs

View workflow job for this annotation

GitHub Actions / Build and test (8.0)

Return a `Func<T>` rather than a `<T>`.

Check warning on line 28 in src/Azure/test/Eventuous.Tests.Azure.ServiceBus/IsSerialisableByServiceBus.cs

View workflow job for this annotation

GitHub Actions / Build and test (10.0)

Return a `Func<T>` rather than a `<T>`.
yield return () => null;
yield return () => new object(); // plain object
yield return () => new Dictionary<object, string> { [new object()] = "v" }; // dictionary with non-string key
yield return () => new Dictionary<string, object> { ["k"] = new object() }; // dictionary with non-serializable value
yield return () => new List<object> { new() }; // list with non-serializable item
yield return () => Task.CompletedTask; // Task
yield return () => new Action(() => { }); // delegate
yield return () => new WeakReference(new object()); // complex type
}

[Test]
[MethodDataSource(nameof(PassingTestData))]
public async Task Passes(object value) => await Assert.That(IsSerialisableByServiceBus(value)).IsTrue();

[Test]
[MethodDataSource(nameof(FailingTestData))]
public async Task Fails(object value) => await Assert.That(IsSerialisableByServiceBus(value)).IsFalse();
}
Loading