From 08fb3e4f10cff2034e34f081f8242418579a6a3d Mon Sep 17 00:00:00 2001 From: Pol Pinol Castuera Date: Mon, 26 May 2025 22:30:25 +0200 Subject: [PATCH] add more logs --- .../PubsubDecisionsEventSubscriber.java | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubDecisionsEventSubscriber.java b/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubDecisionsEventSubscriber.java index 8b8e8b0..bdd94d6 100644 --- a/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubDecisionsEventSubscriber.java +++ b/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubDecisionsEventSubscriber.java @@ -76,11 +76,19 @@ private void processMessage(PubsubMessage message, AckReplyConsumer consumer) { log.info("Processing event type={} msgId={}", event.getType(), msgId); if ("ASSET_DECISION_TAKEN".equals(event.getType())) { - if (event.getAggregateId() == null || event.getPayload() == null || - !event.getPayload().containsKey("assetId") || !event.getPayload().containsKey("decision") || - !event.getPayload().containsKey("riskLevel")) { - log.warn("Malformed event: Skipping ASSET_DECISION_TAKEN event with missing fields msgId={}", msgId); - consumer.ack(); + if (!event.getPayload().containsKey("assetId")) { + log.warn("Event payload missing 'assetId' field, ignoring event msgId={}", msgId); + consumer.nack(); + return; + } + if (!event.getPayload().containsKey("decision")) { + log.warn("Event payload missing 'decision' field, ignoring event msgId={}", msgId); + consumer.nack(); + return; + } + if (!event.getPayload().containsKey("riskLevel")) { + log.warn("Event payload missing 'riskLevel' field, ignoring event msgId={}", msgId); + consumer.nack(); return; } EmitAlertsCommand cmd = new EmitAlertsCommand(