From 0a4ca02f3a799efd0b87a4d6f2055962f699f06a Mon Sep 17 00:00:00 2001 From: Arkadiusz Biel Date: Mon, 12 Dec 2022 16:34:03 +0000 Subject: [PATCH 1/2] feat: added default implementation for queue --- .../{ => DequeueActions}/IDequeueAction.cs | 0 .../DequeueActions/IdOrientedDequeueAction.cs | 51 ++++++++++++ src/SImpl.Queue/Module/QueueModuleConfig.cs | 5 ++ .../{ => Queues}/BlockingCollectionQueue.cs | 0 src/SImpl.Queue/{ => Queues}/IQueue.cs | 0 src/SImpl.Queue/Queues/IdOrientedJobQueue.cs | 82 +++++++++++++++++++ src/SImpl.Queue/SImpl.Queue.csproj | 1 + 7 files changed, 139 insertions(+) rename src/SImpl.Queue/{ => DequeueActions}/IDequeueAction.cs (100%) create mode 100644 src/SImpl.Queue/DequeueActions/IdOrientedDequeueAction.cs rename src/SImpl.Queue/{ => Queues}/BlockingCollectionQueue.cs (100%) rename src/SImpl.Queue/{ => Queues}/IQueue.cs (100%) create mode 100644 src/SImpl.Queue/Queues/IdOrientedJobQueue.cs diff --git a/src/SImpl.Queue/IDequeueAction.cs b/src/SImpl.Queue/DequeueActions/IDequeueAction.cs similarity index 100% rename from src/SImpl.Queue/IDequeueAction.cs rename to src/SImpl.Queue/DequeueActions/IDequeueAction.cs diff --git a/src/SImpl.Queue/DequeueActions/IdOrientedDequeueAction.cs b/src/SImpl.Queue/DequeueActions/IdOrientedDequeueAction.cs new file mode 100644 index 0000000..47128dd --- /dev/null +++ b/src/SImpl.Queue/DequeueActions/IdOrientedDequeueAction.cs @@ -0,0 +1,51 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; + +namespace SImpl.Queue +{ + public class IdOrientedDequeueAction : IDequeueAction<(Guid Id, Func Job)> + { + private readonly IServiceProvider _serviceProvider; + private readonly ILogger _logger; + + public IdOrientedDequeueAction(IServiceProvider serviceProvider, ILogger logger) + { + _serviceProvider = serviceProvider; + _logger = logger; + } + + public void DequeueAction((Guid Id, Func Job) action) + { + ExecutionContext.SuppressFlow(); + Task.Run(async () => + { + try + { + var startTime = DateTime.Now; + _logger.LogInformation($"{startTime.ToLongTimeString()} starting task {action.Id}"); + await action.Job.Invoke( + _serviceProvider.GetService(typeof(IServiceScopeFactory)) as IServiceScopeFactory); + var endTime = DateTime.Now; + + TimeSpan diff = endTime - startTime; + _logger.LogInformation( + $"{endTime.ToLongTimeString()} finishing task {action.Id} - took {diff.TotalSeconds} seconds"); + + } + catch (Exception e) + { + _logger.LogInformation( + $" task {action.Id} failed"); + } + }); + + + + ExecutionContext.RestoreFlow(); + + } + } +} \ No newline at end of file diff --git a/src/SImpl.Queue/Module/QueueModuleConfig.cs b/src/SImpl.Queue/Module/QueueModuleConfig.cs index ac064ae..71dd317 100644 --- a/src/SImpl.Queue/Module/QueueModuleConfig.cs +++ b/src/SImpl.Queue/Module/QueueModuleConfig.cs @@ -27,6 +27,11 @@ public QueueModuleConfig AddQueuesAndDequeueActionsFromAssemblyOf() AddQueuesAndDequeueActionsFromAssembly(typeof(T).Assembly); return this; } + public QueueModuleConfig EnableDefaultQueue() + { + AddQueuesAndDequeueActionsFromAssembly(this.GetType().Assembly); + return this; + } public QueueModuleConfig AddQueue() where TQueue : IQueue diff --git a/src/SImpl.Queue/BlockingCollectionQueue.cs b/src/SImpl.Queue/Queues/BlockingCollectionQueue.cs similarity index 100% rename from src/SImpl.Queue/BlockingCollectionQueue.cs rename to src/SImpl.Queue/Queues/BlockingCollectionQueue.cs diff --git a/src/SImpl.Queue/IQueue.cs b/src/SImpl.Queue/Queues/IQueue.cs similarity index 100% rename from src/SImpl.Queue/IQueue.cs rename to src/SImpl.Queue/Queues/IQueue.cs diff --git a/src/SImpl.Queue/Queues/IdOrientedJobQueue.cs b/src/SImpl.Queue/Queues/IdOrientedJobQueue.cs new file mode 100644 index 0000000..a10c225 --- /dev/null +++ b/src/SImpl.Queue/Queues/IdOrientedJobQueue.cs @@ -0,0 +1,82 @@ +using System; +using System.Collections.Concurrent; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; + +namespace SImpl.Queue +{ + public class IdOrientedJobQueue : IQueue<(Guid Id, Func Job)> + { + private readonly IDequeueAction<(Guid Id, Func Job)> _dequeueAction; + private readonly ILogger _logger; + private readonly BlockingCollection<(Guid Id, Func Job)> _jobs = new BlockingCollection<(Guid Id, Func Job)>(); + + public IdOrientedJobQueue(IDequeueAction<(Guid Id, Func Job)> dequeueAction, ILogger logger) + { + this._dequeueAction = dequeueAction; + _logger = logger; + new Thread(new ThreadStart(this.OnStart)) + { + IsBackground = false, + + }.Start(); + new Thread(new ThreadStart(this.OnStart)) + { + IsBackground = false, + + }.Start(); + } + + public static IdOrientedJobQueue CreateInstance(IDequeueAction<(Guid Id, Func Job)> dequeueAction, ILogger logger) + { + return new IdOrientedJobQueue(dequeueAction, logger); + } + + + private void ProcessQueuedItem((Guid Id, Func Job) item) => this._dequeueAction.DequeueAction(item); + + private void OnStart() + { + var pause = TimeSpan.FromSeconds(1); + var lastrun = DateTime.Now; + while (true) // some criteria to abort or even true works here + { + if (_jobs.Count == 0) + { + if (lastrun + TimeSpan.FromMinutes(1) < DateTime.Now) + { + _logger.LogInformation("background Queue is empty"); + lastrun = DateTime.Now; + + } + // no pending actions available. pause + continue; + } + + foreach ((Guid Id, Func Job) consuming in this._jobs.GetConsumingEnumerable( + CancellationToken.None)) + { + _logger.LogInformation($"background Queue has {_jobs.Count}"); + try + { + this.ProcessQueuedItem(consuming); + + } + catch (Exception e) + { + _logger.LogError("background task failed",e); + } + } + } + + } + + public void Enqueue((Guid Id, Func Job) job) + { + _logger.LogInformation($"adding job with id {job.Id}"); + this._jobs.Add(job); + } + } +} \ No newline at end of file diff --git a/src/SImpl.Queue/SImpl.Queue.csproj b/src/SImpl.Queue/SImpl.Queue.csproj index 2fa3dd1..d70ae79 100644 --- a/src/SImpl.Queue/SImpl.Queue.csproj +++ b/src/SImpl.Queue/SImpl.Queue.csproj @@ -11,6 +11,7 @@ + From 824da7ef1ae85ac5d47600248a7d704f11400cc7 Mon Sep 17 00:00:00 2001 From: Arkadiusz Biel Date: Mon, 12 Dec 2022 17:01:21 +0000 Subject: [PATCH 2/2] fix: corrected lifespan --- src/SImpl.Queue/Module/QueueModule.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/SImpl.Queue/Module/QueueModule.cs b/src/SImpl.Queue/Module/QueueModule.cs index 6bf5295..fc79b04 100644 --- a/src/SImpl.Queue/Module/QueueModule.cs +++ b/src/SImpl.Queue/Module/QueueModule.cs @@ -27,13 +27,13 @@ public void ConfigureServices(IServiceCollection services) s.FromAssemblies(Config.RegisteredAssemblies) .AddClasses(c => c.AssignableTo(typeof(IQueue<>))) .AsImplementedInterfaces() - .WithTransientLifetime()); + .WithSingletonLifetime()); services.Scan(s => s.FromAssemblies(Config.RegisteredAssemblies) .AddClasses(c => c.AssignableTo(typeof(IDequeueAction<>))) .AsImplementedInterfaces() - .WithTransientLifetime()); + .WithSingletonLifetime()); var queueInterface = typeof(IQueue<>); foreach (var queueReg in Config.RegisteredQueues)