diff --git a/Inngest/Attributes/IdempotencyAttribute.cs b/Inngest/Attributes/IdempotencyAttribute.cs new file mode 100644 index 0000000..44dec4f --- /dev/null +++ b/Inngest/Attributes/IdempotencyAttribute.cs @@ -0,0 +1,47 @@ +namespace Inngest.Attributes; + +/// +/// Configures idempotency for an Inngest function using a CEL expression. +/// When multiple events result in the same idempotency key, only the first +/// event will be processed; subsequent events with the same key are skipped. +/// +/// +/// Use idempotency to prevent duplicate processing when events may be retried +/// or delivered multiple times, such as webhook handlers or receipt senders. +/// +/// +/// +/// // One receipt per contribution - prevents duplicate emails on retry +/// [InngestFunction("send-donor-receipt")] +/// [EventTrigger("contribution/created")] +/// [Idempotency("event.data.contributionId")] +/// public class SendDonorReceiptFunction : IInngestFunction { } +/// +/// // Compound key for more specific deduplication +/// [InngestFunction("process-payment")] +/// [EventTrigger("payment/received")] +/// [Idempotency("event.data.paymentId + '-' + event.data.customerId")] +/// public class ProcessPaymentFunction : IInngestFunction { } +/// +/// +[AttributeUsage(AttributeTargets.Class, AllowMultiple = false, Inherited = false)] +public sealed class IdempotencyAttribute : Attribute +{ + /// + /// CEL expression that evaluates to the idempotency key. + /// Events with the same key will only be processed once. + /// + public string Key { get; } + + /// + /// Creates a new idempotency attribute + /// + /// CEL expression for the idempotency key (e.g., "event.data.orderId") + public IdempotencyAttribute(string key) + { + if (string.IsNullOrWhiteSpace(key)) + throw new ArgumentException("Idempotency key cannot be null or empty", nameof(key)); + + Key = key; + } +} diff --git a/Inngest/Attributes/ThrottleAttribute.cs b/Inngest/Attributes/ThrottleAttribute.cs new file mode 100644 index 0000000..5c01c29 --- /dev/null +++ b/Inngest/Attributes/ThrottleAttribute.cs @@ -0,0 +1,67 @@ +namespace Inngest.Attributes; + +/// +/// Configures throttling for an Inngest function. +/// Unlike RateLimit which DROPS events when the limit is exceeded, +/// Throttle QUEUES events and processes them at the configured rate. +/// +/// +/// Use Throttle instead of RateLimit when you cannot afford to lose events, +/// such as payment processing webhooks. +/// +/// +/// +/// // Limit to 20 executions per minute, queuing excess events +/// [InngestFunction("payment-processor")] +/// [EventTrigger("payment/received")] +/// [Throttle(20, "1m")] +/// public class PaymentProcessor : IInngestFunction { } +/// +/// // Limit to 10 executions per minute per customer, with burst allowance +/// [InngestFunction("customer-processor")] +/// [EventTrigger("customer/action")] +/// [Throttle(10, "1m", Key = "event.data.customerId", Burst = 5)] +/// public class CustomerProcessor : IInngestFunction { } +/// +/// +[AttributeUsage(AttributeTargets.Class, AllowMultiple = false, Inherited = false)] +public sealed class ThrottleAttribute : Attribute +{ + /// + /// The maximum number of executions allowed in the period + /// + public int Limit { get; } + + /// + /// The time period for throttling. + /// Uses Inngest time string format (e.g., "1m", "1h", "1d"). + /// + public string Period { get; } + + /// + /// Optional CEL expression to partition the throttle. + /// When specified, the limit applies per unique value of this expression. + /// + public string? Key { get; set; } + + /// + /// Maximum number of events to burst through before throttling kicks in. + /// + public int? Burst { get; set; } + + /// + /// Creates a new throttle attribute + /// + /// The maximum number of executions in the period + /// The time period (e.g., "1m", "1h", "1d") + public ThrottleAttribute(int limit, string period) + { + if (limit < 1) + throw new ArgumentException("Throttle limit must be at least 1", nameof(limit)); + if (string.IsNullOrWhiteSpace(period)) + throw new ArgumentException("Period cannot be null or empty", nameof(period)); + + Limit = limit; + Period = period; + } +} diff --git a/Inngest/Attributes/TimeoutAttribute.cs b/Inngest/Attributes/TimeoutAttribute.cs new file mode 100644 index 0000000..e062cb1 --- /dev/null +++ b/Inngest/Attributes/TimeoutAttribute.cs @@ -0,0 +1,72 @@ +namespace Inngest.Attributes; + +/// +/// Configures timeouts for an Inngest function to automatically cancel +/// runs that take too long to start or finish. +/// +/// +/// +/// Use timeouts to prevent queue buildup and ensure functions don't hang indefinitely. +/// This is especially important when using concurrency controls, as duplicate webhooks +/// queue up and wait - if a request hangs, the queue grows unbounded. +/// +/// +/// There are two timeout types: +/// +/// +/// Start +/// +/// Maximum time a run can wait in the queue before starting. +/// Runs exceeding this timeout are cancelled before executing. +/// +/// +/// +/// Finish +/// +/// Maximum time a run can execute after starting. +/// Runs exceeding this timeout are cancelled during execution. +/// +/// +/// +/// +/// +/// +/// +/// // Cancel if function takes longer than 30 seconds to complete +/// [InngestFunction("create-contribution")] +/// [EventTrigger("payment/received")] +/// [Timeout(Finish = "30s")] +/// public class CreateContributionFunction : IInngestFunction { } +/// +/// // Cancel if queued longer than 1 minute or runs longer than 2 minutes +/// [InngestFunction("process-order")] +/// [EventTrigger("order/created")] +/// [Timeout(Start = "1m", Finish = "2m")] +/// public class ProcessOrderFunction : IInngestFunction { } +/// +/// +[AttributeUsage(AttributeTargets.Class, AllowMultiple = false, Inherited = false)] +public sealed class TimeoutAttribute : Attribute +{ + /// + /// Maximum time a run can wait in the queue before starting. + /// Uses Inngest time string format (e.g., "10s", "1m", "1h"). + /// If exceeded, the run is cancelled before it starts. + /// + public string? Start { get; set; } + + /// + /// Maximum time a run can execute after starting. + /// Uses Inngest time string format (e.g., "30s", "5m", "1h"). + /// If exceeded, the run is cancelled during execution. + /// + public string? Finish { get; set; } + + /// + /// Creates a new timeout attribute. + /// At least one of Start or Finish must be specified. + /// + public TimeoutAttribute() + { + } +} diff --git a/Inngest/FunctionDefinition.cs b/Inngest/FunctionDefinition.cs index 753a29a..50e277a 100644 --- a/Inngest/FunctionDefinition.cs +++ b/Inngest/FunctionDefinition.cs @@ -153,6 +153,11 @@ public class FunctionOptions /// Idempotency key expression (CEL) to prevent duplicate executions /// public string? IdempotencyKey { get; set; } + + /// + /// Timeout configuration to automatically cancel runs that take too long + /// + public TimeoutOptions? Timeouts { get; set; } } /// @@ -292,6 +297,26 @@ public class CancellationOptions public string? Timeout { get; set; } } +/// +/// Timeout configuration to automatically cancel runs that take too long +/// +public class TimeoutOptions +{ + /// + /// Maximum time a run can wait in the queue before starting. + /// Uses Inngest time string format (e.g., "10s", "1m", "1h"). + /// If exceeded, the run is cancelled before it starts. + /// + public string? Start { get; set; } + + /// + /// Maximum time a run can execute after starting. + /// Uses Inngest time string format (e.g., "30s", "5m", "1h"). + /// If exceeded, the run is cancelled during execution. + /// + public string? Finish { get; set; } +} + /// /// Configuration for retry behavior /// diff --git a/Inngest/Inngest.csproj b/Inngest/Inngest.csproj index 8300069..d4b5e3c 100644 --- a/Inngest/Inngest.csproj +++ b/Inngest/Inngest.csproj @@ -19,7 +19,7 @@ https://github.com/jmarbutt/InngestDotNet/releases - 1.3.7 + 1.4.0 true diff --git a/Inngest/InngestClient.cs b/Inngest/InngestClient.cs index ae8e594..37fff44 100644 --- a/Inngest/InngestClient.cs +++ b/Inngest/InngestClient.cs @@ -30,7 +30,7 @@ public class InngestClient : IInngestClient private readonly string _environment; private readonly bool _isDev; private readonly bool _disableCronTriggersInDev; - private readonly string _sdkVersion = "1.3.7"; + private readonly string _sdkVersion = "1.4.0"; private readonly string _appId; private readonly ILogger _logger; private readonly IInngestFunctionRegistry? _registry; @@ -947,6 +947,18 @@ private async Task HandleSyncRequest(HttpContext context) { functionObj["retries"] = fn.Options.Retry.Attempts.Value; } + + // Timeouts + if (fn.Options.Timeouts != null) + { + var timeouts = new Dictionary(); + if (fn.Options.Timeouts.Start != null) + timeouts["start"] = fn.Options.Timeouts.Start; + if (fn.Options.Timeouts.Finish != null) + timeouts["finish"] = fn.Options.Timeouts.Finish; + if (timeouts.Count > 0) + functionObj["timeouts"] = timeouts; + } } fnArray.Add(functionObj); diff --git a/Inngest/Internal/InngestFunctionRegistry.cs b/Inngest/Internal/InngestFunctionRegistry.cs index 53b559e..a5b85b3 100644 --- a/Inngest/Internal/InngestFunctionRegistry.cs +++ b/Inngest/Internal/InngestFunctionRegistry.cs @@ -228,6 +228,40 @@ private static FunctionTrigger[] GetTriggers(Type functionType, Type? eventDataT hasOptions = true; } + // Throttle + var throttleAttr = functionType.GetCustomAttribute(); + if (throttleAttr != null) + { + options.Throttle = new ThrottleOptions + { + Limit = throttleAttr.Limit, + Period = throttleAttr.Period, + Key = throttleAttr.Key, + Burst = throttleAttr.Burst + }; + hasOptions = true; + } + + // Idempotency + var idempotencyAttr = functionType.GetCustomAttribute(); + if (idempotencyAttr != null) + { + options.IdempotencyKey = idempotencyAttr.Key; + hasOptions = true; + } + + // Timeout + var timeoutAttr = functionType.GetCustomAttribute(); + if (timeoutAttr != null && (timeoutAttr.Start != null || timeoutAttr.Finish != null)) + { + options.Timeouts = new TimeoutOptions + { + Start = timeoutAttr.Start, + Finish = timeoutAttr.Finish + }; + hasOptions = true; + } + return hasOptions ? options : null; } } diff --git a/InngestExample/Functions/PaymentProcessorFunction.cs b/InngestExample/Functions/PaymentProcessorFunction.cs new file mode 100644 index 0000000..d2baa3e --- /dev/null +++ b/InngestExample/Functions/PaymentProcessorFunction.cs @@ -0,0 +1,131 @@ +using System.Text.Json; +using Inngest; +using Inngest.Attributes; +using Inngest.Steps; + +namespace InngestExample.Functions; + +/// +/// Example payment processing function demonstrating flow control features +/// critical for production payment handling. +/// +/// This function demonstrates: +/// - [Throttle]: QUEUES events at 20/minute (unlike RateLimit which DROPS events) +/// - [Concurrency]: Serializes processing per payment ID to prevent duplicate donors +/// - [Idempotency]: Prevents duplicate processing when webhooks retry +/// - [Timeout]: Cancels hanging requests to prevent queue buildup +/// +[InngestFunction("payment-processor", Name = "Process Payment Webhook")] +[EventTrigger("payment/received")] +[Throttle(20, "1m", Key = "event.data.customerId")] // Queue, don't drop +[Concurrency(1, Key = "event.data.paymentId")] // Serialize per payment +[Idempotency("event.data.paymentId")] // One execution per payment +[Timeout(Finish = "30s")] // Cancel if hanging +public class PaymentProcessorFunction : IInngestFunction +{ + private readonly ILogger _logger; + + public PaymentProcessorFunction(ILogger logger) + { + _logger = logger; + } + + public async Task ExecuteAsync(InngestContext context, CancellationToken cancellationToken) + { + // Extract payment data from event (Data is JsonElement when deserialized) + var data = (JsonElement?)context.Event.Data; + var paymentId = data?.GetProperty("paymentId").GetString() ?? "unknown"; + var customerId = data?.GetProperty("customerId").GetString() ?? "unknown"; + var amount = data?.GetProperty("amount").GetDecimal() ?? 0m; + + _logger.LogInformation("Processing payment {PaymentId} for customer {CustomerId}, amount: {Amount}", + paymentId, customerId, amount); + + // Step 1: Find or create donor (serialized by payment ID to prevent race conditions) + var donor = await context.Step.Run("find-or-create-donor", async () => + { + _logger.LogInformation("Finding or creating donor for customer {CustomerId}", customerId); + await Task.Delay(100, cancellationToken); + return new { donorId = $"donor_{customerId}", isNew = false }; + }); + + // Step 2: Create contribution record + var contribution = await context.Step.Run("create-contribution", async () => + { + _logger.LogInformation("Creating contribution for payment {PaymentId}", paymentId); + await Task.Delay(100, cancellationToken); + return new + { + contributionId = $"contrib_{paymentId}", + donorId = donor.donorId, + amount = amount + }; + }); + + // Step 3: Emit event for downstream handlers (receipts, notifications) + var eventIds = await context.Step.SendEvent("emit-contribution-created", + new InngestEvent + { + Name = "contribution/created", + Data = new + { + contributionId = contribution.contributionId, + donorId = contribution.donorId, + customerId = customerId, + amount = amount + } + }); + + _logger.LogInformation("Payment {PaymentId} processed, contribution {ContributionId} created, emitted {EventCount} events", + paymentId, contribution.contributionId, eventIds.Length); + + return new + { + status = "completed", + paymentId = paymentId, + contributionId = contribution.contributionId, + donorId = donor.donorId, + eventsEmitted = eventIds + }; + } +} + +/// +/// Example receipt sender demonstrating idempotency for email delivery. +/// +/// The [Idempotency] attribute ensures only one receipt email is sent per contribution, +/// even if the event is delivered multiple times due to retries. +/// +[InngestFunction("send-receipt", Name = "Send Donation Receipt")] +[EventTrigger("contribution/created")] +[Idempotency("event.data.contributionId")] // One receipt per contribution +[Timeout(Finish = "15s")] // Emails should be quick +public class SendReceiptFunction : IInngestFunction +{ + private readonly ILogger _logger; + + public SendReceiptFunction(ILogger logger) + { + _logger = logger; + } + + public async Task ExecuteAsync(InngestContext context, CancellationToken cancellationToken) + { + var data = (JsonElement?)context.Event.Data; + var contributionId = data?.GetProperty("contributionId").GetString() ?? "unknown"; + var donorId = data?.GetProperty("donorId").GetString() ?? "unknown"; + + _logger.LogInformation("Sending receipt for contribution {ContributionId} to donor {DonorId}", + contributionId, donorId); + + await context.Step.Run("send-email", async () => + { + // Simulate email sending + await Task.Delay(50, cancellationToken); + _logger.LogInformation("Receipt email sent for contribution {ContributionId}", contributionId); + return new { sent = true }; + }); + + return new { status = "sent", contributionId = contributionId }; + } +}