Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions src/SImpl.Queue/DequeueActions/IdOrientedDequeueAction.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

namespace SImpl.Queue
{
public class IdOrientedDequeueAction : IDequeueAction<(Guid Id, Func<IServiceScopeFactory,Task> Job)>
{
private readonly IServiceProvider _serviceProvider;
private readonly ILogger<IdOrientedDequeueAction> _logger;

public IdOrientedDequeueAction(IServiceProvider serviceProvider, ILogger<IdOrientedDequeueAction> logger)
{
_serviceProvider = serviceProvider;
_logger = logger;
}

public void DequeueAction((Guid Id, Func<IServiceScopeFactory,Task> Job) action)
{
ExecutionContext.SuppressFlow();
Task.Run(async () =>
{
try
{
var startTime = DateTime.Now;
_logger.LogInformation($"{startTime.ToLongTimeString()} starting task {action.Id}");
await action.Job.Invoke(
_serviceProvider.GetService(typeof(IServiceScopeFactory)) as IServiceScopeFactory);
var endTime = DateTime.Now;

TimeSpan diff = endTime - startTime;
_logger.LogInformation(
$"{endTime.ToLongTimeString()} finishing task {action.Id} - took {diff.TotalSeconds} seconds");

}
catch (Exception e)
{
_logger.LogInformation(
$" task {action.Id} failed");
}
});



ExecutionContext.RestoreFlow();

}
}
}
4 changes: 2 additions & 2 deletions src/SImpl.Queue/Module/QueueModule.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ public void ConfigureServices(IServiceCollection services)
s.FromAssemblies(Config.RegisteredAssemblies)
.AddClasses(c => c.AssignableTo(typeof(IQueue<>)))
.AsImplementedInterfaces()
.WithTransientLifetime());
.WithSingletonLifetime());

services.Scan(s =>
s.FromAssemblies(Config.RegisteredAssemblies)
.AddClasses(c => c.AssignableTo(typeof(IDequeueAction<>)))
.AsImplementedInterfaces()
.WithTransientLifetime());
.WithSingletonLifetime());

var queueInterface = typeof(IQueue<>);
foreach (var queueReg in Config.RegisteredQueues)
Expand Down
5 changes: 5 additions & 0 deletions src/SImpl.Queue/Module/QueueModuleConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ public QueueModuleConfig AddQueuesAndDequeueActionsFromAssemblyOf<T>()
AddQueuesAndDequeueActionsFromAssembly(typeof(T).Assembly);
return this;
}
public QueueModuleConfig EnableDefaultQueue()
{
AddQueuesAndDequeueActionsFromAssembly(this.GetType().Assembly);
return this;
}

public QueueModuleConfig AddQueue<TQueue, T>()
where TQueue : IQueue<T>
Expand Down
File renamed without changes.
82 changes: 82 additions & 0 deletions src/SImpl.Queue/Queues/IdOrientedJobQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

namespace SImpl.Queue
{
public class IdOrientedJobQueue : IQueue<(Guid Id, Func<IServiceScopeFactory,Task> Job)>
{
private readonly IDequeueAction<(Guid Id, Func<IServiceScopeFactory,Task> Job)> _dequeueAction;
private readonly ILogger<IdOrientedJobQueue> _logger;
private readonly BlockingCollection<(Guid Id, Func<IServiceScopeFactory,Task> Job)> _jobs = new BlockingCollection<(Guid Id, Func<IServiceScopeFactory,Task> Job)>();

public IdOrientedJobQueue(IDequeueAction<(Guid Id, Func<IServiceScopeFactory,Task> Job)> dequeueAction, ILogger<IdOrientedJobQueue> logger)
{
this._dequeueAction = dequeueAction;
_logger = logger;
new Thread(new ThreadStart(this.OnStart))
{
IsBackground = false,

}.Start();
new Thread(new ThreadStart(this.OnStart))
{
IsBackground = false,

}.Start();
}

public static IdOrientedJobQueue CreateInstance(IDequeueAction<(Guid Id, Func<IServiceScopeFactory,Task> Job)> dequeueAction, ILogger<IdOrientedJobQueue> logger)
{
return new IdOrientedJobQueue(dequeueAction, logger);
}


private void ProcessQueuedItem((Guid Id, Func<IServiceScopeFactory,Task> Job) item) => this._dequeueAction.DequeueAction(item);

private void OnStart()
{
var pause = TimeSpan.FromSeconds(1);
var lastrun = DateTime.Now;
while (true) // some criteria to abort or even true works here
{
if (_jobs.Count == 0)
{
if (lastrun + TimeSpan.FromMinutes(1) < DateTime.Now)
{
_logger.LogInformation("background Queue is empty");
lastrun = DateTime.Now;

}
// no pending actions available. pause
continue;
}

foreach ((Guid Id, Func<IServiceScopeFactory,Task> Job) consuming in this._jobs.GetConsumingEnumerable(
CancellationToken.None))
{
_logger.LogInformation($"background Queue has {_jobs.Count}");
try
{
this.ProcessQueuedItem(consuming);

}
catch (Exception e)
{
_logger.LogError("background task failed",e);
}
}
}

}

public void Enqueue((Guid Id, Func<IServiceScopeFactory,Task> Job) job)
{
_logger.LogInformation($"adding job with id {job.Id}");
this._jobs.Add(job);
}
}
}
1 change: 1 addition & 0 deletions src/SImpl.Queue/SImpl.Queue.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="5.0.0" />
<PackageReference Include="Scrutor" Version="3.3.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="5.0.0" />
Expand Down