diff --git a/src/SImpl.Queue/IDequeueAction.cs b/src/SImpl.Queue/DequeueActions/IDequeueAction.cs similarity index 100% rename from src/SImpl.Queue/IDequeueAction.cs rename to src/SImpl.Queue/DequeueActions/IDequeueAction.cs diff --git a/src/SImpl.Queue/DequeueActions/IdOrientedDequeueAction.cs b/src/SImpl.Queue/DequeueActions/IdOrientedDequeueAction.cs new file mode 100644 index 0000000..47128dd --- /dev/null +++ b/src/SImpl.Queue/DequeueActions/IdOrientedDequeueAction.cs @@ -0,0 +1,51 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; + +namespace SImpl.Queue +{ + public class IdOrientedDequeueAction : IDequeueAction<(Guid Id, Func Job)> + { + private readonly IServiceProvider _serviceProvider; + private readonly ILogger _logger; + + public IdOrientedDequeueAction(IServiceProvider serviceProvider, ILogger logger) + { + _serviceProvider = serviceProvider; + _logger = logger; + } + + public void DequeueAction((Guid Id, Func Job) action) + { + ExecutionContext.SuppressFlow(); + Task.Run(async () => + { + try + { + var startTime = DateTime.Now; + _logger.LogInformation($"{startTime.ToLongTimeString()} starting task {action.Id}"); + await action.Job.Invoke( + _serviceProvider.GetService(typeof(IServiceScopeFactory)) as IServiceScopeFactory); + var endTime = DateTime.Now; + + TimeSpan diff = endTime - startTime; + _logger.LogInformation( + $"{endTime.ToLongTimeString()} finishing task {action.Id} - took {diff.TotalSeconds} seconds"); + + } + catch (Exception e) + { + _logger.LogInformation( + $" task {action.Id} failed"); + } + }); + + + + ExecutionContext.RestoreFlow(); + + } + } +} \ No newline at end of file diff --git a/src/SImpl.Queue/Module/QueueModule.cs b/src/SImpl.Queue/Module/QueueModule.cs index 6bf5295..fc79b04 100644 --- a/src/SImpl.Queue/Module/QueueModule.cs +++ b/src/SImpl.Queue/Module/QueueModule.cs @@ -27,13 +27,13 @@ public void ConfigureServices(IServiceCollection services) s.FromAssemblies(Config.RegisteredAssemblies) .AddClasses(c => c.AssignableTo(typeof(IQueue<>))) .AsImplementedInterfaces() - .WithTransientLifetime()); + .WithSingletonLifetime()); services.Scan(s => s.FromAssemblies(Config.RegisteredAssemblies) .AddClasses(c => c.AssignableTo(typeof(IDequeueAction<>))) .AsImplementedInterfaces() - .WithTransientLifetime()); + .WithSingletonLifetime()); var queueInterface = typeof(IQueue<>); foreach (var queueReg in Config.RegisteredQueues) diff --git a/src/SImpl.Queue/Module/QueueModuleConfig.cs b/src/SImpl.Queue/Module/QueueModuleConfig.cs index ac064ae..71dd317 100644 --- a/src/SImpl.Queue/Module/QueueModuleConfig.cs +++ b/src/SImpl.Queue/Module/QueueModuleConfig.cs @@ -27,6 +27,11 @@ public QueueModuleConfig AddQueuesAndDequeueActionsFromAssemblyOf() AddQueuesAndDequeueActionsFromAssembly(typeof(T).Assembly); return this; } + public QueueModuleConfig EnableDefaultQueue() + { + AddQueuesAndDequeueActionsFromAssembly(this.GetType().Assembly); + return this; + } public QueueModuleConfig AddQueue() where TQueue : IQueue diff --git a/src/SImpl.Queue/BlockingCollectionQueue.cs b/src/SImpl.Queue/Queues/BlockingCollectionQueue.cs similarity index 100% rename from src/SImpl.Queue/BlockingCollectionQueue.cs rename to src/SImpl.Queue/Queues/BlockingCollectionQueue.cs diff --git a/src/SImpl.Queue/IQueue.cs b/src/SImpl.Queue/Queues/IQueue.cs similarity index 100% rename from src/SImpl.Queue/IQueue.cs rename to src/SImpl.Queue/Queues/IQueue.cs diff --git a/src/SImpl.Queue/Queues/IdOrientedJobQueue.cs b/src/SImpl.Queue/Queues/IdOrientedJobQueue.cs new file mode 100644 index 0000000..a10c225 --- /dev/null +++ b/src/SImpl.Queue/Queues/IdOrientedJobQueue.cs @@ -0,0 +1,82 @@ +using System; +using System.Collections.Concurrent; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; + +namespace SImpl.Queue +{ + public class IdOrientedJobQueue : IQueue<(Guid Id, Func Job)> + { + private readonly IDequeueAction<(Guid Id, Func Job)> _dequeueAction; + private readonly ILogger _logger; + private readonly BlockingCollection<(Guid Id, Func Job)> _jobs = new BlockingCollection<(Guid Id, Func Job)>(); + + public IdOrientedJobQueue(IDequeueAction<(Guid Id, Func Job)> dequeueAction, ILogger logger) + { + this._dequeueAction = dequeueAction; + _logger = logger; + new Thread(new ThreadStart(this.OnStart)) + { + IsBackground = false, + + }.Start(); + new Thread(new ThreadStart(this.OnStart)) + { + IsBackground = false, + + }.Start(); + } + + public static IdOrientedJobQueue CreateInstance(IDequeueAction<(Guid Id, Func Job)> dequeueAction, ILogger logger) + { + return new IdOrientedJobQueue(dequeueAction, logger); + } + + + private void ProcessQueuedItem((Guid Id, Func Job) item) => this._dequeueAction.DequeueAction(item); + + private void OnStart() + { + var pause = TimeSpan.FromSeconds(1); + var lastrun = DateTime.Now; + while (true) // some criteria to abort or even true works here + { + if (_jobs.Count == 0) + { + if (lastrun + TimeSpan.FromMinutes(1) < DateTime.Now) + { + _logger.LogInformation("background Queue is empty"); + lastrun = DateTime.Now; + + } + // no pending actions available. pause + continue; + } + + foreach ((Guid Id, Func Job) consuming in this._jobs.GetConsumingEnumerable( + CancellationToken.None)) + { + _logger.LogInformation($"background Queue has {_jobs.Count}"); + try + { + this.ProcessQueuedItem(consuming); + + } + catch (Exception e) + { + _logger.LogError("background task failed",e); + } + } + } + + } + + public void Enqueue((Guid Id, Func Job) job) + { + _logger.LogInformation($"adding job with id {job.Id}"); + this._jobs.Add(job); + } + } +} \ No newline at end of file diff --git a/src/SImpl.Queue/SImpl.Queue.csproj b/src/SImpl.Queue/SImpl.Queue.csproj index 2fa3dd1..d70ae79 100644 --- a/src/SImpl.Queue/SImpl.Queue.csproj +++ b/src/SImpl.Queue/SImpl.Queue.csproj @@ -11,6 +11,7 @@ +