diff --git a/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubDecisionsEventSubscriber.java b/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubDecisionsEventSubscriber.java index 8b8e8b0..bdd94d6 100644 --- a/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubDecisionsEventSubscriber.java +++ b/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubDecisionsEventSubscriber.java @@ -76,11 +76,19 @@ private void processMessage(PubsubMessage message, AckReplyConsumer consumer) { log.info("Processing event type={} msgId={}", event.getType(), msgId); if ("ASSET_DECISION_TAKEN".equals(event.getType())) { - if (event.getAggregateId() == null || event.getPayload() == null || - !event.getPayload().containsKey("assetId") || !event.getPayload().containsKey("decision") || - !event.getPayload().containsKey("riskLevel")) { - log.warn("Malformed event: Skipping ASSET_DECISION_TAKEN event with missing fields msgId={}", msgId); - consumer.ack(); + if (!event.getPayload().containsKey("assetId")) { + log.warn("Event payload missing 'assetId' field, ignoring event msgId={}", msgId); + consumer.nack(); + return; + } + if (!event.getPayload().containsKey("decision")) { + log.warn("Event payload missing 'decision' field, ignoring event msgId={}", msgId); + consumer.nack(); + return; + } + if (!event.getPayload().containsKey("riskLevel")) { + log.warn("Event payload missing 'riskLevel' field, ignoring event msgId={}", msgId); + consumer.nack(); return; } EmitAlertsCommand cmd = new EmitAlertsCommand(