Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Cortex.Mediator/Assets/license.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
The MIT License (MIT)

Copyright (c) 2022 Buildersoft
Copyright (c) 2024 Buildersoft

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
Expand Down
2 changes: 1 addition & 1 deletion src/Cortex.States.RocksDb/Assets/license.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
The MIT License (MIT)

Copyright (c) 2022 Buildersoft
Copyright (c) 2024 Buildersoft

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
Expand Down
2 changes: 1 addition & 1 deletion src/Cortex.States/Assets/license.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
The MIT License (MIT)

Copyright (c) 2022 Buildersoft
Copyright (c) 2024 Buildersoft

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
Expand Down
2 changes: 1 addition & 1 deletion src/Cortex.Streams.AWSSQS/Assets/license.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
The MIT License (MIT)

Copyright (c) 2022 Buildersoft
Copyright (c) 2024 Buildersoft

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
Expand Down
2 changes: 1 addition & 1 deletion src/Cortex.Streams.AzureBlobStorage/Assets/license.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
The MIT License (MIT)

Copyright (c) 2022 Buildersoft
Copyright (c) 2024 Buildersoft

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
Expand Down
2 changes: 1 addition & 1 deletion src/Cortex.Streams.AzureServiceBus/Assets/license.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
The MIT License (MIT)

Copyright (c) 2022 Buildersoft
Copyright (c) 2024 Buildersoft

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
Expand Down
2 changes: 1 addition & 1 deletion src/Cortex.Streams.Files/Assets/license.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
The MIT License (MIT)

Copyright (c) 2022 Buildersoft
Copyright (c) 2024 Buildersoft

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
Expand Down
2 changes: 1 addition & 1 deletion src/Cortex.Streams.Kafka/Assets/license.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
The MIT License (MIT)

Copyright (c) 2022 Buildersoft
Copyright (c) 2024 Buildersoft

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
Expand Down
2 changes: 1 addition & 1 deletion src/Cortex.Streams.Pulsar/Assets/license.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
The MIT License (MIT)

Copyright (c) 2022 Buildersoft
Copyright (c) 2024 Buildersoft

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
Expand Down
2 changes: 1 addition & 1 deletion src/Cortex.Streams.RabbitMQ/Assets/license.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
The MIT License (MIT)

Copyright (c) 2022 Buildersoft
Copyright (c) 2024 Buildersoft

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
Expand Down
2 changes: 1 addition & 1 deletion src/Cortex.Streams.S3/Assets/license.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
The MIT License (MIT)

Copyright (c) 2022 Buildersoft
Copyright (c) 2024 Buildersoft

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
Expand Down
65 changes: 63 additions & 2 deletions src/Cortex.Streams/Abstractions/IBranchStreamBuilder.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using Cortex.Streams.Operators;
using Cortex.States;
using Cortex.Streams.Operators;
using System;
using System.Collections.Generic;

namespace Cortex.Streams.Abstractions
{
Expand All @@ -23,7 +25,62 @@ public interface IBranchStreamBuilder<TIn, TCurrent>
/// <typeparam name="TNext">The type of data after the transformation.</typeparam>
/// <param name="mapFunction">A function to transform data.</param>
/// <returns>The branch stream builder with the new data type.</returns>
IBranchStreamBuilder<TIn, TNext> Map<TNext>(Func<TCurrent, TNext> mapFunction);
IBranchStreamBuilder<TCurrent, TNext> Map<TNext>(Func<TCurrent, TNext> mapFunction);



/// <summary>
/// Groups the stream data by a specified key selector.
/// </summary>
/// <typeparam name="TKey">The type of the key to group by.</typeparam>
/// <param name="keySelector">A function to extract the key from data.</param>
/// <param name="stateStore">An optional state store to use for storing group state.</param>
/// <returns>A stream builder with grouped data.</returns>
IBranchStreamBuilder<TIn, TCurrent> GroupBySilently<TKey>(
Func<TCurrent, TKey> keySelector,
string stateStoreName = null,
IStateStore<TKey, List<TCurrent>> stateStore = null);

/// <summary>
/// Groups the stream data by a specified key selector silently.
/// </summary>
/// <typeparam name="TKey">The type of the key to group by.</typeparam>
/// <param name="keySelector">A function to extract the key from data.</param>
/// <param name="stateStore">An optional state store to use for storing group state.</param>
/// <returns>A stream builder with grouped data.</returns>
IBranchStreamBuilder<TIn, KeyValuePair<TKey, TCurrent>> GroupBy<TKey>(
Func<TCurrent, TKey> keySelector,
string stateStoreName = null,
IStateStore<TKey, List<TCurrent>> stateStore = null);

/// <summary>
/// Aggregates the stream data using a specified aggregation function.
/// </summary>
/// <typeparam name="TAggregate">The type of the aggregate value.</typeparam>
/// <param name="aggregateFunction">A function to aggregate data.</param>
/// <param name="stateStore">An optional state store to use for storing aggregate state.</param>
/// <returns>A stream builder with aggregated data.</returns>
IBranchStreamBuilder<TIn, TCurrent> AggregateSilently<TKey, TAggregate>(
Func<TCurrent, TKey> keySelector,
Func<TAggregate, TCurrent, TAggregate> aggregateFunction,
string stateStoreName = null,
IStateStore<TKey, TAggregate> stateStore = null);

/// <summary>
/// Aggregates the stream data using a specified aggregation function silently in the background.
/// </summary>
/// <typeparam name="TAggregate">The type of the aggregate value.</typeparam>
/// <param name="aggregateFunction">A function to aggregate data.</param>
/// <param name="stateStore">An optional state store to use for storing aggregate state.</param>
/// <returns>A stream builder with input data.</returns>
IBranchStreamBuilder<TIn, KeyValuePair<TKey, TAggregate>> Aggregate<TKey, TAggregate>(
Func<TCurrent, TKey> keySelector,
Func<TAggregate, TCurrent, TAggregate> aggregateFunction,
string stateStoreName = null,
IStateStore<TKey, TAggregate> stateStore = null);




/// <summary>
/// Adds a sink function to the branch to consume data.
Expand All @@ -36,5 +93,9 @@ public interface IBranchStreamBuilder<TIn, TCurrent>
/// </summary>
/// <param name="sinkOperator">A sink operator to consume data.</param>
void Sink(ISinkOperator<TCurrent> sinkOperator);




}
}
37 changes: 33 additions & 4 deletions src/Cortex.Streams/Abstractions/IStreamBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,22 @@ public interface IStreamBuilder<TIn, TCurrent>
/// <param name="keySelector">A function to extract the key from data.</param>
/// <param name="stateStore">An optional state store to use for storing group state.</param>
/// <returns>A stream builder with grouped data.</returns>
//IStreamBuilder<TIn, KeyValuePair<TKey, TCurrent>> GroupBy<TKey>(Func<TCurrent, TKey> keySelector, string stateStoreName = null, IStateStore<TKey,List<TCurrent>> stateStore = null);
IStreamBuilder<TIn, TCurrent> GroupBy<TKey>(Func<TCurrent, TKey> keySelector, string stateStoreName = null, IStateStore<TKey, List<TCurrent>> stateStore = null);
IStreamBuilder<TIn, TCurrent> GroupBySilently<TKey>(
Func<TCurrent, TKey> keySelector,
string stateStoreName = null,
IStateStore<TKey, List<TCurrent>> stateStore = null);

/// <summary>
/// Groups the stream data by a specified key selector silently.
/// </summary>
/// <typeparam name="TKey">The type of the key to group by.</typeparam>
/// <param name="keySelector">A function to extract the key from data.</param>
/// <param name="stateStore">An optional state store to use for storing group state.</param>
/// <returns>A stream builder with grouped data.</returns>
IStreamBuilder<TIn, KeyValuePair<TKey, TCurrent>> GroupBy<TKey>(
Func<TCurrent, TKey> keySelector,
string stateStoreName = null,
IStateStore<TKey, List<TCurrent>> stateStore = null);

/// <summary>
/// Aggregates the stream data using a specified aggregation function.
Expand All @@ -86,9 +100,24 @@ public interface IStreamBuilder<TIn, TCurrent>
/// <param name="aggregateFunction">A function to aggregate data.</param>
/// <param name="stateStore">An optional state store to use for storing aggregate state.</param>
/// <returns>A stream builder with aggregated data.</returns>
IStreamBuilder<TIn, TCurrent> Aggregate<TKey, TAggregate>(Func<TCurrent, TKey> keySelector, Func<TAggregate, TCurrent, TAggregate> aggregateFunction, string stateStoreName = null, IStateStore<TKey, TAggregate> stateStore = null);
//IStreamBuilder<TIn, KeyValuePair<TKey, TAggregate>> Aggregate<TKey, TAggregate>(Func<TCurrent, TKey> keySelector, Func<TAggregate, TCurrent, TAggregate> aggregateFunction, string stateStoreName = null, IStateStore<TKey, TAggregate> stateStore = null);
IStreamBuilder<TIn, TCurrent> AggregateSilently<TKey, TAggregate>(
Func<TCurrent, TKey> keySelector,
Func<TAggregate, TCurrent, TAggregate> aggregateFunction,
string stateStoreName = null,
IStateStore<TKey, TAggregate> stateStore = null);

/// <summary>
/// Aggregates the stream data using a specified aggregation function silently in the background.
/// </summary>
/// <typeparam name="TAggregate">The type of the aggregate value.</typeparam>
/// <param name="aggregateFunction">A function to aggregate data.</param>
/// <param name="stateStore">An optional state store to use for storing aggregate state.</param>
/// <returns>A stream builder with input data.</returns>
IStreamBuilder<TIn, KeyValuePair<TKey, TAggregate>> Aggregate<TKey, TAggregate>(
Func<TCurrent, TKey> keySelector,
Func<TAggregate, TCurrent, TAggregate> aggregateFunction,
string stateStoreName = null,
IStateStore<TKey, TAggregate> stateStore = null);


/// <summary>
Expand Down
2 changes: 1 addition & 1 deletion src/Cortex.Streams/Assets/license.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
The MIT License (MIT)

Copyright (c) 2022 Buildersoft
Copyright (c) 2024 Buildersoft

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
Expand Down
Loading
Loading