From 1e9e9e86d1f6201a30cbe513050e5644df2e1830 Mon Sep 17 00:00:00 2001 From: DarekKrawczyk Date: Sat, 6 Dec 2025 23:36:26 +0100 Subject: [PATCH] Preprod --- Services/Devices/Devices.API/Program.cs | 2 +- .../Devices/DeviceEndpoints.cs | 8 + .../GetDevice/GetMonitoredLocationsCommand.cs | 11 + .../GetDevice/GetMonitoredLocationsHandler.cs | 22 + .../DeviceGenerateMeasurementConsumer.cs | 16 + .../Emulators.WebApp/Emulators.WebApp.csproj | 1 + .../Emulators.WebApp/GlobalUsings.cs | 4 +- .../Emulators/Emulators.WebApp/Program.cs | 21 + .../Gateways/Gateways.Main/appsettings.json | 14 + .../Measurements/Measurements.API/Program.cs | 2 +- .../Mappers/MeasurementMapper.cs | 48 ++ Services/Raports/Raports.API/Program.cs | 4 +- .../Consumers/AppendRaportConsumer.cs | 524 ++++++++++++++++++ .../Document/ProcessDailyDocumentConsumer.cs | 19 - .../Document/ProcessHourlyDocumentConsumer.cs | 19 - .../ProcessMonthlyDocumentConsumer.cs | 19 - .../Document/ProcessWeeklyDocumentConsumer.cs | 19 - .../Consumers/GenerateDocumentConsumer.cs | 91 ++- .../Consumers/GenerateSummaryConsumer.cs | 11 +- .../Pending/ProcessDailyRaportConsumer.cs | 19 - .../Pending/ProcessHourlyRaportConsumer.cs | 482 ---------------- .../Pending/ProcessMonthlyRaportConsumer.cs | 19 - .../Pending/ProcessWeeklyRaportConsumer.cs | 19 - .../Consumers/RaportFailedConsumer.cs | 69 ++- .../Summary/ProcessDailySummaryConsumer.cs | 19 - .../Summary/ProcessHourlySummaryConsumer.cs | 19 - .../Summary/ProcessMonthlySummaryConsumer.cs | 19 - .../Summary/ProcessWeeklySummaryConsumer.cs | 19 - .../DependencyInjection.cs | 76 +-- .../Raports.Application/GlobalUsings.cs | 4 +- .../Handlers/Create/CreateRaportHandler.cs | 18 +- .../Handlers/RaportsServiceEndpoints.cs | 17 + .../Handlers/Read/DownloadRaportCommand.cs | 20 + .../Handlers/Read/DownloadRaportHandler.cs | 50 ++ .../Handlers/Update/RetryRaportCommand.cs | 11 + .../Handlers/Update/RetryRaportHandler.cs | 66 +++ .../Update/UpdateRaportStatusHandler.cs | 20 +- docker-compose.override.yml | 10 +- 38 files changed, 989 insertions(+), 842 deletions(-) create mode 100644 Services/Devices/Devices.Application/Devices/GetDevice/GetMonitoredLocationsCommand.cs create mode 100644 Services/Devices/Devices.Application/Devices/GetDevice/GetMonitoredLocationsHandler.cs create mode 100644 Services/Raports/Raports.Application/Consumers/AppendRaportConsumer.cs delete mode 100644 Services/Raports/Raports.Application/Consumers/Document/ProcessDailyDocumentConsumer.cs delete mode 100644 Services/Raports/Raports.Application/Consumers/Document/ProcessHourlyDocumentConsumer.cs delete mode 100644 Services/Raports/Raports.Application/Consumers/Document/ProcessMonthlyDocumentConsumer.cs delete mode 100644 Services/Raports/Raports.Application/Consumers/Document/ProcessWeeklyDocumentConsumer.cs delete mode 100644 Services/Raports/Raports.Application/Consumers/Pending/ProcessDailyRaportConsumer.cs delete mode 100644 Services/Raports/Raports.Application/Consumers/Pending/ProcessHourlyRaportConsumer.cs delete mode 100644 Services/Raports/Raports.Application/Consumers/Pending/ProcessMonthlyRaportConsumer.cs delete mode 100644 Services/Raports/Raports.Application/Consumers/Pending/ProcessWeeklyRaportConsumer.cs delete mode 100644 Services/Raports/Raports.Application/Consumers/Summary/ProcessDailySummaryConsumer.cs delete mode 100644 Services/Raports/Raports.Application/Consumers/Summary/ProcessHourlySummaryConsumer.cs delete mode 100644 Services/Raports/Raports.Application/Consumers/Summary/ProcessMonthlySummaryConsumer.cs delete mode 100644 Services/Raports/Raports.Application/Consumers/Summary/ProcessWeeklySummaryConsumer.cs create mode 100644 Services/Raports/Raports.Application/Handlers/Read/DownloadRaportCommand.cs create mode 100644 Services/Raports/Raports.Application/Handlers/Read/DownloadRaportHandler.cs create mode 100644 Services/Raports/Raports.Application/Handlers/Update/RetryRaportCommand.cs create mode 100644 Services/Raports/Raports.Application/Handlers/Update/RetryRaportHandler.cs diff --git a/Services/Devices/Devices.API/Program.cs b/Services/Devices/Devices.API/Program.cs index 4729009..9c72dd5 100644 --- a/Services/Devices/Devices.API/Program.cs +++ b/Services/Devices/Devices.API/Program.cs @@ -35,7 +35,7 @@ app.UseCors(); -app.UseHealthChecks("/health", new HealthCheckOptions +app.MapHealthChecks("/devices/health", new HealthCheckOptions { ResponseWriter = UIResponseWriter.WriteHealthCheckUIResponse, }); diff --git a/Services/Devices/Devices.Application/Devices/DeviceEndpoints.cs b/Services/Devices/Devices.Application/Devices/DeviceEndpoints.cs index 27174ea..e9d2422 100644 --- a/Services/Devices/Devices.Application/Devices/DeviceEndpoints.cs +++ b/Services/Devices/Devices.Application/Devices/DeviceEndpoints.cs @@ -47,6 +47,14 @@ public void AddRoutes(IEndpointRouteBuilder app) return Results.Ok(dtos); }); + app.MapGet("/devices/locations/monitored", async (ISender sender) => + { + var response = await sender.Send(new GetMonitoredLocationsCommand()); + var dtos = response.Locations; + + return Results.Ok(dtos); + }); + app.MapGet("/devices/timestamps/all", async (ISender sender) => { var response = await sender.Send(new GetAllTimestampsCommand()); diff --git a/Services/Devices/Devices.Application/Devices/GetDevice/GetMonitoredLocationsCommand.cs b/Services/Devices/Devices.Application/Devices/GetDevice/GetMonitoredLocationsCommand.cs new file mode 100644 index 0000000..bb5e31b --- /dev/null +++ b/Services/Devices/Devices.Application/Devices/GetDevice/GetMonitoredLocationsCommand.cs @@ -0,0 +1,11 @@ +namespace Devices.Application.Devices.GetDevice; + +public record GetMonitoredLocationsCommand() : IRequest; +public record GetMonitoredLocationsResponse(IEnumerable Locations); + +public class GetMonitoredLocationsCommandValidator : AbstractValidator +{ + public GetMonitoredLocationsCommandValidator() + { + } +} diff --git a/Services/Devices/Devices.Application/Devices/GetDevice/GetMonitoredLocationsHandler.cs b/Services/Devices/Devices.Application/Devices/GetDevice/GetMonitoredLocationsHandler.cs new file mode 100644 index 0000000..6d03058 --- /dev/null +++ b/Services/Devices/Devices.Application/Devices/GetDevice/GetMonitoredLocationsHandler.cs @@ -0,0 +1,22 @@ +namespace Devices.Application.Devices.GetDevice; + +public class GetMonitoredLocationsHandler(DevicesDBContext context) : IRequestHandler +{ + public async Task Handle(GetMonitoredLocationsCommand request, CancellationToken cancellationToken) + { + var monitoredLocations = await context.Devices + .Include(d => d.Location) + .Include(d => d.Status) + .Where(d => d.Status.Type == "Online") + .Select(d => d.Location) + .Distinct() + .OrderBy(l => l.Name) + .ToListAsync(cancellationToken); + + var dtos = monitoredLocations.Adapt>(); + + var response = new GetMonitoredLocationsResponse(dtos); + + return response; + } +} diff --git a/Services/Emulators/Emulators.Application/Consumers/DeviceGenerateMeasurementConsumer.cs b/Services/Emulators/Emulators.Application/Consumers/DeviceGenerateMeasurementConsumer.cs index 90865bf..f0ea2e7 100644 --- a/Services/Emulators/Emulators.Application/Consumers/DeviceGenerateMeasurementConsumer.cs +++ b/Services/Emulators/Emulators.Application/Consumers/DeviceGenerateMeasurementConsumer.cs @@ -151,11 +151,27 @@ public async Task Consume(ConsumeContext context) .OrderByDescending(s => s.Time) .FirstOrDefault(); + if (previousSample is null) + { + // Get last one + previousSample = chart.Samples + .OrderByDescending(x => x.Time) + .FirstOrDefault(); + } + var nextSample = chart.Samples .Where(s => s.Time > targetTime) .OrderBy(s => s.Time) .FirstOrDefault(); + // It is the 00:00 + if (nextSample is null) + { + nextSample = chart.Samples + .OrderBy(x => x.Time) + .FirstOrDefault(); + } + var seconds = targetTime.Minute * 60 + targetTime.Second; var deltaValue = nextSample.Value - previousSample.Value; var deltaValuePerSec = deltaValue / (60 * 60); diff --git a/Services/Emulators/Emulators.WebApp/Emulators.WebApp.csproj b/Services/Emulators/Emulators.WebApp/Emulators.WebApp.csproj index 82b2fb9..da62161 100644 --- a/Services/Emulators/Emulators.WebApp/Emulators.WebApp.csproj +++ b/Services/Emulators/Emulators.WebApp/Emulators.WebApp.csproj @@ -10,6 +10,7 @@ + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/Services/Emulators/Emulators.WebApp/GlobalUsings.cs b/Services/Emulators/Emulators.WebApp/GlobalUsings.cs index 0204e92..309760b 100644 --- a/Services/Emulators/Emulators.WebApp/GlobalUsings.cs +++ b/Services/Emulators/Emulators.WebApp/GlobalUsings.cs @@ -1,2 +1,4 @@ global using Emulators.Application; -global using Emulators.Infrastructure; \ No newline at end of file +global using Emulators.Infrastructure; +global using HealthChecks.UI.Client; +global using Microsoft.AspNetCore.Diagnostics.HealthChecks; \ No newline at end of file diff --git a/Services/Emulators/Emulators.WebApp/Program.cs b/Services/Emulators/Emulators.WebApp/Program.cs index cae2bf9..fe29abb 100644 --- a/Services/Emulators/Emulators.WebApp/Program.cs +++ b/Services/Emulators/Emulators.WebApp/Program.cs @@ -2,12 +2,33 @@ builder.Services.AddInfrastructureServices(builder.Configuration, builder.Environment); builder.Services.AddApplicationServices(builder.Configuration, builder.Environment); +builder.Services.AddHealthChecks(); + +builder.Services.AddCors(options => +{ + options.AddDefaultPolicy(policy => + { + policy.WithOrigins("http://localhost:5173") + .AllowAnyHeader() + .AllowCredentials() + .AllowAnyMethod(); + }); +}); builder.Configuration.AddJsonFile("appsettings.json", optional: true, reloadOnChange: true); builder.Configuration.AddUserSecrets(); var app = builder.Build(); +app.UseRouting(); + +app.UseCors(); + app.MapGet("/", () => "Hello World!"); +app.MapHealthChecks("/emulators/health", new HealthCheckOptions +{ + ResponseWriter = UIResponseWriter.WriteHealthCheckUIResponse, +}); + app.Run(); \ No newline at end of file diff --git a/Services/Gateways/Gateways.Main/appsettings.json b/Services/Gateways/Gateways.Main/appsettings.json index b34f29f..e317976 100644 --- a/Services/Gateways/Gateways.Main/appsettings.json +++ b/Services/Gateways/Gateways.Main/appsettings.json @@ -28,6 +28,13 @@ "Path": "/raports-service/{**catch-all}" }, "Transforms": [ { "PathPattern": "{**catch-all}" } ] + }, + "emulators-route": { + "ClusterId": "emulators-cluster", + "Match": { + "Path": "/emulators-service/{**catch-all}" + }, + "Transforms": [ { "PathPattern": "{**catch-all}" } ] } }, "Clusters": { @@ -51,6 +58,13 @@ "Address": "http://raports.api:8080/" } } + }, + "emulators-cluster": { + "Destinations": { + "destination1": { + "Address": "http://emulators.webapp:8080/" + } + } } } } diff --git a/Services/Measurements/Measurements.API/Program.cs b/Services/Measurements/Measurements.API/Program.cs index de3466d..1a4da4c 100644 --- a/Services/Measurements/Measurements.API/Program.cs +++ b/Services/Measurements/Measurements.API/Program.cs @@ -28,7 +28,7 @@ app.UseCors(); -app.UseHealthChecks("/health", new HealthCheckOptions +app.MapHealthChecks("/measurements/health", new HealthCheckOptions { ResponseWriter = UIResponseWriter.WriteHealthCheckUIResponse, }); diff --git a/Services/Measurements/Measurements.Application/Mappers/MeasurementMapper.cs b/Services/Measurements/Measurements.Application/Mappers/MeasurementMapper.cs index 4408ca1..b9adb3a 100644 --- a/Services/Measurements/Measurements.Application/Mappers/MeasurementMapper.cs +++ b/Services/Measurements/Measurements.Application/Mappers/MeasurementMapper.cs @@ -101,6 +101,54 @@ public void Register(TypeAdapterConfig config) .Map(x => x.Illuminance, y => y.Illuminance) .Map(x => x.SoundLevel, y => y.SoundLevel); + TypeAdapterConfig + .NewConfig() + .Map(x => x.ID, y => y.ID) + .Map(x => x.DeviceNumber, y => y.DeviceNumber) + .Map(x => x.MeasurementCaptureDate, y => y.MeasurementCaptureDate) + .Map(x => x.LocationHash, y => y.LocationHash) + .Map(x => x.Temperature, y => y.Temperature) + .Map(x => x.Humidity, y => y.Humidity) + .Map(x => x.CarbonDioxide, y => y.CarbonDioxide) + .Map(x => x.VolatileOrganicCompounds, y => y.VolatileOrganicCompounds) + .Map(x => x.ParticulateMatter1, y => y.ParticulateMatter1) + .Map(x => x.ParticulateMatter2v5, y => y.ParticulateMatter2v5) + .Map(x => x.ParticulateMatter10, y => y.ParticulateMatter10) + .Map(x => x.Formaldehyde, y => y.Formaldehyde) + .Map(x => x.CarbonMonoxide, y => y.CarbonMonoxide) + .Map(x => x.Ozone, y => y.Ozone) + .Map(x => x.Ammonia, y => y.Ammonia) + .Map(x => x.Airflow, y => y.Airflow) + .Map(x => x.AirIonizationLevel, y => y.AirIonizationLevel) + .Map(x => x.Oxygen, y => y.Oxygen) + .Map(x => x.Radon, y => y.Radon) + .Map(x => x.Illuminance, y => y.Illuminance) + .Map(x => x.SoundLevel, y => y.SoundLevel); + + TypeAdapterConfig + .NewConfig() + .Map(x => x.ID, y => y.ID) + .Map(x => x.DeviceNumber, y => y.DeviceNumber) + .Map(x => x.MeasurementCaptureDate, y => y.MeasurementCaptureDate) + .Map(x => x.LocationHash, y => y.LocationHash) + .Map(x => x.Temperature, y => y.Temperature) + .Map(x => x.Humidity, y => y.Humidity) + .Map(x => x.CarbonDioxide, y => y.CarbonDioxide) + .Map(x => x.VolatileOrganicCompounds, y => y.VolatileOrganicCompounds) + .Map(x => x.ParticulateMatter1, y => y.ParticulateMatter1) + .Map(x => x.ParticulateMatter2v5, y => y.ParticulateMatter2v5) + .Map(x => x.ParticulateMatter10, y => y.ParticulateMatter10) + .Map(x => x.Formaldehyde, y => y.Formaldehyde) + .Map(x => x.CarbonMonoxide, y => y.CarbonMonoxide) + .Map(x => x.Ozone, y => y.Ozone) + .Map(x => x.Ammonia, y => y.Ammonia) + .Map(x => x.Airflow, y => y.Airflow) + .Map(x => x.AirIonizationLevel, y => y.AirIonizationLevel) + .Map(x => x.Oxygen, y => y.Oxygen) + .Map(x => x.Radon, y => y.Radon) + .Map(x => x.Illuminance, y => y.Illuminance) + .Map(x => x.SoundLevel, y => y.SoundLevel); + // --------------- Mappings with dtos types --------------- TypeAdapterConfig .NewConfig() diff --git a/Services/Raports/Raports.API/Program.cs b/Services/Raports/Raports.API/Program.cs index bd0a4d2..085df95 100644 --- a/Services/Raports/Raports.API/Program.cs +++ b/Services/Raports/Raports.API/Program.cs @@ -22,9 +22,11 @@ app.UseApplicationServices(); +app.UseRouting(); + app.UseCors(); -app.UseHealthChecks("/health", new HealthCheckOptions +app.MapHealthChecks("/raports/health", new HealthCheckOptions { ResponseWriter = UIResponseWriter.WriteHealthCheckUIResponse, }); diff --git a/Services/Raports/Raports.Application/Consumers/AppendRaportConsumer.cs b/Services/Raports/Raports.Application/Consumers/AppendRaportConsumer.cs new file mode 100644 index 0000000..6c45ac6 --- /dev/null +++ b/Services/Raports/Raports.Application/Consumers/AppendRaportConsumer.cs @@ -0,0 +1,524 @@ +namespace Raports.Application.Consumers; + +internal class AppendRaportConsumer(ILogger logger, + IPublishEndpoint publish, + RaportsDBContext database, + IMemoryCache cashe, + IHubContext hub, + MeasurementService.MeasurementServiceClient measurementsGRPC, + DevicesService.DevicesServiceClient deviceGRPC) : IConsumer +{ + public async Task Consume(ConsumeContext context) + { + try + { + logger.LogInformation($"Processing Hourly raport"); + + var cache = cashe; // keep original parameter name but use a clearer local variable + var ct = context.CancellationToken; + + // Update raport status to processing + var processingStatus = await database.Statuses.FirstOrDefaultAsync(x => x.Name == "Processing"); + if (processingStatus is null) + { + // Handle + throw new InvalidOperationException(); + } + + var raport = await database.Raports + .Include(x => x.RequestedLocations) + .Include(x => x.RequestedMeasurements) + .Include(x => x.Period) + .Include(x => x.Status) + .FirstOrDefaultAsync(x => x.ID == context.Message.Raport.ID); + if (raport is null) + { + // Handle + throw new InvalidOperationException(); + } + + raport.StatusID = processingStatus.ID; + + var entry = database.Entry(raport); + database.ChangeTracker.DetectChanges(); + + var wasChanged = entry.Properties.Any(p => p.IsModified) || entry.ComplexProperties.Any(c => c.IsModified); + + if (wasChanged) + { + var dto = raport.Adapt(); + + await database.SaveChangesAsync(ct); + await hub.Clients.All.SendAsync("RaportStatusChanged", dto, ct); + } + + // Fetch and cache LocationGRPC objects from Devices service for requested locations + var cashedLocations = new List(); + foreach (var requestedLocation in context.Message.Raport.RequestedLocations) + { + var key = $"{nameof(LocationGRPC)}:{requestedLocation.Hash}"; + + var cashedLocation = cache.Get(key); + if (cashedLocation is null) + { + try + { + var response = await deviceGRPC.GetLocationByNameAsync(new LocationByNameRequest() { Name = requestedLocation.Name }).ResponseAsync.WaitAsync(ct); + var castedResponse = response.Adapt(); + + cache.Set(key, castedResponse); + cashedLocation = castedResponse; + } + catch (OperationCanceledException) + { + logger.LogInformation("Location lookup canceled for {Location}", requestedLocation.Name); + throw; + } + catch (Exception ex) + { + logger.LogWarning(ex, "Failed to fetch location '{Location}' from Devices service; skipping location", requestedLocation.Name); + continue; + } + } + + cashedLocations.Add(cashedLocation); + } + + if (cashedLocations.Count == 0) + { + logger.LogInformation("No locations to query measurements for; aborting."); + return; + } + + // Get Measurements from Measurements service via gRPC + var grpcMeasurements = new List(); + var request = new MeasurementsQueryRequest() + { + SortOrder = "asc", + StartDate = context.Message.Raport.StartDate.ToString("o"), + EndDate = context.Message.Raport.EndDate.ToString("o"), + }; + + foreach (var loc in cashedLocations) + { + request.LocationIds.Add(loc.Hash.ToString() ?? string.Empty); + } + + try + { + using (var call = measurementsGRPC.MeasurementsQuery(request)) + { + while (await call.ResponseStream.MoveNext(ct)) + { + var current = call.ResponseStream.Current; + + if (!Guid.TryParse(current.Id, out var id)) + { + logger.LogWarning("Skipping measurement with invalid Id: {Id}", current.Id); + continue; + } + + if (!Guid.TryParse(current.DeviceNumber, out var deviceNumber)) + { + logger.LogWarning("Skipping measurement with invalid DeviceNumber: {DeviceNumber}", current.DeviceNumber); + continue; + } + + if (!DateTime.TryParse(current.MeasurementCaptureDate, out var measurementCaptureDate)) + { + logger.LogWarning("Skipping measurement with invalid MeasurementCaptureDate: {Date}", current.MeasurementCaptureDate); + continue; + } + + if (!Guid.TryParse(current.LocationHash, out var locationHash)) + { + logger.LogWarning("Skipping measurement with invalid LocationHash: {LocationHash}", current.LocationHash); + continue; + } + + var mes = new MeasurementGRPC( + id, + deviceNumber, + measurementCaptureDate, + locationHash, + + current.Temperature == double.MinValue ? null : current.Temperature, + current.Humidity == double.MinValue ? null : current.Humidity, + current.Co2 == double.MinValue ? null : current.Co2, + current.Voc == double.MinValue ? null : current.Voc, + current.ParticulateMatter1 == double.MinValue ? null : current.ParticulateMatter1, + current.ParticulateMatter2V5 == double.MinValue ? null : current.ParticulateMatter2V5, + current.ParticulateMatter10 == double.MinValue ? null : current.ParticulateMatter10, + current.Formaldehyde == double.MinValue ? null : current.Formaldehyde, + current.Co == double.MinValue ? null : current.Co, + current.O3 == double.MinValue ? null : current.O3, + current.Ammonia == double.MinValue ? null : current.Ammonia, + current.Airflow == double.MinValue ? null : current.Airflow, + current.AirIonizationLevel == double.MinValue ? null : current.AirIonizationLevel, + current.O2 == double.MinValue ? null : current.O2, + current.Radon == double.MinValue ? null : current.Radon, + current.Illuminance == double.MinValue ? null : current.Illuminance, + current.SoundLevel == double.MinValue ? null : current.SoundLevel + ); + + grpcMeasurements.Add(mes); + } + } + } + catch (OperationCanceledException) + { + logger.LogInformation("Measurement streaming canceled"); + throw; + } + catch (Exception ex) + { + logger.LogError(ex, "Error while querying measurements"); + } + + // sort oldest to newest and group by location + var measurementsByLocation = grpcMeasurements + .OrderBy(m => m.MeasurementCaptureDate) + .GroupBy(m => m.LocationHash) + .ToDictionary(g => g.Key, g => g.ToList()); + + // Determine timeframe for buckets from the Raport Period + TimeSpan timeframe = TimeSpan.Zero; + try + { + timeframe = context.Message.Raport.Period?.TimeFrame ?? TimeSpan.Zero; + } + catch + { + timeframe = TimeSpan.Zero; + } + + if (timeframe <= TimeSpan.Zero) + { + logger.LogWarning("Period timeframe is not set or invalid. Falling back to 1 hour."); + timeframe = TimeSpan.FromHours(1); + } + + var start = context.Message.Raport.StartDate; + var end = context.Message.Raport.EndDate; + + // For each location, create centered windows for each timepoint from start..end (inclusive of end) + var groupedByLocationAndTime = new Dictionary Measurements)>>(); + + foreach (var loc in measurementsByLocation) + { + var buckets = new List<(DateTime TimePoint, DateTime WindowStart, DateTime WindowEnd, List)>(); + + // build timepoints: start, start+timeframe, ..., up to <= end. Always include end as last point. + var timepoints = new List(); + for (var tp = start; tp <= end; tp = tp.Add(timeframe)) + { + timepoints.Add(tp); + } + if (timepoints.Count == 0 || timepoints.Last() < end) + { + timepoints.Add(end); + } + + var half = TimeSpan.FromTicks((long)(timeframe.Ticks / 2.0)); + + foreach (var tp in timepoints) + { + var windowStart = tp - half; + var windowEnd = tp + half; + + // clamp to report range + if (windowStart < start) windowStart = start; + if (windowEnd > end) windowEnd = end; + + // include measurements: >= windowStart and < windowEnd, but include measurements equal to end for last bucket + List items; + if (windowEnd == end) + { + items = loc.Value.Where(m => m.MeasurementCaptureDate >= windowStart && m.MeasurementCaptureDate <= windowEnd).ToList(); + } + else + { + items = loc.Value.Where(m => m.MeasurementCaptureDate >= windowStart && m.MeasurementCaptureDate < windowEnd).ToList(); + } + + buckets.Add((tp, windowStart, windowEnd, items)); + } + + groupedByLocationAndTime[loc.Key] = buckets; + } + + // Aggregate averages per bucket excluding nulls + + var aggregatedByLocation = new Dictionary>(); + + foreach (var kv in groupedByLocationAndTime) + { + var list = new List(); + + foreach (var bucket in kv.Value) + { + var items = bucket.Measurements; + + double? Avg(IEnumerable seq) + { + var vals = seq.Where(x => x.HasValue).Select(x => x!.Value).ToList(); + return vals.Count == 0 ? null : (double?)vals.Average(); + } + + var agg = new AggregatedBucket( + bucket.TimePoint, + bucket.WindowStart, + bucket.WindowEnd, + Avg(items.Select(x => x.Temperature)), + Avg(items.Select(x => x.Humidity)), + Avg(items.Select(x => x.CarbonDioxide)), + Avg(items.Select(x => x.VolatileOrganicCompounds)), + Avg(items.Select(x => x.ParticulateMatter1)), + Avg(items.Select(x => x.ParticulateMatter2v5)), + Avg(items.Select(x => x.ParticulateMatter10)), + Avg(items.Select(x => x.Formaldehyde)), + Avg(items.Select(x => x.CarbonMonoxide)), + Avg(items.Select(x => x.Ozone)), + Avg(items.Select(x => x.Ammonia)), + Avg(items.Select(x => x.Airflow)), + Avg(items.Select(x => x.AirIonizationLevel)), + Avg(items.Select(x => x.Oxygen)), + Avg(items.Select(x => x.Radon)), + Avg(items.Select(x => x.Illuminance)), + Avg(items.Select(x => x.SoundLevel)), + items.Count); + + list.Add(agg); + } + + aggregatedByLocation[kv.Key] = list; + } + + // Persist aggregated data into Raports DB + // create raport + var periodId = context.Message.Raport.Period?.ID ?? 0; + if (periodId == 0) + { + var period = await database.Periods.FirstOrDefaultAsync(p => p.TimeFrame == timeframe); + periodId = period?.ID ?? 1; + } + + var dbLocations = await database.Locations.ToListAsync(); + var status = await database.Statuses.FirstOrDefaultAsync(s => s.Name == "Completed") ?? await database.Statuses.FirstOrDefaultAsync(); + + // prepare mapping and lookups + var measurementsLookup = await database.Measurements.ToListAsync(); + + var measurementMap = new Dictionary() + { + { "Temperature", "Air Temperature" }, + { "Humidity", "Relative Humidity" }, + { "CarbonDioxide", "Carbon Dioxide" }, + { "VolatileOrganicCompounds", "Volatile Organic Compounds" }, + { "ParticulateMatter1", "Particulate Matter 1um" }, + { "ParticulateMatter2v5", "Particulate Matter 2.5um" }, + { "ParticulateMatter10", "Particulate Matter 10um" }, + { "Formaldehyde", "Formaldehyde" }, + { "CarbonMonoxide", "Carbon Monoxide" }, + { "Ozone", "Ozone" }, + { "Ammonia", "Ammonia" }, + { "Airflow", "Air Flow Rate" }, + { "AirIonizationLevel", "Air Ionization Level" }, + { "Oxygen", "Oxygen Concentration" }, + { "Radon", "Radon Concentration" }, + { "Illuminance", "Illuminance level" }, + { "SoundLevel", "Sound Pressure Level" } + }; + + foreach (var mapEntry in measurementMap) + { + var key = mapEntry.Key; // property name on AggregatedBucket + var measurementName = mapEntry.Value; + + var measurementEntity = measurementsLookup.FirstOrDefault(m => m.Name == measurementName); + if (measurementEntity is null) + { + logger.LogWarning($"Measurement with name '{measurementName}' was not recognized!"); + continue; + } + + var isMeasurementTypeRequested = context.Message.Raport.RequestedMeasurements.Any(x => x.Name == measurementName); + if (isMeasurementTypeRequested == false) + { + logger.LogWarning($"Measurement '{measurementEntity.Name}' was not requested for this raport, skipping."); + continue; + } + else + { + logger.LogWarning($"Creating Measurement Group for '{measurementEntity.Name}'"); + } + + var measurementGroup = new MeasurementGroup + { + MeasurementID = measurementEntity.ID, + RaportID = raport.ID, + Summary = string.Empty + }; + + await database.MeasurementGroups.AddAsync(measurementGroup); + await database.SaveChangesAsync(); + + // for each location store location group + samples + foreach (var locEntry in aggregatedByLocation) + { + var locHash = locEntry.Key; + var dbLocation = await database.Locations.FirstOrDefaultAsync(l => l.Hash == locHash); + + if (dbLocation is null) + { + logger.LogWarning($"Location with hash '{locHash}' was not recognized!"); + continue; + } + + var isLocationRequested = context.Message.Raport.RequestedLocations.Any(x => x.Hash == locHash); + if (isLocationRequested == false) + { + logger.LogWarning($"Location '{dbLocation.Name}' was not requested for this raport, skipping.", locHash); + continue; + } + else + { + logger.LogWarning($"Creating Location Group for '{dbLocation.Name}'", locHash); + } + + // Append new location group to DB! + var locationGroup = new LocationGroup + { + LocationID = dbLocation.ID, + MeasurementGroupID = measurementGroup.ID, + Summary = string.Empty + }; + + await database.LocationGroups.AddAsync(locationGroup); + await database.SaveChangesAsync(); + + // Create samples + var samples = new List(); + foreach (var bucket in locEntry.Value) + { + var val = GetMeasurementValueByKey(bucket, key); + if (val.HasValue) + { + samples.Add(new SampleGroup { Date = bucket.TimePoint, Value = val.Value, LocationGroupID = locationGroup.ID }); + } + } + + if (samples.Count > 0) + { + await database.SampleGroups.AddRangeAsync(samples); + await database.SaveChangesAsync(); + } + } + } + + logger.LogInformation($"Hourly raport processed and saved RaportID={raport.ID}"); + + // Forward to next step + var validateRaportMessage = new ValidateRaport() + { + Raport = context.Message.Raport + }; + + await publish.Publish(validateRaportMessage, ct); + + logger.LogInformation($"Raport sent to validation!"); + } + catch (Exception exception) + { + logger.LogInformation($"Forwarding to Raport Failed topic!"); + + // Forward to next step + var failedMessage = new RaportFailed() + { + Raport = context.Message.Raport + }; + + await publish.Publish(failedMessage, context.CancellationToken); + } + } + + private static double? GetMeasurementValueByKey(AggregatedBucket bucket, string key) + { + return key switch + { + "Temperature" => bucket.Temperature, + "Humidity" => bucket.Humidity, + "CarbonDioxide" => bucket.CarbonDioxide, + "VolatileOrganicCompounds" => bucket.VolatileOrganicCompounds, + "ParticulateMatter1" => bucket.ParticulateMatter1, + "ParticulateMatter2v5" => bucket.ParticulateMatter2v5, + "ParticulateMatter10" => bucket.ParticulateMatter10, + "Formaldehyde" => bucket.Formaldehyde, + "CarbonMonoxide" => bucket.CarbonMonoxide, + "Ozone" => bucket.Ozone, + "Ammonia" => bucket.Ammonia, + "Airflow" => bucket.Airflow, + "AirIonizationLevel" => bucket.AirIonizationLevel, + "Oxygen" => bucket.Oxygen, + "Radon" => bucket.Radon, + "Illuminance" => bucket.Illuminance, + "SoundLevel" => bucket.SoundLevel, + _ => null + }; + } + + private sealed class AggregatedBucket + { + public DateTime TimePoint { get; } + public DateTime WindowStart { get; } + public DateTime WindowEnd { get; } + public double? Temperature { get; } + public double? Humidity { get; } + public double? CarbonDioxide { get; } + public double? VolatileOrganicCompounds { get; } + public double? ParticulateMatter1 { get; } + public double? ParticulateMatter2v5 { get; } + public double? ParticulateMatter10 { get; } + public double? Formaldehyde { get; } + public double? CarbonMonoxide { get; } + public double? Ozone { get; } + public double? Ammonia { get; } + public double? Airflow { get; } + public double? AirIonizationLevel { get; } + public double? Oxygen { get; } + public double? Radon { get; } + public double? Illuminance { get; } + public double? SoundLevel { get; } + public int Count { get; } + + public AggregatedBucket(DateTime timePoint, DateTime windowStart, DateTime windowEnd, + double? temperature, double? humidity, double? carbonDioxide, double? volatileOrganicCompounds, + double? particulateMatter1, double? particulateMatter2v5, double? particulateMatter10, + double? formaldehyde, double? carbonMonoxide, double? ozone, double? ammonia, + double? airflow, double? airIonizationLevel, double? oxygen, double? radon, + double? illuminance, double? soundLevel, int count) + { + TimePoint = timePoint; + WindowStart = windowStart; + WindowEnd = windowEnd; + Temperature = temperature; + Humidity = humidity; + CarbonDioxide = carbonDioxide; + VolatileOrganicCompounds = volatileOrganicCompounds; + ParticulateMatter1 = particulateMatter1; + ParticulateMatter2v5 = particulateMatter2v5; + ParticulateMatter10 = particulateMatter10; + Formaldehyde = formaldehyde; + CarbonMonoxide = carbonMonoxide; + Ozone = ozone; + Ammonia = ammonia; + Airflow = airflow; + AirIonizationLevel = airIonizationLevel; + Oxygen = oxygen; + Radon = radon; + Illuminance = illuminance; + SoundLevel = soundLevel; + Count = count; + } + } +} diff --git a/Services/Raports/Raports.Application/Consumers/Document/ProcessDailyDocumentConsumer.cs b/Services/Raports/Raports.Application/Consumers/Document/ProcessDailyDocumentConsumer.cs deleted file mode 100644 index 8a58d56..0000000 --- a/Services/Raports/Raports.Application/Consumers/Document/ProcessDailyDocumentConsumer.cs +++ /dev/null @@ -1,19 +0,0 @@ -namespace Raports.Application.Consumers.Document; - -internal class ProcessDailyDocumentConsumer(ILogger logger, IPublishEndpoint publish) : IConsumer -{ - public async Task Consume(ConsumeContext context) - { - logger.LogInformation($"Creating document for Daily raport"); - - var message = new RaportReady() - { - Raport = context.Message.Raport - }; - - await publish.Publish(message, context => - { - context.Headers.Set("PeriodName", message.Raport.Period.Name); - }); - } -} diff --git a/Services/Raports/Raports.Application/Consumers/Document/ProcessHourlyDocumentConsumer.cs b/Services/Raports/Raports.Application/Consumers/Document/ProcessHourlyDocumentConsumer.cs deleted file mode 100644 index a18a77c..0000000 --- a/Services/Raports/Raports.Application/Consumers/Document/ProcessHourlyDocumentConsumer.cs +++ /dev/null @@ -1,19 +0,0 @@ -namespace Raports.Application.Consumers.Document; - -internal class ProcessHourlyDocumentConsumer(ILogger logger, IPublishEndpoint publish) : IConsumer -{ - public async Task Consume(ConsumeContext context) - { - logger.LogInformation($"Creating document for Hourly raport"); - - var message = new RaportReady() - { - Raport = context.Message.Raport - }; - - await publish.Publish(message, context => - { - context.Headers.Set("PeriodName", message.Raport.Period.Name); - }); - } -} diff --git a/Services/Raports/Raports.Application/Consumers/Document/ProcessMonthlyDocumentConsumer.cs b/Services/Raports/Raports.Application/Consumers/Document/ProcessMonthlyDocumentConsumer.cs deleted file mode 100644 index 560470e..0000000 --- a/Services/Raports/Raports.Application/Consumers/Document/ProcessMonthlyDocumentConsumer.cs +++ /dev/null @@ -1,19 +0,0 @@ -namespace Raports.Application.Consumers.Document; - -internal class ProcessMonthlyDocumentConsumer(ILogger logger, IPublishEndpoint publish) : IConsumer -{ - public async Task Consume(ConsumeContext context) - { - logger.LogInformation($"Creating document for Monthly raport"); - - var message = new RaportReady() - { - Raport = context.Message.Raport - }; - - await publish.Publish(message, context => - { - context.Headers.Set("PeriodName", message.Raport.Period.Name); - }); - } -} diff --git a/Services/Raports/Raports.Application/Consumers/Document/ProcessWeeklyDocumentConsumer.cs b/Services/Raports/Raports.Application/Consumers/Document/ProcessWeeklyDocumentConsumer.cs deleted file mode 100644 index bb73006..0000000 --- a/Services/Raports/Raports.Application/Consumers/Document/ProcessWeeklyDocumentConsumer.cs +++ /dev/null @@ -1,19 +0,0 @@ -namespace Raports.Application.Consumers.Document; - -internal class ProcessWeeklyDocumentConsumer(ILogger logger, IPublishEndpoint publish) : IConsumer -{ - public async Task Consume(ConsumeContext context) - { - logger.LogInformation($"Creating document for Weekly raport"); - - var message = new RaportReady() - { - Raport = context.Message.Raport - }; - - await publish.Publish(message, context => - { - context.Headers.Set("PeriodName", message.Raport.Period.Name); - }); - } -} diff --git a/Services/Raports/Raports.Application/Consumers/GenerateDocumentConsumer.cs b/Services/Raports/Raports.Application/Consumers/GenerateDocumentConsumer.cs index eff3e2d..c854008 100644 --- a/Services/Raports/Raports.Application/Consumers/GenerateDocumentConsumer.cs +++ b/Services/Raports/Raports.Application/Consumers/GenerateDocumentConsumer.cs @@ -7,9 +7,12 @@ namespace Raports.Application.Consumers; internal class GenerateDocumentConsumer(ILogger logger, IPublishEndpoint publish, + IHubContext hub, BlobContainerClient container, RaportsDBContext database) : IConsumer { + private static readonly TimeZoneInfo PolandTimeZone = TimeZoneInfo.FindSystemTimeZoneById("Central European Standard Time"); + public async Task Consume(ConsumeContext context) { logger.LogInformation("GenerateDocumentConsumer: generating document for RaportID={RaportId}", context.Message.Raport.ID); @@ -83,8 +86,10 @@ async Task PublishRaportFailedAsync(DefaultRaportDTO raportDto, int raportId, st document.GeneratePdf(pdfStream); pdfStream.Position = 0; + var documentHash = Guid.NewGuid(); + // Upload to blob storage with raport ID in the name - var blobName = $"Raport-{dbRaport.ID}-{dbRaport.StartDate:yyyy-MM-dd}-{DateTime.UtcNow:yyyy-MM-dd_HH-mm-ss}.pdf"; + var blobName = $"{documentHash}.pdf"; var blobClient = container.GetBlobClient(blobName); // Set blob metadata tags for easy identification and filtering @@ -96,16 +101,6 @@ async Task PublishRaportFailedAsync(DefaultRaportDTO raportDto, int raportId, st var blobMetadata = new Dictionary { { "RaportID", dbRaport.ID.ToString() }, - { "PeriodID", dbRaport.PeriodID.ToString() }, - { "PeriodName", dbRaport.Period?.Name ?? "Unknown" }, - { "StatusID", dbRaport.StatusID.ToString() }, - { "StatusName", dbRaport.Status?.Name ?? "Unknown" }, - { "StartDate", dbRaport.StartDate.ToString("yyyy-MM-dd") }, - { "EndDate", dbRaport.EndDate.ToString("yyyy-MM-dd") }, - { "RaportCreationDate", dbRaport.RaportCreationDate.ToString("yyyy-MM-dd_HH-mm-ss") }, - { "RaportCompletedDate", dbRaport.RaportCompletedDate.ToString("yyyy-MM-dd_HH-mm-ss") }, - { "DocumentGeneratedDate", DateTime.UtcNow.ToString("yyyy-MM-dd_HH-mm-ss") }, - { "MeasurementGroupsCount", dbRaport.MeasurementGroups.Count.ToString() }, { "Version", "1.0" } }; @@ -119,6 +114,42 @@ await blobClient.UploadAsync( cancellationToken: ct); logger.LogInformation("GenerateDocumentConsumer: uploaded PDF '{BlobName}' for Raport {RaportId} with metadata", blobName, dbRaport.ID); + + // Update raport status to processing + var completedStatus = await database.Statuses.FirstOrDefaultAsync(x => x.Name == "Completed"); + if (completedStatus is null) + { + // Handle + throw new InvalidOperationException(); + } + + var raport = await database.Raports + .Include(x => x.RequestedLocations) + .Include(x => x.RequestedMeasurements) + .Include(x => x.Period) + .Include(x => x.Status) + .FirstOrDefaultAsync(x => x.ID == context.Message.Raport.ID); + if (raport is null) + { + // Handle + throw new InvalidOperationException(); + } + + raport.StatusID = completedStatus.ID; + raport.DocumentHash = documentHash; + + var entry = database.Entry(raport); + database.ChangeTracker.DetectChanges(); + + var wasChanged = entry.Properties.Any(p => p.IsModified) || entry.ComplexProperties.Any(c => c.IsModified); + + if (wasChanged) + { + var dto = raport.Adapt(); + + await database.SaveChangesAsync(ct); + await hub.Clients.All.SendAsync("RaportStatusChanged", dto, ct); + } } catch (Exception ex) { @@ -129,6 +160,9 @@ await blobClient.UploadAsync( private void ComposeHeader(IContainer container, Raport raport) { + var startDateLocal = TimeZoneInfo.ConvertTimeFromUtc(raport.StartDate, PolandTimeZone); + var endDateLocal = TimeZoneInfo.ConvertTimeFromUtc(raport.EndDate, PolandTimeZone); + container.Row(row => { row.RelativeItem().Column(column => @@ -143,13 +177,17 @@ private void ComposeHeader(IContainer container, Raport raport) row.RelativeItem().Column(column => { - column.Item().Text($"Date: {raport.StartDate:dd.MM.yyyy} - {raport.EndDate:dd.MM.yyyy}").AlignRight(); + column.Item().Text($"Date: {startDateLocal:dd.MM.yyyy} - {endDateLocal:dd.MM.yyyy}").AlignRight(); }); }); } private void ComposeContent(IContainer container, Raport raport) { + var creationDateLocal = TimeZoneInfo.ConvertTimeFromUtc(raport.RaportCreationDate, PolandTimeZone); + var startDateLocal = TimeZoneInfo.ConvertTimeFromUtc(raport.StartDate, PolandTimeZone); + var endDateLocal = TimeZoneInfo.ConvertTimeFromUtc(raport.EndDate, PolandTimeZone); + container.Column(column => { column.Spacing(15); @@ -163,7 +201,7 @@ private void ComposeContent(IContainer container, Raport raport) column.Item().Row(row => { row.RelativeItem().Text($"Report ID: {raport.ID}").FontSize(14).Bold(); - row.RelativeItem().Text($"Generated: {raport.RaportCreationDate:dd.MM.yyyy HH:mm}").FontSize(12).AlignRight(); + row.RelativeItem().Text($"Generated: {creationDateLocal:dd.MM.yyyy HH:mm}").FontSize(12).AlignRight(); }); column.Item().PaddingVertical(10); @@ -175,7 +213,7 @@ private void ComposeContent(IContainer container, Raport raport) // List of measurements included column.Item().Text("Measurements Included:").FontSize(14).Bold(); column.Item().PaddingVertical(3); - + foreach (var measurementGroup in raport.MeasurementGroups) { var measurementName = measurementGroup.Measurement?.Name ?? "Unknown Measurement"; @@ -206,8 +244,8 @@ private void ComposeContent(IContainer container, Raport raport) // Report period information column.Item().Text("Report Period:").FontSize(14).Bold(); column.Item().PaddingVertical(3); - column.Item().Text($"From: {raport.StartDate:dd.MM.yyyy HH:mm}").FontSize(11); - column.Item().Text($"To: {raport.EndDate:dd.MM.yyyy HH:mm}").FontSize(11); + column.Item().Text($"From: {startDateLocal:dd.MM.yyyy HH:mm}").FontSize(11); + column.Item().Text($"To: {endDateLocal:dd.MM.yyyy HH:mm}").FontSize(11); // START FROM SECOND PAGE: Charts and descriptions // Iterate through measurement groups @@ -293,11 +331,9 @@ private string ComposeMultiLocationChartImage(MeasurementGroup measurementGroup, logger.LogInformation("Generating chart for {MeasurementName} with {LocationCount} locations", measurement.Name, locationGroups.Count); - logger.LogInformation("Generating TEST chart with multi-language text"); - ScottPlot.Plot myPlot = new(); - myPlot.Font.Automatic(); // set font for each item based on its content + myPlot.Font.Automatic(); // Define colors var colors = new List @@ -311,16 +347,17 @@ private string ComposeMultiLocationChartImage(MeasurementGroup measurementGroup, int colorIndex = 0; - // Add data series + // Add data series with local time conversion foreach (var locationGroup in locationGroups) { var samples = locationGroup.SampleGroups.OrderBy(s => s.Date).ToList(); if (!samples.Any()) continue; - var dates = samples.Select(s => s.Date).ToArray(); + // Convert UTC dates to Poland local time + var localDates = samples.Select(s => TimeZoneInfo.ConvertTimeFromUtc(s.Date, PolandTimeZone)).ToArray(); var values = samples.Select(s => s.Value).ToArray(); - var scatter = myPlot.Add.Scatter(dates, values); + var scatter = myPlot.Add.Scatter(localDates, values); scatter.LegendText = locationGroup.Location?.Name ?? "Unknown"; scatter.Color = colors[colorIndex % colors.Count]; scatter.LineWidth = 3; @@ -330,14 +367,18 @@ private string ComposeMultiLocationChartImage(MeasurementGroup measurementGroup, colorIndex++; } + // Convert UTC to local time for chart limits + var startDateLocal = TimeZoneInfo.ConvertTimeFromUtc(raport.StartDate, PolandTimeZone); + var endDateLocal = TimeZoneInfo.ConvertTimeFromUtc(raport.EndDate, PolandTimeZone); + // Configure axes myPlot.Axes.SetLimitsY(measurement.MinChartYValue, measurement.MaxChartYValue); - myPlot.Axes.SetLimitsX(raport.StartDate.ToOADate(), raport.EndDate.ToOADate()); + myPlot.Axes.SetLimitsX(startDateLocal.ToOADate(), endDateLocal.ToOADate()); myPlot.Axes.DateTimeTicksBottom(); // Set chart title and labels myPlot.Title($"{measurement.Name} ({measurement.Unit})"); - myPlot.XLabel("Time"); + myPlot.XLabel("Time (CET/CEST)"); myPlot.YLabel(measurement.Unit); // Show legend @@ -353,7 +394,7 @@ private string ComposeMultiLocationChartImage(MeasurementGroup measurementGroup, // Generate SVG (vector format - scales perfectly, no font issues) var svg = myPlot.GetSvgXml(600, 400); - logger.LogInformation("Generated TEST chart SVG with {Size} characters", svg.Length); + logger.LogInformation("Generated chart SVG with {Size} characters for {MeasurementName}", svg.Length, measurement.Name); return svg; } diff --git a/Services/Raports/Raports.Application/Consumers/GenerateSummaryConsumer.cs b/Services/Raports/Raports.Application/Consumers/GenerateSummaryConsumer.cs index e132852..bdb1914 100644 --- a/Services/Raports/Raports.Application/Consumers/GenerateSummaryConsumer.cs +++ b/Services/Raports/Raports.Application/Consumers/GenerateSummaryConsumer.cs @@ -8,6 +8,8 @@ internal class GenerateSummaryConsumer(ILogger logger, ChatClient openAiClient, RaportsDBContext database) : IConsumer { + private static readonly TimeZoneInfo PolandTimeZone = TimeZoneInfo.FindSystemTimeZoneById("Central European Standard Time"); + public async Task Consume(ConsumeContext context) { logger.LogInformation("GenerateSummaryConsumer: generating summaries for RaportID={RaportId}", context.Message.Raport.ID); @@ -47,7 +49,6 @@ public async Task Consume(ConsumeContext context) foreach (var mg in dbRaport.MeasurementGroups) { - // First, generate summaries for all locations in this measurement group var locationSummaries = new List(); foreach (var lg in mg.LocationGroups) @@ -60,7 +61,11 @@ public async Task Consume(ConsumeContext context) } var samplesText = lg.SampleGroups?.Any() == true - ? string.Join(", ", lg.SampleGroups.Select(s => $"Time: {s.Date:yyyy-MM-dd HH:mm}, Value: {s.Value}")) + ? string.Join(", ", lg.SampleGroups.Select(s => + { + var localTime = TimeZoneInfo.ConvertTimeFromUtc(s.Date, PolandTimeZone); + return $"Time: {localTime:yyyy-MM-dd HH:mm}, Value: {s.Value}"; + })) : "No samples"; var locGroupMessage = new LocationGroupDescription( @@ -76,6 +81,7 @@ public async Task Consume(ConsumeContext context) new SystemChatMessage( "You are a data analyst specializing in home automation measurements. " + "Your job is to analyze measurement data and provide concise summaries. " + + "All timestamps are in Central European Time (Poland). " + "Output ONLY the summary text, no introductory phrases, no extra formatting." ), new UserChatMessage( @@ -124,6 +130,7 @@ public async Task Consume(ConsumeContext context) new SystemChatMessage( "You are a data analyst specializing in home automation measurements. " + "Your job is to analyze summaries from multiple locations and create a comprehensive overview. " + + "All timestamps are in Central European Time (Poland). " + "Output ONLY the summary text, no introductory phrases, no extra formatting." ), new UserChatMessage( diff --git a/Services/Raports/Raports.Application/Consumers/Pending/ProcessDailyRaportConsumer.cs b/Services/Raports/Raports.Application/Consumers/Pending/ProcessDailyRaportConsumer.cs deleted file mode 100644 index 8126aa7..0000000 --- a/Services/Raports/Raports.Application/Consumers/Pending/ProcessDailyRaportConsumer.cs +++ /dev/null @@ -1,19 +0,0 @@ -namespace Raports.Application.Consumers.Pending; - -internal class ProcessDailyRaportConsumer(ILogger logger, IPublishEndpoint publish) : IConsumer -{ - public async Task Consume(ConsumeContext context) - { - logger.LogInformation($"Processing Daily raport"); - - var message = new RaportToSummary() - { - Raport = context.Message.Raport - }; - - await publish.Publish(message, context => - { - context.Headers.Set("PeriodName", message.Raport.Period.Name); - }); - } -} diff --git a/Services/Raports/Raports.Application/Consumers/Pending/ProcessHourlyRaportConsumer.cs b/Services/Raports/Raports.Application/Consumers/Pending/ProcessHourlyRaportConsumer.cs deleted file mode 100644 index 889933d..0000000 --- a/Services/Raports/Raports.Application/Consumers/Pending/ProcessHourlyRaportConsumer.cs +++ /dev/null @@ -1,482 +0,0 @@ -using CommonServiceLibrary.GRPC.Types.Measurements; - -namespace Raports.Application.Consumers.Pending; - -internal class ProcessHourlyRaportConsumer(ILogger logger, - IPublishEndpoint publish, - RaportsDBContext database, - IMemoryCache cashe, - MeasurementService.MeasurementServiceClient measurementsGRPC, - DevicesService.DevicesServiceClient deviceGRPC) : IConsumer -{ - public async Task Consume(ConsumeContext context) - { - logger.LogInformation($"Processing Hourly raport"); - - var cache = cashe; // keep original parameter name but use a clearer local variable - var ct = context.CancellationToken; - - // Fetch and cache LocationGRPC objects from Devices service for requested locations - var cashedLocations = new List(); - foreach (var requestedLocation in context.Message.Raport.RequestedLocations) - { - var key = $"{nameof(LocationGRPC)}:{requestedLocation.Hash}"; - - var cashedLocation = cache.Get(key); - if (cashedLocation is null) - { - try - { - var response = await deviceGRPC.GetLocationByNameAsync(new LocationByNameRequest() { Name = requestedLocation.Name }).ResponseAsync.WaitAsync(ct); - var castedResponse = response.Adapt(); - - cache.Set(key, castedResponse); - cashedLocation = castedResponse; - } - catch (OperationCanceledException) - { - logger.LogInformation("Location lookup canceled for {Location}", requestedLocation.Name); - throw; - } - catch (Exception ex) - { - logger.LogWarning(ex, "Failed to fetch location '{Location}' from Devices service; skipping location", requestedLocation.Name); - continue; - } - } - - cashedLocations.Add(cashedLocation); - } - - if (cashedLocations.Count == 0) - { - logger.LogInformation("No locations to query measurements for; aborting."); - return; - } - - // Get Measurements from Measurements service via gRPC - var grpcMeasurements = new List(); - var request = new MeasurementsQueryRequest() - { - SortOrder = "asc", - StartDate = context.Message.Raport.StartDate.ToString("o"), - EndDate = context.Message.Raport.EndDate.ToString("o"), - }; - - foreach (var loc in cashedLocations) - { - request.LocationIds.Add(loc.Hash.ToString() ?? string.Empty); - } - - try - { - using (var call = measurementsGRPC.MeasurementsQuery(request)) - { - while (await call.ResponseStream.MoveNext(ct)) - { - var current = call.ResponseStream.Current; - - if (!Guid.TryParse(current.Id, out var id)) - { - logger.LogWarning("Skipping measurement with invalid Id: {Id}", current.Id); - continue; - } - - if (!Guid.TryParse(current.DeviceNumber, out var deviceNumber)) - { - logger.LogWarning("Skipping measurement with invalid DeviceNumber: {DeviceNumber}", current.DeviceNumber); - continue; - } - - if (!DateTime.TryParse(current.MeasurementCaptureDate, out var measurementCaptureDate)) - { - logger.LogWarning("Skipping measurement with invalid MeasurementCaptureDate: {Date}", current.MeasurementCaptureDate); - continue; - } - - if (!Guid.TryParse(current.LocationHash, out var locationHash)) - { - logger.LogWarning("Skipping measurement with invalid LocationHash: {LocationHash}", current.LocationHash); - continue; - } - - var mes = new MeasurementGRPC( - id, - deviceNumber, - measurementCaptureDate, - locationHash, - - current.Temperature == double.MinValue ? null : current.Temperature, - current.Humidity == double.MinValue ? null : current.Humidity, - current.Co2 == double.MinValue ? null : current.Co2, - current.Voc == double.MinValue ? null : current.Voc, - current.ParticulateMatter1 == double.MinValue ? null : current.ParticulateMatter1, - current.ParticulateMatter2V5 == double.MinValue ? null : current.ParticulateMatter2V5, - current.ParticulateMatter10 == double.MinValue ? null : current.ParticulateMatter10, - current.Formaldehyde == double.MinValue ? null : current.Formaldehyde, - current.Co == double.MinValue ? null : current.Co, - current.O3 == double.MinValue ? null : current.O3, - current.Ammonia == double.MinValue ? null : current.Ammonia, - current.Airflow == double.MinValue ? null : current.Airflow, - current.AirIonizationLevel == double.MinValue ? null : current.AirIonizationLevel, - current.O2 == double.MinValue ? null : current.O2, - current.Radon == double.MinValue ? null : current.Radon, - current.Illuminance == double.MinValue ? null : current.Illuminance, - current.SoundLevel == double.MinValue ? null : current.SoundLevel - ); - - grpcMeasurements.Add(mes); - } - } - } - catch (OperationCanceledException) - { - logger.LogInformation("Measurement streaming canceled"); - throw; - } - catch (Exception ex) - { - logger.LogError(ex, "Error while querying measurements"); - } - - // sort oldest to newest and group by location - var measurementsByLocation = grpcMeasurements - .OrderBy(m => m.MeasurementCaptureDate) - .GroupBy(m => m.LocationHash) - .ToDictionary(g => g.Key, g => g.ToList()); - - // Determine timeframe for buckets from the Raport Period - TimeSpan timeframe = TimeSpan.Zero; - try - { - timeframe = context.Message.Raport.Period?.TimeFrame ?? TimeSpan.Zero; - } - catch - { - timeframe = TimeSpan.Zero; - } - - if (timeframe <= TimeSpan.Zero) - { - logger.LogWarning("Period timeframe is not set or invalid. Falling back to 1 hour."); - timeframe = TimeSpan.FromHours(1); - } - - var start = context.Message.Raport.StartDate; - var end = context.Message.Raport.EndDate; - - // For each location, create centered windows for each timepoint from start..end (inclusive of end) - var groupedByLocationAndTime = new Dictionary Measurements)>>(); - - foreach (var loc in measurementsByLocation) - { - var buckets = new List<(DateTime TimePoint, DateTime WindowStart, DateTime WindowEnd, List)>(); - - // build timepoints: start, start+timeframe, ..., up to <= end. Always include end as last point. - var timepoints = new List(); - for (var tp = start; tp <= end; tp = tp.Add(timeframe)) - { - timepoints.Add(tp); - } - if (timepoints.Count == 0 || timepoints.Last() < end) - { - timepoints.Add(end); - } - - var half = TimeSpan.FromTicks((long)(timeframe.Ticks / 2.0)); - - foreach (var tp in timepoints) - { - var windowStart = tp - half; - var windowEnd = tp + half; - - // clamp to report range - if (windowStart < start) windowStart = start; - if (windowEnd > end) windowEnd = end; - - // include measurements: >= windowStart and < windowEnd, but include measurements equal to end for last bucket - List items; - if (windowEnd == end) - { - items = loc.Value.Where(m => m.MeasurementCaptureDate >= windowStart && m.MeasurementCaptureDate <= windowEnd).ToList(); - } - else - { - items = loc.Value.Where(m => m.MeasurementCaptureDate >= windowStart && m.MeasurementCaptureDate < windowEnd).ToList(); - } - - buckets.Add((tp, windowStart, windowEnd, items)); - } - - groupedByLocationAndTime[loc.Key] = buckets; - } - - // Aggregate averages per bucket excluding nulls - - var aggregatedByLocation = new Dictionary>(); - - foreach (var kv in groupedByLocationAndTime) - { - var list = new List(); - - foreach (var bucket in kv.Value) - { - var items = bucket.Measurements; - - double? Avg(IEnumerable seq) - { - var vals = seq.Where(x => x.HasValue).Select(x => x!.Value).ToList(); - return vals.Count == 0 ? null : (double?)vals.Average(); - } - - var agg = new AggregatedBucket( - bucket.TimePoint, - bucket.WindowStart, - bucket.WindowEnd, - Avg(items.Select(x => x.Temperature)), - Avg(items.Select(x => x.Humidity)), - Avg(items.Select(x => x.CarbonDioxide)), - Avg(items.Select(x => x.VolatileOrganicCompounds)), - Avg(items.Select(x => x.ParticulateMatter1)), - Avg(items.Select(x => x.ParticulateMatter2v5)), - Avg(items.Select(x => x.ParticulateMatter10)), - Avg(items.Select(x => x.Formaldehyde)), - Avg(items.Select(x => x.CarbonMonoxide)), - Avg(items.Select(x => x.Ozone)), - Avg(items.Select(x => x.Ammonia)), - Avg(items.Select(x => x.Airflow)), - Avg(items.Select(x => x.AirIonizationLevel)), - Avg(items.Select(x => x.Oxygen)), - Avg(items.Select(x => x.Radon)), - Avg(items.Select(x => x.Illuminance)), - Avg(items.Select(x => x.SoundLevel)), - items.Count); - - list.Add(agg); - } - - aggregatedByLocation[kv.Key] = list; - } - - // Persist aggregated data into Raports DB - // create raport - var periodId = context.Message.Raport.Period?.ID ?? 0; - if (periodId == 0) - { - var period = await database.Periods.FirstOrDefaultAsync(p => p.TimeFrame == timeframe); - periodId = period?.ID ?? 1; - } - - var dbLocations = await database.Locations.ToListAsync(); - var status = await database.Statuses.FirstOrDefaultAsync(s => s.Name == "Completed") ?? await database.Statuses.FirstOrDefaultAsync(); - - var raport = await database.Raports.FirstOrDefaultAsync(x => x.ID == context.Message.Raport.ID); - if (raport is null) - { - // ? - throw new InvalidOperationException(); - } - - // prepare mapping and lookups - var measurementsLookup = await database.Measurements.ToListAsync(); - - var measurementMap = new Dictionary() - { - { "Temperature", "Air Temperature" }, - { "Humidity", "Relative Humidity" }, - { "CarbonDioxide", "Carbon Dioxide" }, - { "VolatileOrganicCompounds", "Volatile Organic Compounds" }, - { "ParticulateMatter1", "Particulate Matter 1um" }, - { "ParticulateMatter2v5", "Particulate Matter 2.5um" }, - { "ParticulateMatter10", "Particulate Matter 10um" }, - { "Formaldehyde", "Formaldehyde" }, - { "CarbonMonoxide", "Carbon Monoxide" }, - { "Ozone", "Ozone" }, - { "Ammonia", "Ammonia" }, - { "Airflow", "Air Flow Rate" }, - { "AirIonizationLevel", "Air Ionization Level" }, - { "Oxygen", "Oxygen Concentration" }, - { "Radon", "Radon Concentration" }, - { "Illuminance", "Illuminance level" }, - { "SoundLevel", "Sound Pressure Level" } - }; - - foreach (var mapEntry in measurementMap) - { - var key = mapEntry.Key; // property name on AggregatedBucket - var measurementName = mapEntry.Value; - - var measurementEntity = measurementsLookup.FirstOrDefault(m => m.Name == measurementName); - if (measurementEntity is null) - { - logger.LogWarning($"Measurement with name '{measurementName}' was not recognized!"); - continue; - } - - var isMeasurementTypeRequested = context.Message.Raport.RequestedMeasurements.Any(x => x.Name == measurementName); - if (isMeasurementTypeRequested == false) - { - logger.LogWarning($"Measurement '{measurementEntity.Name}' was not requested for this raport, skipping."); - continue; - } - else - { - logger.LogWarning($"Creating Measurement Group for '{measurementEntity.Name}'"); - } - - var measurementGroup = new MeasurementGroup - { - MeasurementID = measurementEntity.ID, - RaportID = raport.ID, - Summary = string.Empty - }; - - await database.MeasurementGroups.AddAsync(measurementGroup); - await database.SaveChangesAsync(); - - // for each location store location group + samples - foreach (var locEntry in aggregatedByLocation) - { - var locHash = locEntry.Key; - var dbLocation = await database.Locations.FirstOrDefaultAsync(l => l.Hash == locHash); - - if (dbLocation is null) - { - logger.LogWarning($"Location with hash '{locHash}' was not recognized!"); - continue; - } - - var isLocationRequested = context.Message.Raport.RequestedLocations.Any(x => x.Hash == locHash); - if (isLocationRequested == false) - { - logger.LogWarning($"Location '{dbLocation.Name}' was not requested for this raport, skipping.", locHash); - continue; - } - else - { - logger.LogWarning($"Creating Location Group for '{dbLocation.Name}'", locHash); - } - - // Append new location group to DB! - var locationGroup = new LocationGroup - { - LocationID = dbLocation.ID, - MeasurementGroupID = measurementGroup.ID, - Summary = string.Empty - }; - - await database.LocationGroups.AddAsync(locationGroup); - await database.SaveChangesAsync(); - - // Create samples - var samples = new List(); - foreach (var bucket in locEntry.Value) - { - var val = GetMeasurementValueByKey(bucket, key); - if (val.HasValue) - { - samples.Add(new SampleGroup { Date = bucket.TimePoint, Value = val.Value, LocationGroupID = locationGroup.ID }); - } - } - - if (samples.Count > 0) - { - await database.SampleGroups.AddRangeAsync(samples); - await database.SaveChangesAsync(); - } - } - } - - logger.LogInformation($"Hourly raport processed and saved RaportID={raport.ID}"); - - // Forward to next step - var validateRaportMessage = new ValidateRaport() - { - Raport = context.Message.Raport - }; - - await publish.Publish(validateRaportMessage, ct); - - logger.LogInformation($"Raport sent to validation!"); - } - - private static double? GetMeasurementValueByKey(AggregatedBucket bucket, string key) - { - return key switch - { - "Temperature" => bucket.Temperature, - "Humidity" => bucket.Humidity, - "CarbonDioxide" => bucket.CarbonDioxide, - "VolatileOrganicCompounds" => bucket.VolatileOrganicCompounds, - "ParticulateMatter1" => bucket.ParticulateMatter1, - "ParticulateMatter2v5" => bucket.ParticulateMatter2v5, - "ParticulateMatter10" => bucket.ParticulateMatter10, - "Formaldehyde" => bucket.Formaldehyde, - "CarbonMonoxide" => bucket.CarbonMonoxide, - "Ozone" => bucket.Ozone, - "Ammonia" => bucket.Ammonia, - "Airflow" => bucket.Airflow, - "AirIonizationLevel" => bucket.AirIonizationLevel, - "Oxygen" => bucket.Oxygen, - "Radon" => bucket.Radon, - "Illuminance" => bucket.Illuminance, - "SoundLevel" => bucket.SoundLevel, - _ => null - }; - } - - private sealed class AggregatedBucket - { - public DateTime TimePoint { get; } - public DateTime WindowStart { get; } - public DateTime WindowEnd { get; } - public double? Temperature { get; } - public double? Humidity { get; } - public double? CarbonDioxide { get; } - public double? VolatileOrganicCompounds { get; } - public double? ParticulateMatter1 { get; } - public double? ParticulateMatter2v5 { get; } - public double? ParticulateMatter10 { get; } - public double? Formaldehyde { get; } - public double? CarbonMonoxide { get; } - public double? Ozone { get; } - public double? Ammonia { get; } - public double? Airflow { get; } - public double? AirIonizationLevel { get; } - public double? Oxygen { get; } - public double? Radon { get; } - public double? Illuminance { get; } - public double? SoundLevel { get; } - public int Count { get; } - - public AggregatedBucket(DateTime timePoint, DateTime windowStart, DateTime windowEnd, - double? temperature, double? humidity, double? carbonDioxide, double? volatileOrganicCompounds, - double? particulateMatter1, double? particulateMatter2v5, double? particulateMatter10, - double? formaldehyde, double? carbonMonoxide, double? ozone, double? ammonia, - double? airflow, double? airIonizationLevel, double? oxygen, double? radon, - double? illuminance, double? soundLevel, int count) - { - TimePoint = timePoint; - WindowStart = windowStart; - WindowEnd = windowEnd; - Temperature = temperature; - Humidity = humidity; - CarbonDioxide = carbonDioxide; - VolatileOrganicCompounds = volatileOrganicCompounds; - ParticulateMatter1 = particulateMatter1; - ParticulateMatter2v5 = particulateMatter2v5; - ParticulateMatter10 = particulateMatter10; - Formaldehyde = formaldehyde; - CarbonMonoxide = carbonMonoxide; - Ozone = ozone; - Ammonia = ammonia; - Airflow = airflow; - AirIonizationLevel = airIonizationLevel; - Oxygen = oxygen; - Radon = radon; - Illuminance = illuminance; - SoundLevel = soundLevel; - Count = count; - } - } -} diff --git a/Services/Raports/Raports.Application/Consumers/Pending/ProcessMonthlyRaportConsumer.cs b/Services/Raports/Raports.Application/Consumers/Pending/ProcessMonthlyRaportConsumer.cs deleted file mode 100644 index d1a0498..0000000 --- a/Services/Raports/Raports.Application/Consumers/Pending/ProcessMonthlyRaportConsumer.cs +++ /dev/null @@ -1,19 +0,0 @@ -namespace Raports.Application.Consumers.Pending; - -internal class ProcessMonthlyRaportConsumer(ILogger logger, IPublishEndpoint publish) : IConsumer -{ - public async Task Consume(ConsumeContext context) - { - logger.LogInformation($"Processing Monthly raport"); - - var message = new RaportToSummary() - { - Raport = context.Message.Raport - }; - - await publish.Publish(message, context => - { - context.Headers.Set("PeriodName", message.Raport.Period.Name); - }); - } -} diff --git a/Services/Raports/Raports.Application/Consumers/Pending/ProcessWeeklyRaportConsumer.cs b/Services/Raports/Raports.Application/Consumers/Pending/ProcessWeeklyRaportConsumer.cs deleted file mode 100644 index 9df4351..0000000 --- a/Services/Raports/Raports.Application/Consumers/Pending/ProcessWeeklyRaportConsumer.cs +++ /dev/null @@ -1,19 +0,0 @@ -namespace Raports.Application.Consumers.Pending; - -internal class ProcessWeeklyRaportConsumer(ILogger logger, IPublishEndpoint publish) : IConsumer -{ - public async Task Consume(ConsumeContext context) - { - logger.LogInformation($"Processing Weekly raport"); - - var message = new RaportToSummary() - { - Raport = context.Message.Raport - }; - - await publish.Publish(message, context => - { - context.Headers.Set("PeriodName", message.Raport.Period.Name); - }); - } -} diff --git a/Services/Raports/Raports.Application/Consumers/RaportFailedConsumer.cs b/Services/Raports/Raports.Application/Consumers/RaportFailedConsumer.cs index 9f09960..c29a9e8 100644 --- a/Services/Raports/Raports.Application/Consumers/RaportFailedConsumer.cs +++ b/Services/Raports/Raports.Application/Consumers/RaportFailedConsumer.cs @@ -1,57 +1,68 @@ namespace Raports.Application.Consumers; internal class RaportFailedConsumer(ILogger logger, - IPublishEndpoint publish, - RaportsDBContext database, - IMemoryCache cashe, - MeasurementService.MeasurementServiceClient measurementsGRPC, - DevicesService.DevicesServiceClient deviceGRPC) : IConsumer + RaportsDBContext database, + IHubContext hub) : IConsumer { public async Task Consume(ConsumeContext context) { - logger.LogInformation("RaportFailedConsumer: handling failure for RaportID={RaportId}", context.Message.Raport?.ID); + var raportDto = context.Message.Raport; + var description = context.Message.Description; + var raportId = raportDto?.ID; - var ct = context.CancellationToken; + logger.LogInformation("RaportFailedConsumer: handling failure for RaportID={RaportId}", raportId); - if (context.Message is null) + try { - logger.LogWarning("Received empty RaportFailed message"); - return; - } + var ct = context.CancellationToken; - var raportDto = context.Message.Raport; - if (raportDto is null) - { - logger.LogWarning("RaportFailed message does not contain Raport DTO"); - return; - } + if (raportDto is null) + { + logger.LogWarning("RaportFailed message does not contain Raport DTO"); + throw new InvalidOperationException("RaportFailed message must contain a Raport DTO"); + } + + logger.LogWarning("Raport {RaportId} failed: {Description}", raportId, context.Message.Description); + + var failedStatus = await database.Statuses.FirstOrDefaultAsync(x => x.Name == "Failed", ct); + if (failedStatus is null) + { + logger.LogError("Failed status not found in database"); + throw new InvalidOperationException("Status 'Failed' not found in database"); + } + + var raport = await database.Raports + .Include(x => x.RequestedLocations) + .Include(x => x.RequestedMeasurements) + .Include(x => x.Period) + .Include(x => x.Status) + .FirstOrDefaultAsync(x => x.ID == raportDto.ID, ct); - try - { - var raport = await database.Raports.FirstOrDefaultAsync(r => r.ID == raportDto.ID, ct); if (raport is null) { - logger.LogWarning("Raport with ID {RaportId} not found in DB", raportDto.ID); + logger.LogWarning("Raport {RaportId} not found in database", raportId); return; } - var failedStatus = await database.Statuses.FirstOrDefaultAsync(s => s.Name == "Failed", ct) ?? await database.Statuses.FirstOrDefaultAsync(ct); - if (failedStatus is not null) + if (raport.Status.Name == "Failed") { - raport.StatusID = failedStatus.ID; + logger.LogInformation("Raport {RaportId} is already in Failed status, skipping update", raportId); + return; } - // set message and completion date from incoming message - raport.Message = context.Message.Description ?? raport.Message; - raport.RaportCompletedDate = context.Message.FailedDate == default ? DateTime.UtcNow : context.Message.FailedDate; + raport.StatusID = failedStatus.ID; + raport.Message = description; await database.SaveChangesAsync(ct); - logger.LogInformation("Raport ID {RaportId} updated to Failed in DB", raport.ID); + var dto = raport.Adapt(); + await hub.Clients.All.SendAsync("RaportStatusChanged", dto, ct); + + logger.LogInformation("Raport {RaportId} status updated to Failed", raportId); } catch (Exception ex) { - logger.LogError(ex, "Error while handling RaportFailed for RaportID={RaportId}", context.Message.Raport?.ID); + logger.LogError(ex, "Error while handling RaportFailed for RaportID={RaportId}", raportId); throw; } } diff --git a/Services/Raports/Raports.Application/Consumers/Summary/ProcessDailySummaryConsumer.cs b/Services/Raports/Raports.Application/Consumers/Summary/ProcessDailySummaryConsumer.cs deleted file mode 100644 index 8ac6a55..0000000 --- a/Services/Raports/Raports.Application/Consumers/Summary/ProcessDailySummaryConsumer.cs +++ /dev/null @@ -1,19 +0,0 @@ -namespace Raports.Application.Consumers.Summary; - -internal class ProcessDailySummaryConsumer(ILogger logger, IPublishEndpoint publish) : IConsumer -{ - public async Task Consume(ConsumeContext context) - { - logger.LogInformation($"Generating Daily summary"); - - var message = new RaportProduceDocument() - { - Raport = context.Message.Raport - }; - - await publish.Publish(message, context => - { - context.Headers.Set("PeriodName", message.Raport.Period.Name); - }); - } -} diff --git a/Services/Raports/Raports.Application/Consumers/Summary/ProcessHourlySummaryConsumer.cs b/Services/Raports/Raports.Application/Consumers/Summary/ProcessHourlySummaryConsumer.cs deleted file mode 100644 index 6bd356e..0000000 --- a/Services/Raports/Raports.Application/Consumers/Summary/ProcessHourlySummaryConsumer.cs +++ /dev/null @@ -1,19 +0,0 @@ -namespace Raports.Application.Consumers.Summary; - -internal class ProcessHourlySummaryConsumer(ILogger logger, IPublishEndpoint publish) : IConsumer -{ - public async Task Consume(ConsumeContext context) - { - logger.LogInformation($"Generating Hourly summary"); - - var message = new RaportProduceDocument() - { - Raport = context.Message.Raport - }; - - await publish.Publish(message, context => - { - context.Headers.Set("PeriodName", message.Raport.Period.Name); - }); - } -} diff --git a/Services/Raports/Raports.Application/Consumers/Summary/ProcessMonthlySummaryConsumer.cs b/Services/Raports/Raports.Application/Consumers/Summary/ProcessMonthlySummaryConsumer.cs deleted file mode 100644 index 31f2c0e..0000000 --- a/Services/Raports/Raports.Application/Consumers/Summary/ProcessMonthlySummaryConsumer.cs +++ /dev/null @@ -1,19 +0,0 @@ -namespace Raports.Application.Consumers.Summary; - -internal class ProcessMonthlySummaryConsumer(ILogger logger, IPublishEndpoint publish) : IConsumer -{ - public async Task Consume(ConsumeContext context) - { - logger.LogInformation($"Generating Monthly summary"); - - var message = new RaportProduceDocument() - { - Raport = context.Message.Raport - }; - - await publish.Publish(message, context => - { - context.Headers.Set("PeriodName", message.Raport.Period.Name); - }); - } -} diff --git a/Services/Raports/Raports.Application/Consumers/Summary/ProcessWeeklySummaryConsumer.cs b/Services/Raports/Raports.Application/Consumers/Summary/ProcessWeeklySummaryConsumer.cs deleted file mode 100644 index af1c0eb..0000000 --- a/Services/Raports/Raports.Application/Consumers/Summary/ProcessWeeklySummaryConsumer.cs +++ /dev/null @@ -1,19 +0,0 @@ -namespace Raports.Application.Consumers.Summary; - -internal class ProcessWeeklySummaryConsumer(ILogger logger, IPublishEndpoint publish) : IConsumer -{ - public async Task Consume(ConsumeContext context) - { - logger.LogInformation($"Generating Weekly summary"); - - var message = new RaportProduceDocument() - { - Raport = context.Message.Raport - }; - - await publish.Publish(message, context => - { - context.Headers.Set("PeriodName", message.Raport.Period.Name); - }); - } -} diff --git a/Services/Raports/Raports.Application/DependencyInjection.cs b/Services/Raports/Raports.Application/DependencyInjection.cs index 1b51d8a..2d87b33 100644 --- a/Services/Raports/Raports.Application/DependencyInjection.cs +++ b/Services/Raports/Raports.Application/DependencyInjection.cs @@ -85,23 +85,12 @@ public static IServiceCollection AddApplicationServices(this IServiceCollection // MassTransit - Azure Service Bus services.AddMassTransit(config => { - config.AddConsumer(); - config.AddConsumer(); config.AddConsumer(); config.AddConsumer(); config.AddConsumer(); config.AddConsumer(); config.AddConsumer(); - config.AddConsumer(); - config.AddConsumer(); - config.AddConsumer(); - config.AddConsumer(); - config.AddConsumer(); - config.AddConsumer(); - config.AddConsumer(); - config.AddConsumer(); - config.AddConsumer(); - config.AddConsumer(); + config.AddConsumer(); config.AddConsumer(); config.UsingAzureServiceBus((context, configurator) => @@ -173,68 +162,9 @@ public static IServiceCollection AddApplicationServices(this IServiceCollection }); // Pending - configurator.SubscriptionEndpoint("raports-process-hourly-raport", e => + configurator.SubscriptionEndpoint("raports-append-raport", e => { - e.ConfigureConsumer(context); - e.ConfigureConsumeTopology = false; - }); - configurator.SubscriptionEndpoint("raports-process-daily-raport", e => - { - e.ConfigureConsumer(context); - e.ConfigureConsumeTopology = false; - }); - configurator.SubscriptionEndpoint("raports-process-weekly-raport", e => - { - e.ConfigureConsumer(context); - e.ConfigureConsumeTopology = false; - }); - configurator.SubscriptionEndpoint("raports-process-monthly-raport", e => - { - e.ConfigureConsumer(context); - e.ConfigureConsumeTopology = false; - }); - - // Generate document - configurator.SubscriptionEndpoint("raports-hourly-document-raport", e => - { - e.ConfigureConsumer(context); - e.ConfigureConsumeTopology = false; - }); - configurator.SubscriptionEndpoint("raports-daily-document-raport", e => - { - e.ConfigureConsumer(context); - e.ConfigureConsumeTopology = false; - }); - configurator.SubscriptionEndpoint("raports-weekly-document-raport", e => - { - e.ConfigureConsumer(context); - e.ConfigureConsumeTopology = false; - }); - configurator.SubscriptionEndpoint("raports-monthly-document-raport", e => - { - e.ConfigureConsumer(context); - e.ConfigureConsumeTopology = false; - }); - - // Summary - configurator.SubscriptionEndpoint("raports-hourly-summary-raport", e => - { - e.ConfigureConsumer(context); - e.ConfigureConsumeTopology = false; - }); - configurator.SubscriptionEndpoint("raports-daily-summary-raport", e => - { - e.ConfigureConsumer(context); - e.ConfigureConsumeTopology = false; - }); - configurator.SubscriptionEndpoint("raports-weekly-summary-raport", e => - { - e.ConfigureConsumer(context); - e.ConfigureConsumeTopology = false; - }); - configurator.SubscriptionEndpoint("raports-monthly-summary-raport", e => - { - e.ConfigureConsumer(context); + e.ConfigureConsumer(context); e.ConfigureConsumeTopology = false; }); diff --git a/Services/Raports/Raports.Application/GlobalUsings.cs b/Services/Raports/Raports.Application/GlobalUsings.cs index 349beae..a261bd5 100644 --- a/Services/Raports/Raports.Application/GlobalUsings.cs +++ b/Services/Raports/Raports.Application/GlobalUsings.cs @@ -5,6 +5,7 @@ global using CommonServiceLibrary.Exceptions; global using CommonServiceLibrary.Exceptions.Handlers; global using CommonServiceLibrary.GRPC.Types.Devices; +global using CommonServiceLibrary.GRPC.Types.Measurements; global using Devices.GRPCClient; global using FluentValidation; global using Mapster; @@ -24,9 +25,6 @@ global using Microsoft.Extensions.Hosting; global using Microsoft.Extensions.Logging; global using Raports.Application.Consumers; -global using Raports.Application.Consumers.Document; -global using Raports.Application.Consumers.Pending; -global using Raports.Application.Consumers.Summary; global using Raports.Application.Handlers.Create; global using Raports.Application.Handlers.Read; global using Raports.Application.Handlers.Update; diff --git a/Services/Raports/Raports.Application/Handlers/Create/CreateRaportHandler.cs b/Services/Raports/Raports.Application/Handlers/Create/CreateRaportHandler.cs index bd85df2..bd70bc0 100644 --- a/Services/Raports/Raports.Application/Handlers/Create/CreateRaportHandler.cs +++ b/Services/Raports/Raports.Application/Handlers/Create/CreateRaportHandler.cs @@ -1,6 +1,8 @@ namespace Raports.Application.Handlers.Create; -public class CreateRaportHandler(RaportsDBContext database, IHubContext hub, IPublishEndpoint publishEndpoint) : IRequestHandler +public class CreateRaportHandler( + RaportsDBContext database, + IHubContext hub) : IRequestHandler { public async Task Handle(CreateRaportCommand request, CancellationToken cancellationToken) { @@ -34,10 +36,10 @@ public async Task Handle(CreateRaportCommand request, Canc foundMeasurements.Add(foundMeasurement); } - var pendingStatus = await database.Statuses.FirstOrDefaultAsync(x => x.Name == "Pending"); + var pendingStatus = await database.Statuses.FirstOrDefaultAsync(x => x.Name == "Suspended"); if (pendingStatus is null) { - throw new EntityNotFoundException(nameof(Status), "Pending"); + throw new EntityNotFoundException(nameof(Status), "Suspended"); } var newRaport = new Raport() @@ -85,6 +87,7 @@ public async Task Handle(CreateRaportCommand request, Canc .Include(x => x.RequestedMeasurements) .Include(x => x.RequestedLocations) .Include(x => x.Period) + .Include(x => x.Status) .FirstOrDefaultAsync(x => x.ID == newRaport.ID); var dto = newRaport.Adapt(); @@ -92,15 +95,6 @@ public async Task Handle(CreateRaportCommand request, Canc await hub.Clients.All.SendAsync("RaportCreated", dto, cancellationToken); - var message = new RaportPending() - { - Raport = dto - }; - await publishEndpoint.Publish(message, context => - { - context.Headers.Set("PeriodName", message.Raport.Period.Name); - }); - return response; } } \ No newline at end of file diff --git a/Services/Raports/Raports.Application/Handlers/RaportsServiceEndpoints.cs b/Services/Raports/Raports.Application/Handlers/RaportsServiceEndpoints.cs index c8fe1e0..5f0d737 100644 --- a/Services/Raports/Raports.Application/Handlers/RaportsServiceEndpoints.cs +++ b/Services/Raports/Raports.Application/Handlers/RaportsServiceEndpoints.cs @@ -74,8 +74,25 @@ public void AddRoutes(IEndpointRouteBuilder app) return Results.Ok(dto); }); + // ---------- Download raport ---------- + + app.MapGet("/raports/raports/download/{RaportID}", async (ISender sender, int RaportID) => + { + var response = await sender.Send(new DownloadRaportCommand(RaportID)); + + return Results.File(response.FileStream, response.ContentType, response.FileName); + }); + // ---------- Put ---------- + app.MapPut("/raports/raport/retry", async ([FromQuery] int RaportID, ISender sender) => + { + var response = await sender.Send(new RetryRaportCommand(RaportID)); + var dto = response.RaportDTO; + + return Results.Ok(dto); + }); + app.MapPut("/raports/raport/status", async ([FromQuery] int RaportID, [FromQuery] int StatusID, ISender sender) => { var response = await sender.Send(new UpdateRaportStatusCommand(RaportID, StatusID)); diff --git a/Services/Raports/Raports.Application/Handlers/Read/DownloadRaportCommand.cs b/Services/Raports/Raports.Application/Handlers/Read/DownloadRaportCommand.cs new file mode 100644 index 0000000..8fb2252 --- /dev/null +++ b/Services/Raports/Raports.Application/Handlers/Read/DownloadRaportCommand.cs @@ -0,0 +1,20 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using FluentValidation; +using MediatR; + +namespace Raports.Application.Handlers.Read; + +public record DownloadRaportCommand(int RaportID) : IRequest; +public record DownloadRaportResponse(Stream FileStream, string FileName, string ContentType); + +public class DownloadRaportCommandValidator : AbstractValidator +{ + public DownloadRaportCommandValidator() + { + RuleFor(x => x.RaportID).GreaterThan(0); + } +} diff --git a/Services/Raports/Raports.Application/Handlers/Read/DownloadRaportHandler.cs b/Services/Raports/Raports.Application/Handlers/Read/DownloadRaportHandler.cs new file mode 100644 index 0000000..0886fe7 --- /dev/null +++ b/Services/Raports/Raports.Application/Handlers/Read/DownloadRaportHandler.cs @@ -0,0 +1,50 @@ +namespace Raports.Application.Handlers.Read; + +public class DownloadRaportHandler( + RaportsDBContext dbContext, + BlobContainerClient blobContainerClient, + ILogger logger) : IRequestHandler +{ + public async Task Handle(DownloadRaportCommand request, CancellationToken cancellationToken) + { + var raport = await dbContext.Raports + .Include(x => x.Status) + .Include(x => x.Period) + .FirstOrDefaultAsync(x => x.ID == request.RaportID, cancellationToken); + + if (raport is null) + { + logger.LogWarning("Raport {RaportId} not found", request.RaportID); + throw new EntityNotFoundException(nameof(Raport), request.RaportID); + } + + if (raport.Status.Name != "Completed") + { + logger.LogWarning("Raport {RaportId} is not completed. Current status: {Status}", request.RaportID, raport.Status.Name); + throw new InvalidOperationException($"Raport {request.RaportID} is not completed. Current status: {raport.Status.Name}"); + } + + if (raport.DocumentHash == Guid.Empty) + { + logger.LogWarning("Raport {RaportId} does not have a document hash", request.RaportID); + throw new InvalidOperationException($"Raport {request.RaportID} does not have an associated document"); + } + + var fileName = $"{raport.DocumentHash}.pdf"; + var blobClient = blobContainerClient.GetBlobClient(fileName); + + if (!await blobClient.ExistsAsync(cancellationToken)) + { + logger.LogError("Blob {BlobName} not found in Azure Storage for Raport {RaportId}", fileName, request.RaportID); + throw new InvalidOperationException($"Report file not found in storage for Raport {request.RaportID}"); + } + + var stream = new MemoryStream(); + await blobClient.DownloadToAsync(stream, cancellationToken); + stream.Position = 0; + + logger.LogInformation("Successfully downloaded raport {RaportId} from Azure Blob Storage", request.RaportID); + + return new DownloadRaportResponse(stream, fileName, "application/pdf"); + } +} diff --git a/Services/Raports/Raports.Application/Handlers/Update/RetryRaportCommand.cs b/Services/Raports/Raports.Application/Handlers/Update/RetryRaportCommand.cs new file mode 100644 index 0000000..75f17c8 --- /dev/null +++ b/Services/Raports/Raports.Application/Handlers/Update/RetryRaportCommand.cs @@ -0,0 +1,11 @@ +namespace Raports.Application.Handlers.Update; + +public record RetryRaportCommand(int RaportID) : IRequest; + +public class RetryRaportCommandValidator : AbstractValidator +{ + public RetryRaportCommandValidator() + { + RuleFor(x => x.RaportID).GreaterThan(0); + } +} diff --git a/Services/Raports/Raports.Application/Handlers/Update/RetryRaportHandler.cs b/Services/Raports/Raports.Application/Handlers/Update/RetryRaportHandler.cs new file mode 100644 index 0000000..b0eb2a8 --- /dev/null +++ b/Services/Raports/Raports.Application/Handlers/Update/RetryRaportHandler.cs @@ -0,0 +1,66 @@ +namespace Raports.Application.Handlers.Update; + +public class RetryRaportHandler( + RaportsDBContext dbContext, + IHubContext hub, + IPublishEndpoint publish, + ILogger logger + ) : IRequestHandler +{ + public async Task Handle(RetryRaportCommand request, CancellationToken cancellationToken) + { + var raport = await dbContext.Raports + .Include(x => x.Status) + .Include(x => x.Period) + .Include(x => x.RequestedMeasurements) + .Include(x => x.RequestedLocations) + .Include(x => x.MeasurementGroups) + .FirstOrDefaultAsync(x => x.ID == request.RaportID, cancellationToken); + + if (raport is null) + { + logger.LogWarning("Raport {RaportId} not found for retry", request.RaportID); + throw new EntityNotFoundException(nameof(Raport), request.RaportID); + } + + if (raport.Status.Name != "Failed") + { + logger.LogWarning("Raport {RaportId} cannot be retried. Current status: {Status}", request.RaportID, raport.Status.Name); + throw new InvalidOperationException($"Only failed reports can be retried. Current status: {raport.Status.Name}"); + } + + var pendingStatus = await dbContext.Statuses.FirstOrDefaultAsync(x => x.Name == "Pending", cancellationToken); + if (pendingStatus is null) + { + logger.LogError("Pending status not found in database"); + throw new InvalidOperationException("Status 'Pending' not found in database"); + } + + if (raport.MeasurementGroups?.Any() == true) + { + dbContext.MeasurementGroups.RemoveRange(raport.MeasurementGroups); + logger.LogInformation("Deleted {Count} measurement groups for Raport {RaportId}", raport.MeasurementGroups.Count, request.RaportID); + } + + raport.StatusID = pendingStatus.ID; + raport.Message = string.Empty; + raport.DocumentHash = Guid.Empty; + + await dbContext.SaveChangesAsync(cancellationToken); + + var dto = raport.Adapt(); + + await hub.Clients.All.SendAsync("RaportStatusChanged", dto, cancellationToken); + + var message = new RaportPending() + { + Raport = dto + }; + + await publish.Publish(message, cancellationToken); + + logger.LogInformation("Raport {RaportId} retry initiated - status changed to Pending", request.RaportID); + + return new ReadRaportResponse(dto); + } +} diff --git a/Services/Raports/Raports.Application/Handlers/Update/UpdateRaportStatusHandler.cs b/Services/Raports/Raports.Application/Handlers/Update/UpdateRaportStatusHandler.cs index 0572351..c180f84 100644 --- a/Services/Raports/Raports.Application/Handlers/Update/UpdateRaportStatusHandler.cs +++ b/Services/Raports/Raports.Application/Handlers/Update/UpdateRaportStatusHandler.cs @@ -1,6 +1,10 @@ namespace Raports.Application.Handlers.Update; -public class UpdateRaportStatusHandler(RaportsDBContext dbcontext, IHubContext hub) : IRequestHandler +public class UpdateRaportStatusHandler( + RaportsDBContext dbcontext, + IHubContext hub, + IPublishEndpoint publish + ) : IRequestHandler { public async Task Handle(UpdateRaportStatusCommand request, CancellationToken cancellationToken) { @@ -13,6 +17,8 @@ public async Task Handle(UpdateRaportStatusCommand request, var requestEntity = await dbcontext.Raports .Include(x => x.Status) .Include(x => x.Period) + .Include(x => x.RequestedMeasurements) + .Include(x => x.RequestedLocations) .FirstOrDefaultAsync(x => x.ID == request.RaportID); if (requestEntity is null) { @@ -32,7 +38,17 @@ public async Task Handle(UpdateRaportStatusCommand request, if (wasChanged) { await dbcontext.SaveChangesAsync(cancellationToken); - await hub.Clients.All.SendAsync("RequestStatusChanged", dto, cancellationToken); + await hub.Clients.All.SendAsync("RaportStatusChanged", dto, cancellationToken); + + if (requestEntity.Status.Name == "Pending") + { + var message = new RaportPending() + { + Raport = dto + }; + + await publish.Publish(message, cancellationToken); + } } return response; diff --git a/docker-compose.override.yml b/docker-compose.override.yml index 6bbb927..23468b0 100644 --- a/docker-compose.override.yml +++ b/docker-compose.override.yml @@ -109,15 +109,23 @@ services: emulators.webapp: container_name: Emulators.Service - restart: on-failure + restart: unless-stopped environment: - ASPNETCORE_ENVIRONMENT=Development - ASPNETCORE_HTTP_PORTS=8080 - ASPNETCORE_HTTPS_PORTS=8081 + ports: + - "6004:8080" + - "6064:8081" depends_on: - emulators.db - gateways.main - devices.api + volumes: + - ${APPDATA}/Microsoft/UserSecrets:/home/app/.microsoft/usersecrets:ro + - ${APPDATA}/Microsoft/UserSecrets:/root/.microsoft/usersecrets:ro + - ${APPDATA}/ASP.NET/Https:/home/app/.aspnet/https:ro + - ${APPDATA}/ASP.NET/Https:/root/.aspnet/https:ro volumes: devices_db_data: