Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,13 @@ namespace Eventuous.Azure.ServiceBus.Producers;

using Shared;

class ServiceBusMessageBuilder {
readonly IEventSerializer _serializer;
readonly string _streamName;
readonly ServiceBusProduceOptions? _options;
readonly ServiceBusMessageAttributeNames _attributes;
readonly Action<string>? _setActivityMessageType;

public ServiceBusMessageBuilder(
IEventSerializer serializer,
string streamName,
ServiceBusMessageAttributeNames attributes,
ServiceBusProduceOptions? options = null,
Action<string>? setActivityMessageType = null
) {
_serializer = serializer;
_streamName = streamName;
_options = options;
_attributes = attributes;
_setActivityMessageType = setActivityMessageType;
}

class ServiceBusMessageBuilder(
IEventSerializer serializer,
string streamName,
ServiceBusMessageAttributeNames attributes,
ServiceBusProduceOptions? options = null,
Action<string>? setActivityMessageType = null
) {
/// <summary>
/// Creates a <see cref="ServiceBusMessage"/> from the provided <see cref="ProducedMessage"/>.
/// This method serializes the event, sets the necessary properties, and adds custom application properties
Expand All @@ -37,22 +23,22 @@ public ServiceBusMessageBuilder(
/// <param name="message"></param>
/// <returns></returns>
internal ServiceBusMessage CreateServiceBusMessage(ProducedMessage message) {
var (messageType, contentType, payload) = _serializer.SerializeEvent(message.Message);
_setActivityMessageType?.Invoke(messageType);
var (messageType, contentType, payload) = serializer.SerializeEvent(message.Message);
setActivityMessageType?.Invoke(messageType);

var metadata = message.Metadata;

var serviceBusMessage = new ServiceBusMessage(payload) {
ContentType = contentType,
MessageId = metadata?.GetValueOrDefault(_attributes.MessageId, message.MessageId)?.ToString(),
Subject = metadata?.GetValueOrDefault(_attributes.Subject, _options?.Subject)?.ToString(),
TimeToLive = _options?.TimeToLive ?? TimeSpan.MaxValue,
MessageId = metadata?.GetValueOrDefault(attributes.MessageId, message.MessageId)?.ToString(),
Subject = metadata?.GetValueOrDefault(attributes.Subject, options?.Subject)?.ToString(),
TimeToLive = options?.TimeToLive ?? TimeSpan.MaxValue,
CorrelationId = message.Metadata?.GetCorrelationId(),
To = metadata?.GetValueOrDefault(_attributes.To, _options?.To)?.ToString(),
ReplyTo = metadata?.GetValueOrDefault(_attributes.ReplyTo, _options?.ReplyTo)?.ToString()
To = metadata?.GetValueOrDefault(attributes.To, options?.To)?.ToString(),
ReplyTo = metadata?.GetValueOrDefault(attributes.ReplyTo, options?.ReplyTo)?.ToString()
};

var reservedAttributes = _attributes.ReservedNames();
var reservedAttributes = attributes.ReservedNames();

foreach (var property in GetCustomApplicationProperties(message, messageType, reservedAttributes)) {
serviceBusMessage.ApplicationProperties.Add(property);
Expand All @@ -64,8 +50,8 @@ internal ServiceBusMessage CreateServiceBusMessage(ProducedMessage message) {
IEnumerable<KeyValuePair<string, object>> GetCustomApplicationProperties(ProducedMessage message, string messageType, HashSet<string> reservedAttributes)
=> (message.Metadata ?? [])
.Concat(message.AdditionalHeaders ?? [])
.Concat([new(_attributes.MessageType, messageType), new(_attributes.StreamName, _streamName)])
.Concat([new(attributes.MessageType, messageType), new(attributes.StreamName, streamName)])
.Where(pair => !reservedAttributes.Contains(pair.Key))
.Where(pair => IsSerialisableByServiceBus(pair.Value))
.Select(pair => new KeyValuePair<string, object>(pair.Key, pair.Value!));
.Select(pair => new KeyValuePair<string, object>(pair.Key, pair.Value!));
}
51 changes: 24 additions & 27 deletions src/Azure/src/Eventuous.Azure.ServiceBus/Shared/ServiceBusHelper.cs
Original file line number Diff line number Diff line change
@@ -1,31 +1,28 @@
using System;
using System.IO;
// Copyright (C) Eventuous HQ OÜ. All rights reserved
// Licensed under the Apache License, Version 2.0.

namespace Eventuous.Azure.ServiceBus.Shared;

public static class ServiceBusHelper
{
public static bool IsSerialisableByServiceBus(object? value) =>
value is not null && (
value is string ||
value is bool ||
value is byte ||
value is sbyte ||
value is short ||
value is ushort ||
value is int ||
value is uint ||
value is long ||
value is ulong ||
value is float ||
value is double ||
value is decimal ||
value is char ||
value is Guid ||
value is DateTime ||
value is DateTimeOffset ||
value is Stream ||
value is Uri ||
value is TimeSpan
);
public static class ServiceBusHelper {
public static bool IsSerialisableByServiceBus(object? value) =>
value is string
or bool
or byte
or sbyte
or short
or ushort
or int
or uint
or long
or ulong
or float
or double
or decimal
or char
or Guid
or DateTime
or DateTimeOffset
or Stream
or Uri
or TimeSpan;
}
Original file line number Diff line number Diff line change
@@ -1,39 +1,38 @@
using static Eventuous.Azure.ServiceBus.Shared.ServiceBusHelper;
using System.Threading.Tasks;

namespace Eventuous.Tests.Azure.ServiceBus;

public class IsSerialisableByServiceBus {
public static IEnumerable<Func<object?>> PassingTestData() {

Check warning on line 6 in src/Azure/test/Eventuous.Tests.Azure.ServiceBus/IsSerialisableByServiceBus.cs

View workflow job for this annotation

GitHub Actions / Build and test (8.0)

Return a `Func<T>` rather than a `<T>`.

Check warning on line 6 in src/Azure/test/Eventuous.Tests.Azure.ServiceBus/IsSerialisableByServiceBus.cs

View workflow job for this annotation

GitHub Actions / Build and test (9.0)

Return a `Func<T>` rather than a `<T>`.

Check warning on line 6 in src/Azure/test/Eventuous.Tests.Azure.ServiceBus/IsSerialisableByServiceBus.cs

View workflow job for this annotation

GitHub Actions / Build and test (10.0)

Return a `Func<T>` rather than a `<T>`.
yield return () => "string";
yield return () => 123; // int
yield return () => 123L; // long
yield return () => 123;
yield return () => 123L;
yield return () => (short)1;
yield return () => (byte)1;
yield return () => 123u; // uint
yield return () => 123UL; // ulong
yield return () => 1.23f; // float
yield return () => 1.23d; // double
yield return () => 12.34m; // decimal
yield return () => true; // bool
yield return () => 'c'; // char
yield return () => 123u;
yield return () => 123UL;
yield return () => 1.23f;
yield return () => 1.23d;
yield return () => 12.34m;
yield return () => true;
yield return () => 'c';
yield return () => Guid.NewGuid();
yield return () => DateTime.UtcNow;
yield return () => DateTimeOffset.UtcNow;
yield return () => TimeSpan.FromMinutes(5);
yield return () => new Uri("https://example.com");
yield return () => new MemoryStream(); // Stream
yield return () => new MemoryStream();
}

public static IEnumerable<Func<object?>> FailingTestData() {

Check warning on line 27 in src/Azure/test/Eventuous.Tests.Azure.ServiceBus/IsSerialisableByServiceBus.cs

View workflow job for this annotation

GitHub Actions / Build and test (8.0)

Return a `Func<T>` rather than a `<T>`.

Check warning on line 27 in src/Azure/test/Eventuous.Tests.Azure.ServiceBus/IsSerialisableByServiceBus.cs

View workflow job for this annotation

GitHub Actions / Build and test (9.0)

Return a `Func<T>` rather than a `<T>`.

Check warning on line 27 in src/Azure/test/Eventuous.Tests.Azure.ServiceBus/IsSerialisableByServiceBus.cs

View workflow job for this annotation

GitHub Actions / Build and test (10.0)

Return a `Func<T>` rather than a `<T>`.
yield return () => null;
yield return () => new object(); // plain object
yield return () => new Dictionary<object, string> { [new object()] = "v" }; // dictionary with non-string key
yield return () => new Dictionary<string, object> { ["k"] = new object() }; // dictionary with non-serializable value
yield return () => new List<object> { new() }; // list with non-serializable item
yield return () => Task.CompletedTask; // Task
yield return () => new Action(() => { }); // delegate
yield return () => new WeakReference(new object()); // complex type
yield return () => new(); // plain object
yield return () => new Dictionary<object, string> { [new()] = "v" }; // dictionary with non-string key
yield return () => new Dictionary<string, object> { ["k"] = new() }; // dictionary with non-serializable value
yield return () => new List<object> { new() }; // list with non-serializable item
yield return () => Task.CompletedTask; // Task
yield return () => new Action(() => { }); // delegate
yield return () => new WeakReference(new()); // complex type
}

[Test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,25 +63,66 @@
return this;
}

public SubscriptionBuilder AddCompositionEventHandler
<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicConstructors)] THandler, TWrappingHandler>(Func<THandler, TWrappingHandler> getWrappingHandler)
/// <summary>
/// Adds a composition event handler to the subscription.
/// The inner handler of type <typeparamref name="THandler"/> will be resolved from the container
/// (keyed by <see cref="SubscriptionId"/>), and then wrapped by <typeparamref name="TWrappingHandler"/>
/// using the provided factory.
/// </summary>
/// <typeparam name="THandler">Inner event handler type to be resolved from the service provider</typeparam>
/// <typeparam name="TWrappingHandler">Wrapping event handler type produced by the factory</typeparam>
/// <param name="getWrappingHandler">Factory that takes the resolved inner handler and returns the wrapping handler</param>
/// <returns>The current <see cref="SubscriptionBuilder"/> instance</returns>
public SubscriptionBuilder AddCompositionEventHandler<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicConstructors)] THandler, TWrappingHandler>(
Func<THandler, TWrappingHandler> getWrappingHandler
)
where THandler : class, IEventHandler where TWrappingHandler : class, IEventHandler {
Services.TryAddKeyedSingleton<THandler>(SubscriptionId);
AddHandlerResolve(sp => getWrappingHandler(sp.GetRequiredKeyedService<THandler>(SubscriptionId)));

return this;
}

/// <summary>
/// Adds a composition event handler to the subscription with a custom inner handler resolver.
/// The inner handler is created via <paramref name="getInnerHandler"/> and then wrapped into
/// <typeparamref name="TWrappingHandler"/> using <paramref name="getWrappingHandler"/>.
/// </summary>
/// <typeparam name="THandler">Inner event handler type</typeparam>
/// <typeparam name="TWrappingHandler">Wrapping event handler type</typeparam>
/// <param name="getInnerHandler">Function that resolves or creates the inner handler using the service provider</param>
/// <param name="getWrappingHandler">Factory that produces the wrapping handler from the inner handler</param>
/// <returns>The current <see cref="SubscriptionBuilder"/> instance</returns>
public SubscriptionBuilder AddCompositionEventHandler<THandler, TWrappingHandler>(
Func<IServiceProvider, THandler> getInnerHandler,
Func<THandler, TWrappingHandler> getWrappingHandler
) where THandler : class, IEventHandler where TWrappingHandler : class, IEventHandler {
Services.TryAddKeyedSingleton(SubscriptionId, getInnerHandler);
Services.TryAddKeyedSingleton(SubscriptionId, (sp, _) => getInnerHandler(sp));
AddHandlerResolve(sp => getWrappingHandler(sp.GetRequiredKeyedService<THandler>(SubscriptionId)));

return this;
}

/// <summary>
/// Adds a composition event handler to the subscription with a custom inner handler resolver.
/// The inner handler is created via <paramref name="getInnerHandler"/> and then wrapped into
/// <typeparamref name="TWrappingHandler"/> using <paramref name="getWrappingHandler"/>.
/// </summary>
/// <typeparam name="THandler">Inner event handler type</typeparam>
/// <typeparam name="TWrappingHandler">Wrapping event handler type</typeparam>
/// <param name="getInnerHandler">Function that resolves or creates the inner handler using the service provider</param>
/// <param name="getWrappingHandler">Factory that produces the wrapping handler from the inner handler</param>
/// <returns>The current <see cref="SubscriptionBuilder"/> instance</returns>
public SubscriptionBuilder AddCompositionEventHandler<THandler, TWrappingHandler>(
Func<IServiceProvider, THandler> getInnerHandler,
Func<THandler, IServiceProvider, TWrappingHandler> getWrappingHandler
) where THandler : class, IEventHandler where TWrappingHandler : class, IEventHandler {
Services.TryAddKeyedSingleton(SubscriptionId, (sp, _) => getInnerHandler(sp));
AddHandlerResolve(sp => getWrappingHandler(sp.GetRequiredKeyedService<THandler>(SubscriptionId), sp));

return this;
}

/// <summary>
/// Allows using a custom consumer instead of the <see cref="DefaultConsumer"/> one.
/// Can also be used to change the default consumer instantiation.
Expand Down Expand Up @@ -141,6 +182,11 @@
TOptions> : SubscriptionBuilder
where T : EventSubscription<TOptions>
where TOptions : SubscriptionOptions {
/// <summary>
/// Creates a new subscription builder for a specific subscription id.
/// </summary>
/// <param name="services">The service collection to register handlers and dependencies with</param>
/// <param name="subscriptionId">The subscription identifier used to key registrations</param>
public SubscriptionBuilder(IServiceCollection services, string subscriptionId) : base(services, subscriptionId) {
ResolveConsumer = ResolveDefaultConsumer;
ConfigureOptions = options => options.SubscriptionId = subscriptionId;
Expand Down Expand Up @@ -182,6 +228,14 @@
return _resolvedConsumer;
}

/// <summary>
/// Resolves and builds the subscription instance of type <typeparamref name="T"/>.
/// Applies tracing and consumer filters to the consume pipe when diagnostics are enabled,
/// resolves the configured consumer, and creates the subscription using options keyed by
/// <see cref="SubscriptionId"/>.

Check warning on line 235 in src/Core/src/Eventuous.Subscriptions/Registrations/SubscriptionBuilder.cs

View workflow job for this annotation

GitHub Actions / Build and test (8.0)

XML comment has cref attribute 'SubscriptionId' that could not be resolved

Check warning on line 235 in src/Core/src/Eventuous.Subscriptions/Registrations/SubscriptionBuilder.cs

View workflow job for this annotation

GitHub Actions / Build and test (9.0)

XML comment has cref attribute 'SubscriptionId' that could not be resolved

Check warning on line 235 in src/Core/src/Eventuous.Subscriptions/Registrations/SubscriptionBuilder.cs

View workflow job for this annotation

GitHub Actions / Build and test (10.0)

XML comment has cref attribute 'SubscriptionId' that could not be resolved
/// </summary>
/// <param name="sp">Service provider used to resolve dependencies</param>
/// <returns>The resolved and configured subscription instance</returns>
public T ResolveSubscription(IServiceProvider sp) {
if (_resolvedSubscription != null) {
return _resolvedSubscription;
Expand Down
Loading
Loading