From ff3ff06ed478ddf804d764a3ab80949b312b630b Mon Sep 17 00:00:00 2001 From: Pol Pinol Castuera Date: Thu, 29 May 2025 22:16:31 +0200 Subject: [PATCH 1/3] Dont add @Id in alerts read model --- .../infrastructure/read_models/alerts/DecisionDocument.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/DecisionDocument.java b/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/DecisionDocument.java index 42772d9..a7eae2e 100644 --- a/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/DecisionDocument.java +++ b/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/DecisionDocument.java @@ -13,7 +13,6 @@ @Getter @Document(collection = "alerts") public class DecisionDocument { - @Id private String userId; private String assetId; private String type; From dc7dc7a59ada54556e61bd16e2d5a7046d3d6a4e Mon Sep 17 00:00:00 2001 From: Pol Pinol Castuera Date: Sat, 31 May 2025 10:27:36 +0200 Subject: [PATCH 2/3] Add sonar and linter --- HELP.md | 1 - build.gradle | 80 ++++++++++++------- .../io/autoinvestor/application/AlertDTO.java | 7 +- .../AlertsReadModelRepository.java | 1 + .../application/EmitAlertsCommand.java | 7 +- .../application/EmitAlertsCommandHandler.java | 38 ++++++--- .../application/GetAlertsQueryHandler.java | 10 +-- .../application/GetAlertsQueryResponse.java | 6 +- .../application/GetDecisionsQuery.java | 2 +- .../application/InboxReadModelRepository.java | 1 + .../RegisterPortfolioAssetCommand.java | 3 +- .../RegisterPortfolioAssetCommandHandler.java | 13 +-- .../application/RegisterUserCommand.java | 3 +- .../RegisterUserCommandHandler.java | 9 ++- src/main/java/io/autoinvestor/domain/Id.java | 6 +- .../domain/PortfolioRepository.java | 3 + .../domain/events/AlertEmittedEvent.java | 35 ++++---- .../events/AlertEmittedEventPayload.java | 10 +-- .../io/autoinvestor/domain/events/Event.java | 3 +- .../autoinvestor/domain/events/EventId.java | 1 - .../domain/events/EventSourcedEntity.java | 1 - .../domain/events/EventStoreRepository.java | 3 +- .../domain/events/InboxCreatedEvent.java | 28 +++---- .../events/InboxCreatedEventPayload.java | 7 +- .../events/SubscriptionCreatedEvent.java | 29 ++++--- .../SubscriptionCreatedEventPayload.java | 8 +- .../io/autoinvestor/domain/model/Alert.java | 3 +- .../io/autoinvestor/domain/model/Inbox.java | 25 +++--- .../io/autoinvestor/domain/model/InboxId.java | 1 - .../autoinvestor/domain/model/InboxState.java | 34 ++++---- .../io/autoinvestor/domain/model/UserId.java | 1 - .../event_publishers/EventMessageMapper.java | 24 +++--- .../InMemoryEventPublisher.java | 12 ++- .../PubsubEventPublisher.java | 33 +++++--- .../PubsubDecisionsEventSubscriber.java | 77 +++++++++++------- .../infrastructure/listeners/PubsubEvent.java | 7 +- .../listeners/PubsubEventMapper.java | 6 +- .../PubsubPortfolioAssetEventSubscriber.java | 73 ++++++++++------- .../listeners/PubsubUsersEventSubscriber.java | 68 ++++++++++------ .../read_models/alerts/DecisionDocument.java | 8 +- .../read_models/alerts/DecisionMapper.java | 16 +--- .../InMemoryAlertsReadModelRepository.java | 30 +++++-- .../MongoAlertsReadModelRepository.java | 16 ++-- .../read_models/users/DecisionDocument.java | 13 +-- .../InMemoryInboxReadModelRepository.java | 9 +-- .../users/MongoInboxReadModelRepository.java | 5 +- .../event_store/EventDocument.java | 50 ++++++------ .../repositories/event_store/EventMapper.java | 15 ++-- .../InMemoryEventStoreRepository.java | 16 ++-- .../MongoEventStoreRepository.java | 35 ++++---- .../InMemoryPortfolioRepository.java | 5 +- .../portfolio/MongoPortfolioRepository.java | 35 ++++---- .../portfolio/PortfolioDocument.java | 6 +- .../repositories/portfolio/UserDocument.java | 4 +- .../autoinvestor/ui/GetAlertsController.java | 30 ++++--- .../java/io/autoinvestor/ui/GetAlertsDTO.java | 3 +- 56 files changed, 504 insertions(+), 471 deletions(-) diff --git a/HELP.md b/HELP.md index f876abc..bdbf568 100644 --- a/HELP.md +++ b/HELP.md @@ -11,4 +11,3 @@ For further reference, please consider the following sections: These additional references should also help you: * [Gradle Build Scans – insights for your project's build](https://scans.gradle.com#gradle) - diff --git a/build.gradle b/build.gradle index 2e88f93..db7692c 100644 --- a/build.gradle +++ b/build.gradle @@ -1,49 +1,69 @@ plugins { - id 'java' - id 'org.springframework.boot' version '3.4.4' - id 'io.spring.dependency-management' version '1.1.7' - id 'io.freefair.lombok' version '8.13.1' + id 'java' + id 'org.springframework.boot' version '3.4.4' + id 'io.spring.dependency-management' version '1.1.7' + id 'io.freefair.lombok' version '8.13.1' + id 'com.diffplug.spotless' version '7.0.4' } group = 'io.autoinvestor' java { - toolchain { - languageVersion = JavaLanguageVersion.of(21) - } + toolchain { + languageVersion = JavaLanguageVersion.of(21) + } } repositories { - mavenCentral() + mavenCentral() } dependencies { - implementation 'org.springframework.boot:spring-boot-starter-web' - testImplementation 'org.springframework.boot:spring-boot-starter-test' - testRuntimeOnly 'org.junit.platform:junit-platform-launcher' - - implementation 'com.google.cloud:google-cloud-pubsub:1.123.0' - implementation "com.google.cloud:spring-cloud-gcp-starter-pubsub:6.1.1" - implementation 'com.fasterxml.jackson.core:jackson-databind' - implementation 'org.springframework.integration:spring-integration-core' - implementation 'org.springframework.boot:spring-boot-starter-data-mongodb' - - testImplementation 'org.springframework.boot:spring-boot-testcontainers' - testImplementation 'org.testcontainers:testcontainers' - testImplementation 'org.testcontainers:junit-jupiter' - testImplementation 'org.testcontainers:gcloud' - - compileOnly 'org.projectlombok:lombok:1.18.38' - annotationProcessor 'org.projectlombok:lombok:1.18.38' - - testCompileOnly 'org.projectlombok:lombok:1.18.38' - testAnnotationProcessor 'org.projectlombok:lombok:1.18.38' + implementation 'org.springframework.boot:spring-boot-starter-web' + testImplementation 'org.springframework.boot:spring-boot-starter-test' + testRuntimeOnly 'org.junit.platform:junit-platform-launcher' + + implementation 'com.google.cloud:google-cloud-pubsub:1.123.0' + implementation "com.google.cloud:spring-cloud-gcp-starter-pubsub:6.1.1" + implementation 'com.fasterxml.jackson.core:jackson-databind' + implementation 'org.springframework.integration:spring-integration-core' + implementation 'org.springframework.boot:spring-boot-starter-data-mongodb' + + testImplementation 'org.springframework.boot:spring-boot-testcontainers' + testImplementation 'org.testcontainers:testcontainers' + testImplementation 'org.testcontainers:junit-jupiter' + testImplementation 'org.testcontainers:gcloud' + + compileOnly 'org.projectlombok:lombok:1.18.38' + annotationProcessor 'org.projectlombok:lombok:1.18.38' + + testCompileOnly 'org.projectlombok:lombok:1.18.38' + testAnnotationProcessor 'org.projectlombok:lombok:1.18.38' } tasks.named('test') { - useJUnitPlatform() + useJUnitPlatform() } bootBuildImage { - publish = false + publish = false +} + +spotless { + java { + googleJavaFormat('1.22.0').aosp() + removeUnusedImports() + trimTrailingWhitespace() + leadingTabsToSpaces() + endWithNewline() + importOrder '', 'java', 'javax', 'org', 'com' + target 'src/**/*.java' + } + + format 'misc', { + target '*.gradle', '*.md', '.gitignore' + leadingTabsToSpaces() + trimTrailingWhitespace() + endWithNewline() + } } diff --git a/src/main/java/io/autoinvestor/application/AlertDTO.java b/src/main/java/io/autoinvestor/application/AlertDTO.java index 35d6c7e..dd7eb76 100644 --- a/src/main/java/io/autoinvestor/application/AlertDTO.java +++ b/src/main/java/io/autoinvestor/application/AlertDTO.java @@ -2,9 +2,4 @@ import java.util.Date; -public record AlertDTO( - String userId, - String assetId, - String type, - Date date -) {} +public record AlertDTO(String userId, String assetId, String type, Date date) {} diff --git a/src/main/java/io/autoinvestor/application/AlertsReadModelRepository.java b/src/main/java/io/autoinvestor/application/AlertsReadModelRepository.java index 2dcc95a..874779c 100644 --- a/src/main/java/io/autoinvestor/application/AlertsReadModelRepository.java +++ b/src/main/java/io/autoinvestor/application/AlertsReadModelRepository.java @@ -4,5 +4,6 @@ public interface AlertsReadModelRepository { void save(AlertDTO alertDTO); + List get(String userId); } diff --git a/src/main/java/io/autoinvestor/application/EmitAlertsCommand.java b/src/main/java/io/autoinvestor/application/EmitAlertsCommand.java index e324d5b..d7d16e7 100644 --- a/src/main/java/io/autoinvestor/application/EmitAlertsCommand.java +++ b/src/main/java/io/autoinvestor/application/EmitAlertsCommand.java @@ -1,8 +1,3 @@ package io.autoinvestor.application; - -public record EmitAlertsCommand( - String assetId, - String decision, - int riskLevel -) { } +public record EmitAlertsCommand(String assetId, String decision, int riskLevel) {} diff --git a/src/main/java/io/autoinvestor/application/EmitAlertsCommandHandler.java b/src/main/java/io/autoinvestor/application/EmitAlertsCommandHandler.java index 170f39a..ee52cb2 100644 --- a/src/main/java/io/autoinvestor/application/EmitAlertsCommandHandler.java +++ b/src/main/java/io/autoinvestor/application/EmitAlertsCommandHandler.java @@ -10,11 +10,12 @@ import io.autoinvestor.domain.model.UserId; import io.autoinvestor.exceptions.InternalErrorException; import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Service; import java.util.List; import java.util.Optional; +import org.springframework.stereotype.Service; + @Slf4j @Service public class EmitAlertsCommandHandler { @@ -25,9 +26,12 @@ public class EmitAlertsCommandHandler { private final InboxReadModelRepository inboxReadModel; private final EventPublisher eventPublisher; - public EmitAlertsCommandHandler(EventStoreRepository eventStore, PortfolioRepository portfolioRepository, - AlertsReadModelRepository alertsReadModel, InboxReadModelRepository inboxReadModel, - EventPublisher eventPublisher) { + public EmitAlertsCommandHandler( + EventStoreRepository eventStore, + PortfolioRepository portfolioRepository, + AlertsReadModelRepository alertsReadModel, + InboxReadModelRepository inboxReadModel, + EventPublisher eventPublisher) { this.eventStore = eventStore; this.portfolioRepository = portfolioRepository; this.alertsReadModel = alertsReadModel; @@ -36,7 +40,9 @@ public EmitAlertsCommandHandler(EventStoreRepository eventStore, PortfolioReposi } public void handle(EmitAlertsCommand command) { - List usersId = this.portfolioRepository.getUsersIdByAssetAndRiskLevel(command.assetId(), command.riskLevel()); + List usersId = + this.portfolioRepository.getUsersIdByAssetAndRiskLevel( + command.assetId(), command.riskLevel()); for (UserId userId : usersId) { Optional inboxIdOpt = this.inboxReadModel.getInboxId(userId); @@ -60,15 +66,21 @@ public void handle(EmitAlertsCommand command) { this.eventStore.save(inbox); - Alert alert = inbox.getState().getLastAlert() - .orElseThrow(() -> new InternalErrorException("No alert found after emitting one for userId " + userId.value())); + Alert alert = + inbox.getState() + .getLastAlert() + .orElseThrow( + () -> + new InternalErrorException( + "No alert found after emitting one for userId " + + userId.value())); - AlertDTO alertDTO = new AlertDTO( - userId.value(), - command.assetId(), - alert.decision().name(), - alert.date() - ); + AlertDTO alertDTO = + new AlertDTO( + userId.value(), + command.assetId(), + alert.decision().name(), + alert.date()); this.alertsReadModel.save(alertDTO); this.eventPublisher.publish(events); diff --git a/src/main/java/io/autoinvestor/application/GetAlertsQueryHandler.java b/src/main/java/io/autoinvestor/application/GetAlertsQueryHandler.java index efacd61..5d0e8c5 100644 --- a/src/main/java/io/autoinvestor/application/GetAlertsQueryHandler.java +++ b/src/main/java/io/autoinvestor/application/GetAlertsQueryHandler.java @@ -1,10 +1,10 @@ package io.autoinvestor.application; -import org.springframework.stereotype.Service; - import java.util.List; import java.util.stream.Collectors; +import org.springframework.stereotype.Service; + @Service public class GetAlertsQueryHandler { @@ -18,11 +18,7 @@ public List handle(GetDecisionsQuery query) { List decisions = this.readModel.get(query.userId()); return decisions.stream() - .map(d -> new GetAlertsQueryResponse( - d.assetId(), - d.type(), - d.date() - )) + .map(d -> new GetAlertsQueryResponse(d.assetId(), d.type(), d.date())) .collect(Collectors.toList()); } } diff --git a/src/main/java/io/autoinvestor/application/GetAlertsQueryResponse.java b/src/main/java/io/autoinvestor/application/GetAlertsQueryResponse.java index bb5dbff..f91e27f 100644 --- a/src/main/java/io/autoinvestor/application/GetAlertsQueryResponse.java +++ b/src/main/java/io/autoinvestor/application/GetAlertsQueryResponse.java @@ -2,8 +2,4 @@ import java.util.Date; -public record GetAlertsQueryResponse( - String assetId, - String type, - Date date -) { } \ No newline at end of file +public record GetAlertsQueryResponse(String assetId, String type, Date date) {} diff --git a/src/main/java/io/autoinvestor/application/GetDecisionsQuery.java b/src/main/java/io/autoinvestor/application/GetDecisionsQuery.java index 5958289..03006ba 100644 --- a/src/main/java/io/autoinvestor/application/GetDecisionsQuery.java +++ b/src/main/java/io/autoinvestor/application/GetDecisionsQuery.java @@ -1,3 +1,3 @@ package io.autoinvestor.application; -public record GetDecisionsQuery(String userId) { } +public record GetDecisionsQuery(String userId) {} diff --git a/src/main/java/io/autoinvestor/application/InboxReadModelRepository.java b/src/main/java/io/autoinvestor/application/InboxReadModelRepository.java index 01508c7..bee76ce 100644 --- a/src/main/java/io/autoinvestor/application/InboxReadModelRepository.java +++ b/src/main/java/io/autoinvestor/application/InboxReadModelRepository.java @@ -7,5 +7,6 @@ public interface InboxReadModelRepository { void save(UserId userId, InboxId inboxId); + Optional getInboxId(UserId userId); } diff --git a/src/main/java/io/autoinvestor/application/RegisterPortfolioAssetCommand.java b/src/main/java/io/autoinvestor/application/RegisterPortfolioAssetCommand.java index 4a50678..80f055a 100644 --- a/src/main/java/io/autoinvestor/application/RegisterPortfolioAssetCommand.java +++ b/src/main/java/io/autoinvestor/application/RegisterPortfolioAssetCommand.java @@ -1,4 +1,3 @@ package io.autoinvestor.application; -public record RegisterPortfolioAssetCommand(String userId, String assetId) { -} +public record RegisterPortfolioAssetCommand(String userId, String assetId) {} diff --git a/src/main/java/io/autoinvestor/application/RegisterPortfolioAssetCommandHandler.java b/src/main/java/io/autoinvestor/application/RegisterPortfolioAssetCommandHandler.java index 8ca0609..475d43e 100644 --- a/src/main/java/io/autoinvestor/application/RegisterPortfolioAssetCommandHandler.java +++ b/src/main/java/io/autoinvestor/application/RegisterPortfolioAssetCommandHandler.java @@ -7,13 +7,12 @@ import io.autoinvestor.domain.model.Inbox; import io.autoinvestor.domain.model.InboxId; import io.autoinvestor.domain.model.UserId; -import io.autoinvestor.exceptions.InternalErrorException; import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Service; import java.util.List; import java.util.Optional; +import org.springframework.stereotype.Service; @Slf4j @Service @@ -24,8 +23,11 @@ public class RegisterPortfolioAssetCommandHandler { private final InboxReadModelRepository inboxReadModel; private final EventPublisher eventPublisher; - public RegisterPortfolioAssetCommandHandler(EventStoreRepository eventStore, PortfolioRepository portfolioRepository, - InboxReadModelRepository inboxReadModel, EventPublisher eventPublisher) { + public RegisterPortfolioAssetCommandHandler( + EventStoreRepository eventStore, + PortfolioRepository portfolioRepository, + InboxReadModelRepository inboxReadModel, + EventPublisher eventPublisher) { this.eventStore = eventStore; this.portfolioRepository = portfolioRepository; this.inboxReadModel = inboxReadModel; @@ -34,7 +36,8 @@ public RegisterPortfolioAssetCommandHandler(EventStoreRepository eventStore, Por public void handle(RegisterPortfolioAssetCommand command) { if (this.portfolioRepository.existsPortfolioAsset(command.userId(), command.assetId())) { - log.info("Asset {} already registered for user {}", command.assetId(), command.userId()); + log.info( + "Asset {} already registered for user {}", command.assetId(), command.userId()); return; } diff --git a/src/main/java/io/autoinvestor/application/RegisterUserCommand.java b/src/main/java/io/autoinvestor/application/RegisterUserCommand.java index d9c100b..7dc384f 100644 --- a/src/main/java/io/autoinvestor/application/RegisterUserCommand.java +++ b/src/main/java/io/autoinvestor/application/RegisterUserCommand.java @@ -1,4 +1,3 @@ package io.autoinvestor.application; -public record RegisterUserCommand(String userId, int riskLevel) { -} +public record RegisterUserCommand(String userId, int riskLevel) {} diff --git a/src/main/java/io/autoinvestor/application/RegisterUserCommandHandler.java b/src/main/java/io/autoinvestor/application/RegisterUserCommandHandler.java index 386ff95..2b79f42 100644 --- a/src/main/java/io/autoinvestor/application/RegisterUserCommandHandler.java +++ b/src/main/java/io/autoinvestor/application/RegisterUserCommandHandler.java @@ -7,10 +7,10 @@ import io.autoinvestor.domain.model.Inbox; import io.autoinvestor.domain.model.UserId; import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Service; import java.util.List; +import org.springframework.stereotype.Service; @Slf4j @Service @@ -21,8 +21,11 @@ public class RegisterUserCommandHandler { private final InboxReadModelRepository inboxReadModel; private final EventPublisher eventPublisher; - public RegisterUserCommandHandler(EventStoreRepository eventStore, PortfolioRepository portfolioRepository, - InboxReadModelRepository inboxReadModel, EventPublisher eventPublisher) { + public RegisterUserCommandHandler( + EventStoreRepository eventStore, + PortfolioRepository portfolioRepository, + InboxReadModelRepository inboxReadModel, + EventPublisher eventPublisher) { this.eventStore = eventStore; this.portfolioRepository = portfolioRepository; this.inboxReadModel = inboxReadModel; diff --git a/src/main/java/io/autoinvestor/domain/Id.java b/src/main/java/io/autoinvestor/domain/Id.java index 7e541e8..4af1ee3 100644 --- a/src/main/java/io/autoinvestor/domain/Id.java +++ b/src/main/java/io/autoinvestor/domain/Id.java @@ -20,10 +20,8 @@ protected static String generateId() { @Override public boolean equals(Object o) { - if (this == o) - return true; - if (!(o instanceof Id that)) - return false; + if (this == o) return true; + if (!(o instanceof Id that)) return false; return id.equals(that.id); } diff --git a/src/main/java/io/autoinvestor/domain/PortfolioRepository.java b/src/main/java/io/autoinvestor/domain/PortfolioRepository.java index cf89ec0..30acca6 100644 --- a/src/main/java/io/autoinvestor/domain/PortfolioRepository.java +++ b/src/main/java/io/autoinvestor/domain/PortfolioRepository.java @@ -6,7 +6,10 @@ public interface PortfolioRepository { List getUsersIdByAssetAndRiskLevel(String assetId, int riskLevel); + void addUser(String userId, int riskLevel); + void addPortfolioAsset(String userId, String assetId); + boolean existsPortfolioAsset(String userId, String assetId); } diff --git a/src/main/java/io/autoinvestor/domain/events/AlertEmittedEvent.java b/src/main/java/io/autoinvestor/domain/events/AlertEmittedEvent.java index 069f181..1e8df7f 100644 --- a/src/main/java/io/autoinvestor/domain/events/AlertEmittedEvent.java +++ b/src/main/java/io/autoinvestor/domain/events/AlertEmittedEvent.java @@ -2,13 +2,12 @@ import io.autoinvestor.domain.Id; import io.autoinvestor.domain.model.AssetId; +import io.autoinvestor.domain.model.Decision; import io.autoinvestor.domain.model.InboxId; import io.autoinvestor.domain.model.UserId; -import io.autoinvestor.domain.model.Decision; import java.util.Date; - public class AlertEmittedEvent extends Event { public static final String TYPE = "ALERT_EMITTED_EVENT"; @@ -17,28 +16,28 @@ private AlertEmittedEvent(Id aggregateId, AlertEmittedEventPayload payload) { super(aggregateId, TYPE, payload); } - protected AlertEmittedEvent(EventId id, - Id aggregateId, - AlertEmittedEventPayload payload, - Date occurredAt, - int version) { + protected AlertEmittedEvent( + EventId id, + Id aggregateId, + AlertEmittedEventPayload payload, + Date occurredAt, + int version) { super(id, aggregateId, TYPE, payload, occurredAt, version); } - public static AlertEmittedEvent with(InboxId inboxId, UserId userId, AssetId assetId, Decision decision) { - AlertEmittedEventPayload payload = new AlertEmittedEventPayload( - userId.value(), - assetId.value(), - decision.name() - ); + public static AlertEmittedEvent with( + InboxId inboxId, UserId userId, AssetId assetId, Decision decision) { + AlertEmittedEventPayload payload = + new AlertEmittedEventPayload(userId.value(), assetId.value(), decision.name()); return new AlertEmittedEvent(inboxId, payload); } - public static AlertEmittedEvent hydrate(EventId id, - Id aggregateId, - AlertEmittedEventPayload payload, - Date occurredAt, - int version) { + public static AlertEmittedEvent hydrate( + EventId id, + Id aggregateId, + AlertEmittedEventPayload payload, + Date occurredAt, + int version) { return new AlertEmittedEvent(id, aggregateId, payload, occurredAt, version); } } diff --git a/src/main/java/io/autoinvestor/domain/events/AlertEmittedEventPayload.java b/src/main/java/io/autoinvestor/domain/events/AlertEmittedEventPayload.java index 2f6e6e4..1abb2ac 100644 --- a/src/main/java/io/autoinvestor/domain/events/AlertEmittedEventPayload.java +++ b/src/main/java/io/autoinvestor/domain/events/AlertEmittedEventPayload.java @@ -2,18 +2,14 @@ import java.util.Map; -public record AlertEmittedEventPayload( - String userId, - String assetId, - String decision -) implements EventPayload { +public record AlertEmittedEventPayload(String userId, String assetId, String decision) + implements EventPayload { @Override public Map asMap() { return Map.of( "userId", userId, "assetId", assetId, - "decision", decision - ); + "decision", decision); } } diff --git a/src/main/java/io/autoinvestor/domain/events/Event.java b/src/main/java/io/autoinvestor/domain/events/Event.java index 940207c..fe6725e 100644 --- a/src/main/java/io/autoinvestor/domain/events/Event.java +++ b/src/main/java/io/autoinvestor/domain/events/Event.java @@ -27,7 +27,8 @@ protected Event(Id aggregateId, String type, P payload, int version) { this.version = version; } - protected Event(EventId id, Id aggregateId, String type, P payload, Date occurredAt, int version) { + protected Event( + EventId id, Id aggregateId, String type, P payload, Date occurredAt, int version) { this.id = id; this.aggregateId = aggregateId; this.type = type; diff --git a/src/main/java/io/autoinvestor/domain/events/EventId.java b/src/main/java/io/autoinvestor/domain/events/EventId.java index 04eb658..1b08665 100644 --- a/src/main/java/io/autoinvestor/domain/events/EventId.java +++ b/src/main/java/io/autoinvestor/domain/events/EventId.java @@ -1,6 +1,5 @@ package io.autoinvestor.domain.events; - import io.autoinvestor.domain.Id; public class EventId extends Id { diff --git a/src/main/java/io/autoinvestor/domain/events/EventSourcedEntity.java b/src/main/java/io/autoinvestor/domain/events/EventSourcedEntity.java index e9b67b0..911eb3b 100644 --- a/src/main/java/io/autoinvestor/domain/events/EventSourcedEntity.java +++ b/src/main/java/io/autoinvestor/domain/events/EventSourcedEntity.java @@ -37,4 +37,3 @@ public void markEventsAsCommitted() { appliedEvents.clear(); } } - diff --git a/src/main/java/io/autoinvestor/domain/events/EventStoreRepository.java b/src/main/java/io/autoinvestor/domain/events/EventStoreRepository.java index d9bc041..233c26e 100644 --- a/src/main/java/io/autoinvestor/domain/events/EventStoreRepository.java +++ b/src/main/java/io/autoinvestor/domain/events/EventStoreRepository.java @@ -5,9 +5,10 @@ import java.util.Optional; - public interface EventStoreRepository { boolean exists(InboxId inboxId); + Optional get(InboxId inboxId); + void save(Inbox inbox); } diff --git a/src/main/java/io/autoinvestor/domain/events/InboxCreatedEvent.java b/src/main/java/io/autoinvestor/domain/events/InboxCreatedEvent.java index 8a5698a..4d17d88 100644 --- a/src/main/java/io/autoinvestor/domain/events/InboxCreatedEvent.java +++ b/src/main/java/io/autoinvestor/domain/events/InboxCreatedEvent.java @@ -6,7 +6,6 @@ import java.util.Date; - public class InboxCreatedEvent extends Event { public static final String TYPE = "INBOX_CREATED"; @@ -15,27 +14,26 @@ private InboxCreatedEvent(Id aggregateId, InboxCreatedEventPayload payload) { super(aggregateId, TYPE, payload); } - protected InboxCreatedEvent(EventId id, - Id aggregateId, - InboxCreatedEventPayload payload, - Date occurredAt, - int version) { + protected InboxCreatedEvent( + EventId id, + Id aggregateId, + InboxCreatedEventPayload payload, + Date occurredAt, + int version) { super(id, aggregateId, TYPE, payload, occurredAt, version); } public static InboxCreatedEvent with(InboxId inboxId, UserId userId, int riskLevel) { - InboxCreatedEventPayload payload = new InboxCreatedEventPayload( - userId.value(), - riskLevel - ); + InboxCreatedEventPayload payload = new InboxCreatedEventPayload(userId.value(), riskLevel); return new InboxCreatedEvent(inboxId, payload); } - public static InboxCreatedEvent hydrate(EventId id, - Id aggregateId, - InboxCreatedEventPayload payload, - Date occurredAt, - int version) { + public static InboxCreatedEvent hydrate( + EventId id, + Id aggregateId, + InboxCreatedEventPayload payload, + Date occurredAt, + int version) { return new InboxCreatedEvent(id, aggregateId, payload, occurredAt, version); } } diff --git a/src/main/java/io/autoinvestor/domain/events/InboxCreatedEventPayload.java b/src/main/java/io/autoinvestor/domain/events/InboxCreatedEventPayload.java index 1e32655..fbd497f 100644 --- a/src/main/java/io/autoinvestor/domain/events/InboxCreatedEventPayload.java +++ b/src/main/java/io/autoinvestor/domain/events/InboxCreatedEventPayload.java @@ -2,15 +2,12 @@ import java.util.Map; -public record InboxCreatedEventPayload( - String userId, int riskLevel -) implements EventPayload { +public record InboxCreatedEventPayload(String userId, int riskLevel) implements EventPayload { @Override public Map asMap() { return Map.of( "userId", userId, - "riskLevel", riskLevel - ); + "riskLevel", riskLevel); } } diff --git a/src/main/java/io/autoinvestor/domain/events/SubscriptionCreatedEvent.java b/src/main/java/io/autoinvestor/domain/events/SubscriptionCreatedEvent.java index b5b7eb5..059eef2 100644 --- a/src/main/java/io/autoinvestor/domain/events/SubscriptionCreatedEvent.java +++ b/src/main/java/io/autoinvestor/domain/events/SubscriptionCreatedEvent.java @@ -3,11 +3,9 @@ import io.autoinvestor.domain.Id; import io.autoinvestor.domain.model.AssetId; import io.autoinvestor.domain.model.InboxId; -import io.autoinvestor.domain.model.UserId; import java.util.Date; - public class SubscriptionCreatedEvent extends Event { public static final String TYPE = "SUBSCRIPTION_CREATED"; @@ -16,26 +14,27 @@ private SubscriptionCreatedEvent(Id aggregateId, SubscriptionCreatedEventPayload super(aggregateId, TYPE, payload); } - protected SubscriptionCreatedEvent(EventId id, - Id aggregateId, - SubscriptionCreatedEventPayload payload, - Date occurredAt, - int version) { + protected SubscriptionCreatedEvent( + EventId id, + Id aggregateId, + SubscriptionCreatedEventPayload payload, + Date occurredAt, + int version) { super(id, aggregateId, TYPE, payload, occurredAt, version); } public static SubscriptionCreatedEvent with(InboxId inboxId, AssetId assetId) { - SubscriptionCreatedEventPayload payload = new SubscriptionCreatedEventPayload( - assetId.value() - ); + SubscriptionCreatedEventPayload payload = + new SubscriptionCreatedEventPayload(assetId.value()); return new SubscriptionCreatedEvent(inboxId, payload); } - public static SubscriptionCreatedEvent hydrate(EventId id, - Id aggregateId, - SubscriptionCreatedEventPayload payload, - Date occurredAt, - int version) { + public static SubscriptionCreatedEvent hydrate( + EventId id, + Id aggregateId, + SubscriptionCreatedEventPayload payload, + Date occurredAt, + int version) { return new SubscriptionCreatedEvent(id, aggregateId, payload, occurredAt, version); } } diff --git a/src/main/java/io/autoinvestor/domain/events/SubscriptionCreatedEventPayload.java b/src/main/java/io/autoinvestor/domain/events/SubscriptionCreatedEventPayload.java index fbd0b83..30156d8 100644 --- a/src/main/java/io/autoinvestor/domain/events/SubscriptionCreatedEventPayload.java +++ b/src/main/java/io/autoinvestor/domain/events/SubscriptionCreatedEventPayload.java @@ -2,14 +2,10 @@ import java.util.Map; -public record SubscriptionCreatedEventPayload( - String assetId -) implements EventPayload { +public record SubscriptionCreatedEventPayload(String assetId) implements EventPayload { @Override public Map asMap() { - return Map.of( - "assetId", assetId - ); + return Map.of("assetId", assetId); } } diff --git a/src/main/java/io/autoinvestor/domain/model/Alert.java b/src/main/java/io/autoinvestor/domain/model/Alert.java index adaf33b..ac93e59 100644 --- a/src/main/java/io/autoinvestor/domain/model/Alert.java +++ b/src/main/java/io/autoinvestor/domain/model/Alert.java @@ -2,5 +2,4 @@ import java.util.Date; -public record Alert(AssetId assetId, Decision decision, Date date) { -} +public record Alert(AssetId assetId, Decision decision, Date date) {} diff --git a/src/main/java/io/autoinvestor/domain/model/Inbox.java b/src/main/java/io/autoinvestor/domain/model/Inbox.java index 385261d..2c690d3 100644 --- a/src/main/java/io/autoinvestor/domain/model/Inbox.java +++ b/src/main/java/io/autoinvestor/domain/model/Inbox.java @@ -29,29 +29,24 @@ public static Inbox from(List> stream) { public static Inbox create(String userId, int riskLevel) { Inbox inbox = Inbox.empty(); - inbox.apply(InboxCreatedEvent.with( - inbox.getState().getInboxId(), - UserId.from(userId), - riskLevel - )); + inbox.apply( + InboxCreatedEvent.with( + inbox.getState().getInboxId(), UserId.from(userId), riskLevel)); return inbox; } public void addPortfolioAsset(String assetId) { - this.apply(SubscriptionCreatedEvent.with( - this.state.getInboxId(), - AssetId.of(assetId) - )); + this.apply(SubscriptionCreatedEvent.with(this.state.getInboxId(), AssetId.of(assetId))); } public void emitAlert(String assetId, String decision) { - this.apply(AlertEmittedEvent.with( - this.state.getInboxId(), - this.state.getUserId(), - AssetId.of(assetId), - Decision.from(decision) - )); + this.apply( + AlertEmittedEvent.with( + this.state.getInboxId(), + this.state.getUserId(), + AssetId.of(assetId), + Decision.from(decision))); } @Override diff --git a/src/main/java/io/autoinvestor/domain/model/InboxId.java b/src/main/java/io/autoinvestor/domain/model/InboxId.java index 5f3a12f..2750261 100644 --- a/src/main/java/io/autoinvestor/domain/model/InboxId.java +++ b/src/main/java/io/autoinvestor/domain/model/InboxId.java @@ -2,7 +2,6 @@ import io.autoinvestor.domain.Id; - public class InboxId extends Id { InboxId(String id) { super(id); diff --git a/src/main/java/io/autoinvestor/domain/model/InboxState.java b/src/main/java/io/autoinvestor/domain/model/InboxState.java index 21d19f5..f06f060 100644 --- a/src/main/java/io/autoinvestor/domain/model/InboxState.java +++ b/src/main/java/io/autoinvestor/domain/model/InboxState.java @@ -15,7 +15,12 @@ public class InboxState { private final Set portfolioAssets; private final List alerts; - private InboxState(InboxId inboxId, UserId userId, int riskLevel, Set portfolioAssets, List alerts) { + private InboxState( + InboxId inboxId, + UserId userId, + int riskLevel, + Set portfolioAssets, + List alerts) { this.inboxId = inboxId; this.userId = userId; this.riskLevel = riskLevel; @@ -37,8 +42,7 @@ public InboxState withInboxCreated(InboxCreatedEvent event) { UserId.from(event.getPayload().userId()), event.getPayload().riskLevel(), Set.of(), - List.of() - ); + List.of()); } public InboxState withSubscriptionCreated(SubscriptionCreatedEvent event) { @@ -47,29 +51,19 @@ public InboxState withSubscriptionCreated(SubscriptionCreatedEvent event) { updatedAssets.add(AssetId.of(assetId)); return new InboxState( - this.inboxId, - this.userId, - this.riskLevel, - updatedAssets, - this.alerts - ); + this.inboxId, this.userId, this.riskLevel, updatedAssets, this.alerts); } public InboxState withAlertEmitted(AlertEmittedEvent event) { - Alert alert = new Alert( - AssetId.of(event.getPayload().assetId()), - Decision.from(event.getPayload().decision()), - event.getOccurredAt() - ); + Alert alert = + new Alert( + AssetId.of(event.getPayload().assetId()), + Decision.from(event.getPayload().decision()), + event.getOccurredAt()); List updatedAlerts = new ArrayList<>(this.alerts); updatedAlerts.add(alert); return new InboxState( - this.inboxId, - this.userId, - this.riskLevel, - this.portfolioAssets, - updatedAlerts - ); + this.inboxId, this.userId, this.riskLevel, this.portfolioAssets, updatedAlerts); } } diff --git a/src/main/java/io/autoinvestor/domain/model/UserId.java b/src/main/java/io/autoinvestor/domain/model/UserId.java index 49cfe0f..b8d922d 100644 --- a/src/main/java/io/autoinvestor/domain/model/UserId.java +++ b/src/main/java/io/autoinvestor/domain/model/UserId.java @@ -2,7 +2,6 @@ import io.autoinvestor.domain.Id; - public class UserId extends Id { UserId(String id) { super(id); diff --git a/src/main/java/io/autoinvestor/infrastructure/event_publishers/EventMessageMapper.java b/src/main/java/io/autoinvestor/infrastructure/event_publishers/EventMessageMapper.java index 5f2c762..a1a81d2 100644 --- a/src/main/java/io/autoinvestor/infrastructure/event_publishers/EventMessageMapper.java +++ b/src/main/java/io/autoinvestor/infrastructure/event_publishers/EventMessageMapper.java @@ -1,9 +1,5 @@ package io.autoinvestor.infrastructure.event_publishers; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.protobuf.ByteString; -import com.google.pubsub.v1.PubsubMessage; import io.autoinvestor.domain.events.Event; import io.autoinvestor.exceptions.InternalErrorException; @@ -11,6 +7,10 @@ import java.util.HashMap; import java.util.Map; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; final class EventMessageMapper { @@ -23,18 +23,16 @@ final class EventMessageMapper { PubsubMessage toMessage(Event event) { try { Map envelope = new HashMap<>(); - envelope.put("payload", event.getPayload().asMap()); - envelope.put("eventId", event.getId().value()); - envelope.put("type", event.getType()); + envelope.put("payload", event.getPayload().asMap()); + envelope.put("eventId", event.getId().value()); + envelope.put("type", event.getType()); envelope.put("aggregateId", event.getAggregateId().value()); - envelope.put("occurredAt", - Instant.ofEpochMilli(event.getOccurredAt().getTime()).toString()); - envelope.put("version", event.getVersion()); + envelope.put( + "occurredAt", Instant.ofEpochMilli(event.getOccurredAt().getTime()).toString()); + envelope.put("version", event.getVersion()); String json = objectMapper.writeValueAsString(envelope); - return PubsubMessage.newBuilder() - .setData(ByteString.copyFromUtf8(json)) - .build(); + return PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(json)).build(); } catch (JsonProcessingException ex) { throw new InternalErrorException("Failed to serialise domain event"); diff --git a/src/main/java/io/autoinvestor/infrastructure/event_publishers/InMemoryEventPublisher.java b/src/main/java/io/autoinvestor/infrastructure/event_publishers/InMemoryEventPublisher.java index e29f9ea..f2e4760 100644 --- a/src/main/java/io/autoinvestor/infrastructure/event_publishers/InMemoryEventPublisher.java +++ b/src/main/java/io/autoinvestor/infrastructure/event_publishers/InMemoryEventPublisher.java @@ -2,13 +2,14 @@ import io.autoinvestor.domain.events.Event; import io.autoinvestor.domain.events.EventPublisher; -import org.springframework.context.ApplicationEventPublisher; -import org.springframework.context.annotation.Profile; -import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.List; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.annotation.Profile; +import org.springframework.stereotype.Component; + @Component @Profile("local") public class InMemoryEventPublisher implements EventPublisher { @@ -28,6 +29,9 @@ public void publish(List> events) { public boolean hasPublishedEvent(String type, String aggregateId) { return publishedEvents.stream() - .anyMatch(event -> event.getType().equals(type) && event.getAggregateId().value().equals(aggregateId)); + .anyMatch( + event -> + event.getType().equals(type) + && event.getAggregateId().value().equals(aggregateId)); } } diff --git a/src/main/java/io/autoinvestor/infrastructure/event_publishers/PubsubEventPublisher.java b/src/main/java/io/autoinvestor/infrastructure/event_publishers/PubsubEventPublisher.java index 488a965..8c3a585 100644 --- a/src/main/java/io/autoinvestor/infrastructure/event_publishers/PubsubEventPublisher.java +++ b/src/main/java/io/autoinvestor/infrastructure/event_publishers/PubsubEventPublisher.java @@ -1,18 +1,20 @@ package io.autoinvestor.infrastructure.event_publishers; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.cloud.pubsub.v1.Publisher; -import com.google.pubsub.v1.ProjectTopicName; import io.autoinvestor.domain.events.Event; import io.autoinvestor.domain.events.EventPublisher; import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; + +import java.util.List; +import java.util.concurrent.TimeUnit; + import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Profile; import org.springframework.stereotype.Component; -import java.util.List; -import java.util.concurrent.TimeUnit; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.pubsub.v1.ProjectTopicName; @Slf4j @Component @@ -25,8 +27,8 @@ public class PubsubEventPublisher implements EventPublisher { public PubsubEventPublisher( @Value("${GCP_PROJECT}") String projectId, @Value("${PUBSUB_TOPIC}") String topic, - ObjectMapper objectMapper - ) throws Exception { + ObjectMapper objectMapper) + throws Exception { this.mapper = new EventMessageMapper(objectMapper); ProjectTopicName topicName = ProjectTopicName.of(projectId, topic); this.publisher = Publisher.newBuilder(topicName).build(); @@ -45,12 +47,17 @@ public void publish(List> events) { events.stream() .map(mapper::toMessage) - .forEach(msg -> { - publisher.publish(msg).addListener( - () -> log.debug("Published msgId={}", msg.getMessageId()), - Runnable::run - ); - }); + .forEach( + msg -> { + publisher + .publish(msg) + .addListener( + () -> + log.debug( + "Published msgId={}", + msg.getMessageId()), + Runnable::run); + }); } @PreDestroy diff --git a/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubDecisionsEventSubscriber.java b/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubDecisionsEventSubscriber.java index bdd94d6..64f48bd 100644 --- a/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubDecisionsEventSubscriber.java +++ b/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubDecisionsEventSubscriber.java @@ -1,24 +1,26 @@ package io.autoinvestor.infrastructure.listeners; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.api.core.ApiService.Listener; -import com.google.api.core.ApiService.State; -import com.google.cloud.pubsub.v1.AckReplyConsumer; -import com.google.cloud.pubsub.v1.MessageReceiver; -import com.google.cloud.pubsub.v1.Subscriber; -import com.google.pubsub.v1.ProjectSubscriptionName; -import com.google.pubsub.v1.PubsubMessage; import io.autoinvestor.application.EmitAlertsCommand; import io.autoinvestor.application.EmitAlertsCommandHandler; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; + +import java.util.Map; + import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Profile; import org.springframework.stereotype.Component; -import java.util.Map; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.api.core.ApiService.Listener; +import com.google.api.core.ApiService.State; +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; @Slf4j @Component @@ -37,8 +39,8 @@ public PubsubDecisionsEventSubscriber( PubsubEventMapper eventMapper, @Value("${GCP_PROJECT}") String projectId, @Value("${PUBSUB_SUBSCRIPTION_DECISION_MAKING}") String subscriptionId) { - this.commandHandler = commandHandler; - this.eventMapper = eventMapper; + this.commandHandler = commandHandler; + this.eventMapper = eventMapper; this.subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId); } @@ -49,11 +51,18 @@ public void listen() { MessageReceiver receiver = this::processMessage; this.subscriber = Subscriber.newBuilder(subscriptionName, receiver).build(); - this.subscriber.addListener(new Listener() { - @Override public void failed(State from, Throwable failure) { - log.error("Subscriber failed from state {}: {}", from, failure.toString(), failure); - } - }, Runnable::run); + this.subscriber.addListener( + new Listener() { + @Override + public void failed(State from, Throwable failure) { + log.error( + "Subscriber failed from state {}: {}", + from, + failure.toString(), + failure); + } + }, + Runnable::run); this.subscriber.startAsync().awaitRunning(); log.info("Subscriber running"); } @@ -71,34 +80,46 @@ private void processMessage(PubsubMessage message, AckReplyConsumer consumer) { log.debug("Received message msgId={} size={}B", msgId, message.getData().size()); try { - Map raw = objectMapper.readValue(message.getData().toByteArray(), new TypeReference<>() {}); + Map raw = + objectMapper.readValue( + message.getData().toByteArray(), new TypeReference<>() {}); PubsubEvent event = eventMapper.fromMap(raw); log.info("Processing event type={} msgId={}", event.getType(), msgId); if ("ASSET_DECISION_TAKEN".equals(event.getType())) { if (!event.getPayload().containsKey("assetId")) { - log.warn("Event payload missing 'assetId' field, ignoring event msgId={}", msgId); + log.warn( + "Event payload missing 'assetId' field, ignoring event msgId={}", + msgId); consumer.nack(); return; } if (!event.getPayload().containsKey("decision")) { - log.warn("Event payload missing 'decision' field, ignoring event msgId={}", msgId); + log.warn( + "Event payload missing 'decision' field, ignoring event msgId={}", + msgId); consumer.nack(); return; } if (!event.getPayload().containsKey("riskLevel")) { - log.warn("Event payload missing 'riskLevel' field, ignoring event msgId={}", msgId); + log.warn( + "Event payload missing 'riskLevel' field, ignoring event msgId={}", + msgId); consumer.nack(); return; } - EmitAlertsCommand cmd = new EmitAlertsCommand( - (String) event.getPayload().get("assetId"), - (String) event.getPayload().get("decision"), - (int) event.getPayload().get("riskLevel") - ); + EmitAlertsCommand cmd = + new EmitAlertsCommand( + (String) event.getPayload().get("assetId"), + (String) event.getPayload().get("decision"), + (int) event.getPayload().get("riskLevel")); this.commandHandler.handle(cmd); - log.info("Decision registered for asset={} decision={} riskLevel={} msgId={}", - cmd.assetId(), cmd.decision(), cmd.riskLevel(), msgId); + log.info( + "Decision registered for asset={} decision={} riskLevel={} msgId={}", + cmd.assetId(), + cmd.decision(), + cmd.riskLevel(), + msgId); } else { log.debug("Ignored unsupported event type={} msgId={}", event.getType(), msgId); } diff --git a/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubEvent.java b/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubEvent.java index 41e0dd1..b212c11 100644 --- a/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubEvent.java +++ b/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubEvent.java @@ -1,6 +1,5 @@ package io.autoinvestor.infrastructure.listeners; -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; @@ -8,9 +7,13 @@ import java.util.Map; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; @JsonIgnoreProperties(ignoreUnknown = true) -@Data @Builder @NoArgsConstructor @AllArgsConstructor +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor public class PubsubEvent { private String aggregateId; private String type; diff --git a/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubEventMapper.java b/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubEventMapper.java index 3c1384e..5b50853 100644 --- a/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubEventMapper.java +++ b/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubEventMapper.java @@ -1,12 +1,12 @@ package io.autoinvestor.infrastructure.listeners; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.PropertyNamingStrategies; import lombok.RequiredArgsConstructor; -import org.springframework.stereotype.Component; import java.util.Map; +import org.springframework.stereotype.Component; + +import com.fasterxml.jackson.databind.ObjectMapper; @Component @RequiredArgsConstructor diff --git a/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubPortfolioAssetEventSubscriber.java b/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubPortfolioAssetEventSubscriber.java index db54fdc..e82bddd 100644 --- a/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubPortfolioAssetEventSubscriber.java +++ b/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubPortfolioAssetEventSubscriber.java @@ -1,25 +1,26 @@ package io.autoinvestor.infrastructure.listeners; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.api.core.ApiService.Listener; -import com.google.api.core.ApiService.State; -import com.google.cloud.pubsub.v1.AckReplyConsumer; -import com.google.cloud.pubsub.v1.MessageReceiver; -import com.google.cloud.pubsub.v1.Subscriber; -import com.google.pubsub.v1.ProjectSubscriptionName; -import com.google.pubsub.v1.PubsubMessage; import io.autoinvestor.application.RegisterPortfolioAssetCommand; import io.autoinvestor.application.RegisterPortfolioAssetCommandHandler; -import io.autoinvestor.application.RegisterUserCommand; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; + +import java.util.Map; + import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Profile; import org.springframework.stereotype.Component; -import java.util.Map; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.api.core.ApiService.Listener; +import com.google.api.core.ApiService.State; +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; @Slf4j @Component @@ -38,8 +39,8 @@ public PubsubPortfolioAssetEventSubscriber( PubsubEventMapper eventMapper, @Value("${GCP_PROJECT}") String projectId, @Value("${PUBSUB_SUBSCRIPTION_PORTFOLIO}") String subscriptionId) { - this.commandHandler = commandHandler; - this.eventMapper = eventMapper; + this.commandHandler = commandHandler; + this.eventMapper = eventMapper; this.subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId); } @@ -50,11 +51,18 @@ public void listen() { MessageReceiver receiver = this::processMessage; this.subscriber = Subscriber.newBuilder(subscriptionName, receiver).build(); - this.subscriber.addListener(new Listener() { - @Override public void failed(State from, Throwable failure) { - log.error("Subscriber failed from state {}: {}", from, failure.toString(), failure); // ERROR - } - }, Runnable::run); + this.subscriber.addListener( + new Listener() { + @Override + public void failed(State from, Throwable failure) { + log.error( + "Subscriber failed from state {}: {}", + from, + failure.toString(), + failure); // ERROR + } + }, + Runnable::run); this.subscriber.startAsync().awaitRunning(); log.info("Subscriber running"); } @@ -72,25 +80,34 @@ private void processMessage(PubsubMessage message, AckReplyConsumer consumer) { log.debug("Received message msgId={} size={}B", msgId, message.getData().size()); try { - Map raw = objectMapper.readValue(message.getData().toByteArray(), new TypeReference<>() {}); + Map raw = + objectMapper.readValue( + message.getData().toByteArray(), new TypeReference<>() {}); PubsubEvent event = eventMapper.fromMap(raw); log.info("Processing event type={} msgId={}", event.getType(), msgId); if ("PORTFOLIO_ASSET_ADDED".equals(event.getType())) { - if (event.getAggregateId() == null || event.getPayload() == null || - !event.getPayload().containsKey("userId") || !event.getPayload().containsKey("assetId")) { - log.warn("Malformed event: Skipping PORTFOLIO_ASSET_ADDED event with missing fields msgId={}", msgId); + if (event.getAggregateId() == null + || event.getPayload() == null + || !event.getPayload().containsKey("userId") + || !event.getPayload().containsKey("assetId")) { + log.warn( + "Malformed event: Skipping PORTFOLIO_ASSET_ADDED event with missing fields msgId={}", + msgId); consumer.ack(); return; } - RegisterPortfolioAssetCommand cmd = new RegisterPortfolioAssetCommand( - (String) event.getPayload().get("userId"), - (String) event.getPayload().get("assetId") - ); + RegisterPortfolioAssetCommand cmd = + new RegisterPortfolioAssetCommand( + (String) event.getPayload().get("userId"), + (String) event.getPayload().get("assetId")); this.commandHandler.handle(cmd); - log.info("Decision registered for userId={} assetId={} msgId={}", - cmd.userId(), cmd.assetId(), msgId); + log.info( + "Decision registered for userId={} assetId={} msgId={}", + cmd.userId(), + cmd.assetId(), + msgId); } else { log.debug("Ignored unsupported event type={} msgId={}", event.getType(), msgId); } diff --git a/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubUsersEventSubscriber.java b/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubUsersEventSubscriber.java index 179e827..7cbc6bd 100644 --- a/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubUsersEventSubscriber.java +++ b/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubUsersEventSubscriber.java @@ -1,24 +1,26 @@ package io.autoinvestor.infrastructure.listeners; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.api.core.ApiService.Listener; -import com.google.api.core.ApiService.State; -import com.google.cloud.pubsub.v1.AckReplyConsumer; -import com.google.cloud.pubsub.v1.MessageReceiver; -import com.google.cloud.pubsub.v1.Subscriber; -import com.google.pubsub.v1.ProjectSubscriptionName; -import com.google.pubsub.v1.PubsubMessage; import io.autoinvestor.application.RegisterUserCommand; import io.autoinvestor.application.RegisterUserCommandHandler; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; + +import java.util.Map; + import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Profile; import org.springframework.stereotype.Component; -import java.util.Map; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.api.core.ApiService.Listener; +import com.google.api.core.ApiService.State; +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; @Slf4j @Component @@ -37,8 +39,8 @@ public PubsubUsersEventSubscriber( PubsubEventMapper eventMapper, @Value("${GCP_PROJECT}") String projectId, @Value("${PUBSUB_SUBSCRIPTION_USERS}") String subscriptionId) { - this.commandHandler = commandHandler; - this.eventMapper = eventMapper; + this.commandHandler = commandHandler; + this.eventMapper = eventMapper; this.subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId); } @@ -49,11 +51,18 @@ public void listen() { MessageReceiver receiver = this::processMessage; this.subscriber = Subscriber.newBuilder(subscriptionName, receiver).build(); - this.subscriber.addListener(new Listener() { - @Override public void failed(State from, Throwable failure) { - log.error("Subscriber failed from state {}: {}", from, failure.toString(), failure); // ERROR - } - }, Runnable::run); + this.subscriber.addListener( + new Listener() { + @Override + public void failed(State from, Throwable failure) { + log.error( + "Subscriber failed from state {}: {}", + from, + failure.toString(), + failure); // ERROR + } + }, + Runnable::run); this.subscriber.startAsync().awaitRunning(); log.info("Subscriber running"); } @@ -71,24 +80,31 @@ private void processMessage(PubsubMessage message, AckReplyConsumer consumer) { log.debug("Received message msgId={} size={}B", msgId, message.getData().size()); try { - Map raw = objectMapper.readValue(message.getData().toByteArray(), new TypeReference<>() {}); + Map raw = + objectMapper.readValue( + message.getData().toByteArray(), new TypeReference<>() {}); PubsubEvent event = eventMapper.fromMap(raw); log.info("Processing event type={} msgId={}", event.getType(), msgId); if ("USER_CREATED".equals(event.getType())) { - if (event.getAggregateId() == null || !event.getPayload().containsKey("riskLevel")) { - log.warn("Malformed event: Skipping USER_CREATED event with missing aggregateId or riskLevel msgId={}", msgId); + if (event.getAggregateId() == null + || !event.getPayload().containsKey("riskLevel")) { + log.warn( + "Malformed event: Skipping USER_CREATED event with missing aggregateId or riskLevel msgId={}", + msgId); consumer.ack(); return; } - RegisterUserCommand cmd = new RegisterUserCommand( - event.getAggregateId(), - (int) event.getPayload().get("riskLevel") - ); + RegisterUserCommand cmd = + new RegisterUserCommand( + event.getAggregateId(), (int) event.getPayload().get("riskLevel")); this.commandHandler.handle(cmd); - log.info("Decision registered for userId={} riskLevel={} msgId={}", - cmd.userId(), cmd.riskLevel(), msgId); + log.info( + "Decision registered for userId={} riskLevel={} msgId={}", + cmd.userId(), + cmd.riskLevel(), + msgId); } else { log.debug("Ignored unsupported event type={} msgId={}", event.getType(), msgId); } diff --git a/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/DecisionDocument.java b/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/DecisionDocument.java index a7eae2e..bacd072 100644 --- a/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/DecisionDocument.java +++ b/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/DecisionDocument.java @@ -2,13 +2,11 @@ import lombok.Getter; import lombok.Setter; -import org.springframework.data.annotation.Id; -import org.springframework.data.mongodb.core.index.CompoundIndex; -import org.springframework.data.mongodb.core.index.CompoundIndexes; -import org.springframework.data.mongodb.core.mapping.Document; import java.util.Date; +import org.springframework.data.mongodb.core.mapping.Document; + @Setter @Getter @Document(collection = "alerts") @@ -18,7 +16,7 @@ public class DecisionDocument { private String type; private Date date; - public DecisionDocument() { } + public DecisionDocument() {} public DecisionDocument(String userId, String assetId, String type, Date date) { this.userId = userId; diff --git a/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/DecisionMapper.java b/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/DecisionMapper.java index bfc4109..5e2e901 100644 --- a/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/DecisionMapper.java +++ b/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/DecisionMapper.java @@ -1,27 +1,17 @@ package io.autoinvestor.infrastructure.read_models.alerts; import io.autoinvestor.application.AlertDTO; -import org.springframework.stereotype.Component; +import org.springframework.stereotype.Component; @Component public class DecisionMapper { public DecisionDocument toDocument(AlertDTO dto) { - return new DecisionDocument( - dto.userId(), - dto.assetId(), - dto.type(), - dto.date() - ); + return new DecisionDocument(dto.userId(), dto.assetId(), dto.type(), dto.date()); } public AlertDTO toDTO(DecisionDocument doc) { - return new AlertDTO( - doc.getUserId(), - doc.getAssetId(), - doc.getType(), - doc.getDate() - ); + return new AlertDTO(doc.getUserId(), doc.getAssetId(), doc.getType(), doc.getDate()); } } diff --git a/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/InMemoryAlertsReadModelRepository.java b/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/InMemoryAlertsReadModelRepository.java index fd49908..5f435c6 100644 --- a/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/InMemoryAlertsReadModelRepository.java +++ b/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/InMemoryAlertsReadModelRepository.java @@ -2,24 +2,38 @@ import io.autoinvestor.application.AlertDTO; import io.autoinvestor.application.AlertsReadModelRepository; -import org.springframework.context.annotation.Profile; -import org.springframework.stereotype.Repository; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.stream.Collectors; +import org.springframework.context.annotation.Profile; +import org.springframework.stereotype.Repository; + @Repository @Profile("local") public class InMemoryAlertsReadModelRepository implements AlertsReadModelRepository { - private final List alerts = new ArrayList<>(List.of( - new AlertDTO("user1", "BTC", "SELL", new Date(System.currentTimeMillis() - 86_400_000L)), - new AlertDTO("user1", "ETH", "HOLD", new Date(System.currentTimeMillis() - 3_600_000L)), - new AlertDTO("user2", "AAPL","SELL", new Date()), - new AlertDTO("user3", "GOOG","BUY", new Date(System.currentTimeMillis() - 7_200_000L)) - )); + private final List alerts = + new ArrayList<>( + List.of( + new AlertDTO( + "user1", + "BTC", + "SELL", + new Date(System.currentTimeMillis() - 86_400_000L)), + new AlertDTO( + "user1", + "ETH", + "HOLD", + new Date(System.currentTimeMillis() - 3_600_000L)), + new AlertDTO("user2", "AAPL", "SELL", new Date()), + new AlertDTO( + "user3", + "GOOG", + "BUY", + new Date(System.currentTimeMillis() - 7_200_000L)))); @Override public void save(AlertDTO alertDTO) { diff --git a/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/MongoAlertsReadModelRepository.java b/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/MongoAlertsReadModelRepository.java index 5d3cecf..b9183d3 100644 --- a/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/MongoAlertsReadModelRepository.java +++ b/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/MongoAlertsReadModelRepository.java @@ -2,18 +2,15 @@ import io.autoinvestor.application.AlertDTO; import io.autoinvestor.application.AlertsReadModelRepository; + +import java.util.List; + import org.springframework.context.annotation.Profile; -import org.springframework.data.domain.Sort; import org.springframework.data.mongodb.core.MongoTemplate; -import org.springframework.data.mongodb.core.index.Index; -import org.springframework.data.mongodb.core.index.IndexOperations; import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.data.mongodb.core.query.Query; import org.springframework.stereotype.Repository; -import java.util.List; - - @Repository @Profile("prod") public class MongoAlertsReadModelRepository implements AlertsReadModelRepository { @@ -35,11 +32,8 @@ public void save(AlertDTO alertDTO) { @Override public List get(String userId) { - Query query = Query.query( - Criteria.where("userId").is(userId) - ); - return template.find(query, DecisionDocument.class, COLLECTION) - .stream() + Query query = Query.query(Criteria.where("userId").is(userId)); + return template.find(query, DecisionDocument.class, COLLECTION).stream() .map(mapper::toDTO) .toList(); } diff --git a/src/main/java/io/autoinvestor/infrastructure/read_models/users/DecisionDocument.java b/src/main/java/io/autoinvestor/infrastructure/read_models/users/DecisionDocument.java index b960622..7546d70 100644 --- a/src/main/java/io/autoinvestor/infrastructure/read_models/users/DecisionDocument.java +++ b/src/main/java/io/autoinvestor/infrastructure/read_models/users/DecisionDocument.java @@ -2,25 +2,20 @@ import lombok.Getter; import lombok.Setter; + import org.springframework.data.annotation.Id; -import org.springframework.data.mongodb.core.index.CompoundIndex; -import org.springframework.data.mongodb.core.index.CompoundIndexes; import org.springframework.data.mongodb.core.mapping.Document; -import java.util.Date; - @Setter @Getter @Document(collection = "inbox") public class DecisionDocument { - @Id - private String userId; + @Id private String userId; private String inboxId; - public DecisionDocument() { } + public DecisionDocument() {} - public DecisionDocument(String userId, - String inboxId) { + public DecisionDocument(String userId, String inboxId) { this.userId = userId; this.inboxId = inboxId; } diff --git a/src/main/java/io/autoinvestor/infrastructure/read_models/users/InMemoryInboxReadModelRepository.java b/src/main/java/io/autoinvestor/infrastructure/read_models/users/InMemoryInboxReadModelRepository.java index e67e80b..394aa72 100644 --- a/src/main/java/io/autoinvestor/infrastructure/read_models/users/InMemoryInboxReadModelRepository.java +++ b/src/main/java/io/autoinvestor/infrastructure/read_models/users/InMemoryInboxReadModelRepository.java @@ -3,13 +3,14 @@ import io.autoinvestor.application.InboxReadModelRepository; import io.autoinvestor.domain.model.InboxId; import io.autoinvestor.domain.model.UserId; -import org.springframework.context.annotation.Profile; -import org.springframework.stereotype.Repository; import java.util.HashMap; import java.util.Map; import java.util.Optional; +import org.springframework.context.annotation.Profile; +import org.springframework.stereotype.Repository; + @Repository @Profile("local") public class InMemoryInboxReadModelRepository implements InboxReadModelRepository { @@ -29,8 +30,6 @@ public void save(UserId userId, InboxId inboxId) { @Override public Optional getInboxId(UserId userId) { String raw = inbox.get(userId.value()); - return raw != null - ? Optional.of(InboxId.from(raw)) - : Optional.empty(); + return raw != null ? Optional.of(InboxId.from(raw)) : Optional.empty(); } } diff --git a/src/main/java/io/autoinvestor/infrastructure/read_models/users/MongoInboxReadModelRepository.java b/src/main/java/io/autoinvestor/infrastructure/read_models/users/MongoInboxReadModelRepository.java index aff4fde..979a564 100644 --- a/src/main/java/io/autoinvestor/infrastructure/read_models/users/MongoInboxReadModelRepository.java +++ b/src/main/java/io/autoinvestor/infrastructure/read_models/users/MongoInboxReadModelRepository.java @@ -3,12 +3,13 @@ import io.autoinvestor.application.InboxReadModelRepository; import io.autoinvestor.domain.model.InboxId; import io.autoinvestor.domain.model.UserId; + +import java.util.Optional; + import org.springframework.context.annotation.Profile; import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.stereotype.Repository; -import java.util.Optional; - @Repository @Profile("prod") public class MongoInboxReadModelRepository implements InboxReadModelRepository { diff --git a/src/main/java/io/autoinvestor/infrastructure/repositories/event_store/EventDocument.java b/src/main/java/io/autoinvestor/infrastructure/repositories/event_store/EventDocument.java index 4f2af5d..5b7cd75 100644 --- a/src/main/java/io/autoinvestor/infrastructure/repositories/event_store/EventDocument.java +++ b/src/main/java/io/autoinvestor/infrastructure/repositories/event_store/EventDocument.java @@ -2,49 +2,45 @@ import lombok.Getter; import lombok.Setter; -import org.springframework.data.annotation.Id; -import org.springframework.data.mongodb.core.mapping.Document; -import org.springframework.data.mongodb.core.mapping.Field; import java.util.Date; import java.util.Map; +import org.springframework.data.annotation.Id; +import org.springframework.data.mongodb.core.mapping.Document; +import org.springframework.data.mongodb.core.mapping.Field; + @Getter @Setter @Document(collection = "events") public class EventDocument { - @Id - private String id; + @Id private String id; - @Field - private String aggregateId; + @Field private String aggregateId; - @Field - private String type; + @Field private String type; - @Field - private Map payload; + @Field private Map payload; - @Field - private Date occurredAt; + @Field private Date occurredAt; - @Field - private int version; + @Field private int version; - public EventDocument() { } + public EventDocument() {} - public EventDocument(String id, - String aggregateId, - String type, - Map payload, - Date occurredAt, - int version) { - this.id = id; + public EventDocument( + String id, + String aggregateId, + String type, + Map payload, + Date occurredAt, + int version) { + this.id = id; this.aggregateId = aggregateId; - this.type = type; - this.payload = payload; - this.occurredAt = occurredAt; - this.version = version; + this.type = type; + this.payload = payload; + this.occurredAt = occurredAt; + this.version = version; } } diff --git a/src/main/java/io/autoinvestor/infrastructure/repositories/event_store/EventMapper.java b/src/main/java/io/autoinvestor/infrastructure/repositories/event_store/EventMapper.java index d4a941f..6e2a13b 100644 --- a/src/main/java/io/autoinvestor/infrastructure/repositories/event_store/EventMapper.java +++ b/src/main/java/io/autoinvestor/infrastructure/repositories/event_store/EventMapper.java @@ -1,14 +1,16 @@ package io.autoinvestor.infrastructure.repositories.event_store; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; import io.autoinvestor.domain.events.*; import io.autoinvestor.domain.model.UserId; -import org.springframework.stereotype.Component; import java.util.Date; import java.util.Map; +import org.springframework.stereotype.Component; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; + @Component public class EventMapper { @@ -24,8 +26,7 @@ public

EventDocument toDocument(Event

evt) { evt.getType(), payloadMap, evt.getOccurredAt(), - evt.getVersion() - ); + evt.getVersion()); } public Event toDomain(EventDocument doc) { @@ -53,9 +54,7 @@ public Event toDomain(EventDocument doc) { return AlertEmittedEvent.hydrate(id, aggId, payload, occurred, version); } - default -> throw new IllegalArgumentException( - "Unknown event type: " + doc.getType() - ); + default -> throw new IllegalArgumentException("Unknown event type: " + doc.getType()); } } } diff --git a/src/main/java/io/autoinvestor/infrastructure/repositories/event_store/InMemoryEventStoreRepository.java b/src/main/java/io/autoinvestor/infrastructure/repositories/event_store/InMemoryEventStoreRepository.java index 3ef8cbd..0cdadb5 100644 --- a/src/main/java/io/autoinvestor/infrastructure/repositories/event_store/InMemoryEventStoreRepository.java +++ b/src/main/java/io/autoinvestor/infrastructure/repositories/event_store/InMemoryEventStoreRepository.java @@ -4,15 +4,16 @@ import io.autoinvestor.domain.events.EventStoreRepository; import io.autoinvestor.domain.model.Inbox; import io.autoinvestor.domain.model.InboxId; -import org.springframework.context.annotation.Profile; -import org.springframework.stereotype.Repository; +import java.util.Comparator; import java.util.List; import java.util.Optional; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.Comparator; import java.util.stream.Collectors; +import org.springframework.context.annotation.Profile; +import org.springframework.stereotype.Repository; + @Repository @Profile("local") public class InMemoryEventStoreRepository implements EventStoreRepository { @@ -26,10 +27,11 @@ public void save(Inbox inbox) { @Override public Optional get(InboxId inboxId) { - List> events = eventStore.stream() - .filter(e -> e.getAggregateId().value().equals(inboxId.value())) - .sorted(Comparator.comparingLong(Event::getVersion)) - .collect(Collectors.toList()); + List> events = + eventStore.stream() + .filter(e -> e.getAggregateId().value().equals(inboxId.value())) + .sorted(Comparator.comparingLong(Event::getVersion)) + .collect(Collectors.toList()); if (events.isEmpty()) { return Optional.empty(); diff --git a/src/main/java/io/autoinvestor/infrastructure/repositories/event_store/MongoEventStoreRepository.java b/src/main/java/io/autoinvestor/infrastructure/repositories/event_store/MongoEventStoreRepository.java index ef3c5d4..78c87b7 100644 --- a/src/main/java/io/autoinvestor/infrastructure/repositories/event_store/MongoEventStoreRepository.java +++ b/src/main/java/io/autoinvestor/infrastructure/repositories/event_store/MongoEventStoreRepository.java @@ -4,7 +4,11 @@ import io.autoinvestor.domain.events.EventStoreRepository; import io.autoinvestor.domain.model.Inbox; import io.autoinvestor.domain.model.InboxId; -import io.autoinvestor.domain.model.UserId; + +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + import org.springframework.context.annotation.Profile; import org.springframework.data.domain.Sort; import org.springframework.data.mongodb.core.MongoTemplate; @@ -12,10 +16,6 @@ import org.springframework.data.mongodb.core.query.Query; import org.springframework.stereotype.Repository; -import java.util.List; -import java.util.Optional; -import java.util.stream.Collectors; - @Repository @Profile("prod") public class MongoEventStoreRepository implements EventStoreRepository { @@ -31,20 +31,18 @@ public MongoEventStoreRepository(MongoTemplate template, EventMapper mapper) { @Override public void save(Inbox inbox) { - List docs = inbox.getUncommittedEvents() - .stream() - .map(mapper::toDocument) - .collect(Collectors.toList()); + List docs = + inbox.getUncommittedEvents().stream() + .map(mapper::toDocument) + .collect(Collectors.toList()); template.insertAll(docs); } @Override public Optional get(InboxId inboxId) { - Query q = Query.query( - Criteria.where("aggregateId") - .is(inboxId.value()) - ) - .with(Sort.by("version")); + Query q = + Query.query(Criteria.where("aggregateId").is(inboxId.value())) + .with(Sort.by("version")); List docs = template.find(q, EventDocument.class, COLLECTION); @@ -52,9 +50,7 @@ public Optional get(InboxId inboxId) { return Optional.empty(); } - List> events = docs.stream() - .map(mapper::toDomain) - .collect(Collectors.toList()); + List> events = docs.stream().map(mapper::toDomain).collect(Collectors.toList()); if (events.isEmpty()) { return Optional.empty(); @@ -65,10 +61,7 @@ public Optional get(InboxId inboxId) { @Override public boolean exists(InboxId inboxId) { - Query q = Query.query( - Criteria.where("aggregateId") - .is(inboxId.value()) - ); + Query q = Query.query(Criteria.where("aggregateId").is(inboxId.value())); return template.exists(q, EventDocument.class, COLLECTION); } diff --git a/src/main/java/io/autoinvestor/infrastructure/repositories/portfolio/InMemoryPortfolioRepository.java b/src/main/java/io/autoinvestor/infrastructure/repositories/portfolio/InMemoryPortfolioRepository.java index 0e159a9..3c4715c 100644 --- a/src/main/java/io/autoinvestor/infrastructure/repositories/portfolio/InMemoryPortfolioRepository.java +++ b/src/main/java/io/autoinvestor/infrastructure/repositories/portfolio/InMemoryPortfolioRepository.java @@ -2,13 +2,14 @@ import io.autoinvestor.domain.PortfolioRepository; import io.autoinvestor.domain.model.UserId; -import org.springframework.context.annotation.Profile; -import org.springframework.stereotype.Repository; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; +import org.springframework.context.annotation.Profile; +import org.springframework.stereotype.Repository; + @Repository @Profile("local") public class InMemoryPortfolioRepository implements PortfolioRepository { diff --git a/src/main/java/io/autoinvestor/infrastructure/repositories/portfolio/MongoPortfolioRepository.java b/src/main/java/io/autoinvestor/infrastructure/repositories/portfolio/MongoPortfolioRepository.java index 69d264b..ddf2107 100644 --- a/src/main/java/io/autoinvestor/infrastructure/repositories/portfolio/MongoPortfolioRepository.java +++ b/src/main/java/io/autoinvestor/infrastructure/repositories/portfolio/MongoPortfolioRepository.java @@ -1,21 +1,23 @@ package io.autoinvestor.infrastructure.repositories.portfolio; +import static org.springframework.data.mongodb.core.aggregation.Aggregation.*; + import io.autoinvestor.domain.PortfolioRepository; import io.autoinvestor.domain.model.UserId; import lombok.Getter; import lombok.Setter; + +import java.util.List; +import java.util.stream.Collectors; + import org.springframework.context.annotation.Profile; import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.data.mongodb.core.aggregation.Aggregation; +import org.springframework.data.mongodb.core.aggregation.AggregationResults; import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.data.mongodb.core.query.Query; -import org.springframework.data.mongodb.core.aggregation.AggregationResults; -import static org.springframework.data.mongodb.core.aggregation.Aggregation.*; import org.springframework.stereotype.Repository; -import java.util.List; -import java.util.stream.Collectors; - @Repository @Profile("prod") public class MongoPortfolioRepository implements PortfolioRepository { @@ -30,16 +32,13 @@ public MongoPortfolioRepository(MongoTemplate template) { @Override public List getUsersIdByAssetAndRiskLevel(String assetId, int riskLevel) { - Aggregation agg = newAggregation( - match(Criteria.where("assetId").is(assetId)), - lookup(USERS_COLLECTION, - "userId", - "userId", - "userDocs"), - unwind("userDocs"), - match(Criteria.where("userDocs.riskLevel").is(riskLevel)), - project("userId") - ); + Aggregation agg = + newAggregation( + match(Criteria.where("assetId").is(assetId)), + lookup(USERS_COLLECTION, "userId", "userId", "userDocs"), + unwind("userDocs"), + match(Criteria.where("userDocs.riskLevel").is(riskLevel)), + project("userId")); AggregationResults results = template.aggregate(agg, PORTFOLIO_COLLECTION, IdProjection.class); @@ -68,9 +67,7 @@ public void addPortfolioAsset(String userId, String assetId) { @Override public boolean existsPortfolioAsset(String userId, String assetId) { return template.exists( - Query.query(Criteria.where("userId").is(userId) - .and("assetId").is(assetId)), - PortfolioDocument.class - ); + Query.query(Criteria.where("userId").is(userId).and("assetId").is(assetId)), + PortfolioDocument.class); } } diff --git a/src/main/java/io/autoinvestor/infrastructure/repositories/portfolio/PortfolioDocument.java b/src/main/java/io/autoinvestor/infrastructure/repositories/portfolio/PortfolioDocument.java index 712b600..e1b1c9b 100644 --- a/src/main/java/io/autoinvestor/infrastructure/repositories/portfolio/PortfolioDocument.java +++ b/src/main/java/io/autoinvestor/infrastructure/repositories/portfolio/PortfolioDocument.java @@ -3,6 +3,7 @@ import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; + import org.springframework.data.annotation.Id; import org.springframework.data.mongodb.core.mapping.Document; @@ -11,8 +12,7 @@ @AllArgsConstructor @Document(collection = "portfolio") public class PortfolioDocument { - @Id - private String id; + @Id private String id; private String userId; private String assetId; -} \ No newline at end of file +} diff --git a/src/main/java/io/autoinvestor/infrastructure/repositories/portfolio/UserDocument.java b/src/main/java/io/autoinvestor/infrastructure/repositories/portfolio/UserDocument.java index 46fad71..15cda79 100644 --- a/src/main/java/io/autoinvestor/infrastructure/repositories/portfolio/UserDocument.java +++ b/src/main/java/io/autoinvestor/infrastructure/repositories/portfolio/UserDocument.java @@ -3,6 +3,7 @@ import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; + import org.springframework.data.annotation.Id; import org.springframework.data.mongodb.core.mapping.Document; @@ -11,8 +12,7 @@ @AllArgsConstructor @Document(collection = "users") public class UserDocument { - @Id - private String id; + @Id private String id; private String userId; private int riskLevel; } diff --git a/src/main/java/io/autoinvestor/ui/GetAlertsController.java b/src/main/java/io/autoinvestor/ui/GetAlertsController.java index 5559a65..2b2594e 100644 --- a/src/main/java/io/autoinvestor/ui/GetAlertsController.java +++ b/src/main/java/io/autoinvestor/ui/GetAlertsController.java @@ -1,13 +1,14 @@ package io.autoinvestor.ui; -import io.autoinvestor.application.GetDecisionsQuery; import io.autoinvestor.application.GetAlertsQueryHandler; import io.autoinvestor.application.GetAlertsQueryResponse; -import org.springframework.http.ResponseEntity; -import org.springframework.web.bind.annotation.*; +import io.autoinvestor.application.GetDecisionsQuery; import java.util.List; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.*; + @RestController @RequestMapping("/alerts") public class GetAlertsController { @@ -19,19 +20,16 @@ public GetAlertsController(GetAlertsQueryHandler handler) { } @GetMapping - public ResponseEntity> getAlerts(@RequestHeader(value = "X-User-Id") String userId) { - - List queryResponse = this.handler.handle( - new GetDecisionsQuery(userId) - ); - - List dto = queryResponse.stream() - .map(d -> new GetAlertsDTO( - d.assetId(), - d.type(), - d.date() - )) - .toList(); + public ResponseEntity> getAlerts( + @RequestHeader(value = "X-User-Id") String userId) { + + List queryResponse = + this.handler.handle(new GetDecisionsQuery(userId)); + + List dto = + queryResponse.stream() + .map(d -> new GetAlertsDTO(d.assetId(), d.type(), d.date())) + .toList(); return ResponseEntity.ok(dto); } diff --git a/src/main/java/io/autoinvestor/ui/GetAlertsDTO.java b/src/main/java/io/autoinvestor/ui/GetAlertsDTO.java index 41e66a9..12b0ae3 100644 --- a/src/main/java/io/autoinvestor/ui/GetAlertsDTO.java +++ b/src/main/java/io/autoinvestor/ui/GetAlertsDTO.java @@ -2,5 +2,4 @@ import java.util.Date; -public record GetAlertsDTO(String assetId, String type, Date date) { -} +public record GetAlertsDTO(String assetId, String type, Date date) {} From 511ab6561140f63aafa5062e88e1403475be8d38 Mon Sep 17 00:00:00 2001 From: Pol Pinol Castuera Date: Sat, 31 May 2025 10:55:21 +0200 Subject: [PATCH 3/3] Reduce duplicated code --- .../AbstractPubsubEventSubscriber.java | 95 +++++++++++++ .../PubsubDecisionsEventSubscriber.java | 128 ++++-------------- .../PubsubPortfolioAssetEventSubscriber.java | 117 ++++------------ .../listeners/PubsubUsersEventSubscriber.java | 107 ++++----------- 4 files changed, 179 insertions(+), 268 deletions(-) create mode 100644 src/main/java/io/autoinvestor/infrastructure/listeners/AbstractPubsubEventSubscriber.java diff --git a/src/main/java/io/autoinvestor/infrastructure/listeners/AbstractPubsubEventSubscriber.java b/src/main/java/io/autoinvestor/infrastructure/listeners/AbstractPubsubEventSubscriber.java new file mode 100644 index 0000000..ae2c286 --- /dev/null +++ b/src/main/java/io/autoinvestor/infrastructure/listeners/AbstractPubsubEventSubscriber.java @@ -0,0 +1,95 @@ +package io.autoinvestor.infrastructure.listeners; + +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import lombok.extern.slf4j.Slf4j; + +import java.util.Map; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.api.core.ApiService.Listener; +import com.google.api.core.ApiService.State; +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; + +@Slf4j +public abstract class AbstractPubsubEventSubscriber { + + private final PubsubEventMapper eventMapper; + private final ProjectSubscriptionName subscriptionName; + private final ObjectMapper objectMapper = new ObjectMapper(); + private Subscriber subscriber; + + protected AbstractPubsubEventSubscriber( + PubsubEventMapper eventMapper, String projectId, String subscriptionId) { + this.eventMapper = eventMapper; + this.subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId); + } + + @PostConstruct + public void listen() { + log.info("Starting Pub/Sub subscriber for {}", subscriptionName); + + MessageReceiver receiver = this::processMessage; + this.subscriber = Subscriber.newBuilder(subscriptionName, receiver).build(); + + this.subscriber.addListener( + new Listener() { + @Override + public void failed(State from, Throwable failure) { + log.error( + "Subscriber failed from state {}: {}", + from, + failure.toString(), + failure); + } + }, + Runnable::run); + + this.subscriber.startAsync().awaitRunning(); + log.info("Subscriber running"); + } + + @PreDestroy + public void stop() { + if (this.subscriber != null) { + log.info("Stopping subscriber..."); + this.subscriber.stopAsync(); + } + } + + private void processMessage(PubsubMessage message, AckReplyConsumer consumer) { + String msgId = message.getMessageId(); + log.debug("Received message msgId={} size={}B", msgId, message.getData().size()); + + try { + // 1) deserialize into Map + Map raw = + objectMapper.readValue( + message.getData().toByteArray(), new TypeReference<>() {}); + + // 2) convert into our domain‐level PubsubEvent + PubsubEvent event = eventMapper.fromMap(raw); + log.info("Processing event type={} msgId={}", event.getType(), msgId); + + // 3) only dispatch if it matches the one type this subscriber cares about + if (getEventType().equals(event.getType())) { + handleEvent(event, msgId, consumer); + } else { + log.debug("Ignored unsupported event type={} msgId={}", event.getType(), msgId); + consumer.ack(); + } + } catch (Exception ex) { + log.error("Failed to handle msgId={} — nacking", msgId, ex); + consumer.nack(); + } + } + + protected abstract String getEventType(); + + protected abstract void handleEvent(PubsubEvent event, String msgId, AckReplyConsumer consumer); +} diff --git a/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubDecisionsEventSubscriber.java b/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubDecisionsEventSubscriber.java index 64f48bd..3695ec5 100644 --- a/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubDecisionsEventSubscriber.java +++ b/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubDecisionsEventSubscriber.java @@ -2,8 +2,6 @@ import io.autoinvestor.application.EmitAlertsCommand; import io.autoinvestor.application.EmitAlertsCommandHandler; -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import java.util.Map; @@ -12,122 +10,56 @@ import org.springframework.context.annotation.Profile; import org.springframework.stereotype.Component; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.api.core.ApiService.Listener; -import com.google.api.core.ApiService.State; import com.google.cloud.pubsub.v1.AckReplyConsumer; -import com.google.cloud.pubsub.v1.MessageReceiver; -import com.google.cloud.pubsub.v1.Subscriber; -import com.google.pubsub.v1.ProjectSubscriptionName; -import com.google.pubsub.v1.PubsubMessage; @Slf4j @Component @Profile("prod") -public class PubsubDecisionsEventSubscriber { +public class PubsubDecisionsEventSubscriber extends AbstractPubsubEventSubscriber { private final EmitAlertsCommandHandler commandHandler; - private final PubsubEventMapper eventMapper; - private final ProjectSubscriptionName subscriptionName; - private final ObjectMapper objectMapper = new ObjectMapper(); - - private Subscriber subscriber; public PubsubDecisionsEventSubscriber( EmitAlertsCommandHandler commandHandler, PubsubEventMapper eventMapper, @Value("${GCP_PROJECT}") String projectId, @Value("${PUBSUB_SUBSCRIPTION_DECISION_MAKING}") String subscriptionId) { + super(eventMapper, projectId, subscriptionId); this.commandHandler = commandHandler; - this.eventMapper = eventMapper; - this.subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId); } - @PostConstruct - public void listen() { - log.info("Starting Pub/Sub subscriber for {}", subscriptionName); - - MessageReceiver receiver = this::processMessage; - - this.subscriber = Subscriber.newBuilder(subscriptionName, receiver).build(); - this.subscriber.addListener( - new Listener() { - @Override - public void failed(State from, Throwable failure) { - log.error( - "Subscriber failed from state {}: {}", - from, - failure.toString(), - failure); - } - }, - Runnable::run); - this.subscriber.startAsync().awaitRunning(); - log.info("Subscriber running"); + @Override + protected String getEventType() { + return "ASSET_DECISION_TAKEN"; } - @PreDestroy - public void stop() { - if (this.subscriber != null) { - log.info("Stopping subscriber..."); - this.subscriber.stopAsync(); - } - } - - private void processMessage(PubsubMessage message, AckReplyConsumer consumer) { - String msgId = message.getMessageId(); - log.debug("Received message msgId={} size={}B", msgId, message.getData().size()); - - try { - Map raw = - objectMapper.readValue( - message.getData().toByteArray(), new TypeReference<>() {}); - PubsubEvent event = eventMapper.fromMap(raw); - log.info("Processing event type={} msgId={}", event.getType(), msgId); - - if ("ASSET_DECISION_TAKEN".equals(event.getType())) { - if (!event.getPayload().containsKey("assetId")) { - log.warn( - "Event payload missing 'assetId' field, ignoring event msgId={}", - msgId); - consumer.nack(); - return; - } - if (!event.getPayload().containsKey("decision")) { - log.warn( - "Event payload missing 'decision' field, ignoring event msgId={}", - msgId); - consumer.nack(); - return; - } - if (!event.getPayload().containsKey("riskLevel")) { - log.warn( - "Event payload missing 'riskLevel' field, ignoring event msgId={}", - msgId); - consumer.nack(); - return; - } - EmitAlertsCommand cmd = - new EmitAlertsCommand( - (String) event.getPayload().get("assetId"), - (String) event.getPayload().get("decision"), - (int) event.getPayload().get("riskLevel")); - this.commandHandler.handle(cmd); - log.info( - "Decision registered for asset={} decision={} riskLevel={} msgId={}", - cmd.assetId(), - cmd.decision(), - cmd.riskLevel(), - msgId); - } else { - log.debug("Ignored unsupported event type={} msgId={}", event.getType(), msgId); - } + @Override + protected void handleEvent(PubsubEvent event, String msgId, AckReplyConsumer consumer) { + Map payload = event.getPayload(); + if (!payload.containsKey("assetId") + || !payload.containsKey("decision") + || !payload.containsKey("riskLevel")) { + log.warn( + "Malformed event: Event payload missing required fields (assetId, decision, riskLevel). Ignoring event msgId={}", + msgId); consumer.ack(); - } catch (Exception ex) { - log.error("Failed to handle msgId={} — nacking", msgId, ex); - consumer.nack(); + return; } + + EmitAlertsCommand cmd = + new EmitAlertsCommand( + (String) payload.get("assetId"), + (String) payload.get("decision"), + (int) payload.get("riskLevel")); + this.commandHandler.handle(cmd); + + log.info( + "Decision registered for asset={} decision={} riskLevel={} msgId={}", + cmd.assetId(), + cmd.decision(), + cmd.riskLevel(), + msgId); + consumer.ack(); } } diff --git a/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubPortfolioAssetEventSubscriber.java b/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubPortfolioAssetEventSubscriber.java index e82bddd..fea8255 100644 --- a/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubPortfolioAssetEventSubscriber.java +++ b/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubPortfolioAssetEventSubscriber.java @@ -2,8 +2,6 @@ import io.autoinvestor.application.RegisterPortfolioAssetCommand; import io.autoinvestor.application.RegisterPortfolioAssetCommandHandler; -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import java.util.Map; @@ -12,110 +10,55 @@ import org.springframework.context.annotation.Profile; import org.springframework.stereotype.Component; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.api.core.ApiService.Listener; -import com.google.api.core.ApiService.State; import com.google.cloud.pubsub.v1.AckReplyConsumer; -import com.google.cloud.pubsub.v1.MessageReceiver; -import com.google.cloud.pubsub.v1.Subscriber; -import com.google.pubsub.v1.ProjectSubscriptionName; -import com.google.pubsub.v1.PubsubMessage; @Slf4j @Component @Profile("prod") -public class PubsubPortfolioAssetEventSubscriber { +public class PubsubPortfolioAssetEventSubscriber extends AbstractPubsubEventSubscriber { private final RegisterPortfolioAssetCommandHandler commandHandler; - private final PubsubEventMapper eventMapper; - private final ProjectSubscriptionName subscriptionName; - private final ObjectMapper objectMapper = new ObjectMapper(); - - private Subscriber subscriber; public PubsubPortfolioAssetEventSubscriber( RegisterPortfolioAssetCommandHandler commandHandler, PubsubEventMapper eventMapper, @Value("${GCP_PROJECT}") String projectId, @Value("${PUBSUB_SUBSCRIPTION_PORTFOLIO}") String subscriptionId) { + super(eventMapper, projectId, subscriptionId); this.commandHandler = commandHandler; - this.eventMapper = eventMapper; - this.subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId); - } - - @PostConstruct - public void listen() { - log.info("Starting Pub/Sub subscriber for {}", subscriptionName); - - MessageReceiver receiver = this::processMessage; - - this.subscriber = Subscriber.newBuilder(subscriptionName, receiver).build(); - this.subscriber.addListener( - new Listener() { - @Override - public void failed(State from, Throwable failure) { - log.error( - "Subscriber failed from state {}: {}", - from, - failure.toString(), - failure); // ERROR - } - }, - Runnable::run); - this.subscriber.startAsync().awaitRunning(); - log.info("Subscriber running"); } - @PreDestroy - public void stop() { - if (this.subscriber != null) { - log.info("Stopping subscriber..."); - this.subscriber.stopAsync(); - } + @Override + protected String getEventType() { + return "PORTFOLIO_ASSET_ADDED"; } - private void processMessage(PubsubMessage message, AckReplyConsumer consumer) { - String msgId = message.getMessageId(); - log.debug("Received message msgId={} size={}B", msgId, message.getData().size()); - - try { - Map raw = - objectMapper.readValue( - message.getData().toByteArray(), new TypeReference<>() {}); - PubsubEvent event = eventMapper.fromMap(raw); - log.info("Processing event type={} msgId={}", event.getType(), msgId); - - if ("PORTFOLIO_ASSET_ADDED".equals(event.getType())) { - if (event.getAggregateId() == null - || event.getPayload() == null - || !event.getPayload().containsKey("userId") - || !event.getPayload().containsKey("assetId")) { - log.warn( - "Malformed event: Skipping PORTFOLIO_ASSET_ADDED event with missing fields msgId={}", - msgId); - consumer.ack(); - return; - } - - RegisterPortfolioAssetCommand cmd = - new RegisterPortfolioAssetCommand( - (String) event.getPayload().get("userId"), - (String) event.getPayload().get("assetId")); - this.commandHandler.handle(cmd); - log.info( - "Decision registered for userId={} assetId={} msgId={}", - cmd.userId(), - cmd.assetId(), - msgId); - } else { - log.debug("Ignored unsupported event type={} msgId={}", event.getType(), msgId); - } - + @Override + protected void handleEvent(PubsubEvent event, String msgId, AckReplyConsumer consumer) { + Map payload = event.getPayload(); + + if (event.getAggregateId() == null + || payload == null + || !payload.containsKey("userId") + || !payload.containsKey("assetId")) { + log.warn( + "Malformed event: Skipping PORTFOLIO_ASSET_ADDED " + + "event with missing fields msgId={}", + msgId); consumer.ack(); - } catch (Exception ex) { - log.error("Failed to handle msgId={} — nacking", msgId, ex); - consumer.nack(); + return; } + + RegisterPortfolioAssetCommand cmd = + new RegisterPortfolioAssetCommand( + (String) payload.get("userId"), (String) payload.get("assetId")); + this.commandHandler.handle(cmd); + + log.info( + "Portfolio asset registered for userId={} assetId={} msgId={}", + cmd.userId(), + cmd.assetId(), + msgId); + consumer.ack(); } } diff --git a/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubUsersEventSubscriber.java b/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubUsersEventSubscriber.java index 7cbc6bd..0b39557 100644 --- a/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubUsersEventSubscriber.java +++ b/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubUsersEventSubscriber.java @@ -2,8 +2,6 @@ import io.autoinvestor.application.RegisterUserCommand; import io.autoinvestor.application.RegisterUserCommandHandler; -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import java.util.Map; @@ -12,107 +10,50 @@ import org.springframework.context.annotation.Profile; import org.springframework.stereotype.Component; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.api.core.ApiService.Listener; -import com.google.api.core.ApiService.State; import com.google.cloud.pubsub.v1.AckReplyConsumer; -import com.google.cloud.pubsub.v1.MessageReceiver; -import com.google.cloud.pubsub.v1.Subscriber; -import com.google.pubsub.v1.ProjectSubscriptionName; -import com.google.pubsub.v1.PubsubMessage; @Slf4j @Component @Profile("prod") -public class PubsubUsersEventSubscriber { +public class PubsubUsersEventSubscriber extends AbstractPubsubEventSubscriber { private final RegisterUserCommandHandler commandHandler; - private final PubsubEventMapper eventMapper; - private final ProjectSubscriptionName subscriptionName; - private final ObjectMapper objectMapper = new ObjectMapper(); - - private Subscriber subscriber; public PubsubUsersEventSubscriber( RegisterUserCommandHandler commandHandler, PubsubEventMapper eventMapper, @Value("${GCP_PROJECT}") String projectId, @Value("${PUBSUB_SUBSCRIPTION_USERS}") String subscriptionId) { + super(eventMapper, projectId, subscriptionId); this.commandHandler = commandHandler; - this.eventMapper = eventMapper; - this.subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId); - } - - @PostConstruct - public void listen() { - log.info("Starting Pub/Sub subscriber for {}", subscriptionName); - - MessageReceiver receiver = this::processMessage; - - this.subscriber = Subscriber.newBuilder(subscriptionName, receiver).build(); - this.subscriber.addListener( - new Listener() { - @Override - public void failed(State from, Throwable failure) { - log.error( - "Subscriber failed from state {}: {}", - from, - failure.toString(), - failure); // ERROR - } - }, - Runnable::run); - this.subscriber.startAsync().awaitRunning(); - log.info("Subscriber running"); } - @PreDestroy - public void stop() { - if (this.subscriber != null) { - log.info("Stopping subscriber..."); - this.subscriber.stopAsync(); - } + @Override + protected String getEventType() { + return "USER_CREATED"; } - private void processMessage(PubsubMessage message, AckReplyConsumer consumer) { - String msgId = message.getMessageId(); - log.debug("Received message msgId={} size={}B", msgId, message.getData().size()); - - try { - Map raw = - objectMapper.readValue( - message.getData().toByteArray(), new TypeReference<>() {}); - PubsubEvent event = eventMapper.fromMap(raw); - log.info("Processing event type={} msgId={}", event.getType(), msgId); - - if ("USER_CREATED".equals(event.getType())) { - if (event.getAggregateId() == null - || !event.getPayload().containsKey("riskLevel")) { - log.warn( - "Malformed event: Skipping USER_CREATED event with missing aggregateId or riskLevel msgId={}", - msgId); - consumer.ack(); - return; - } - - RegisterUserCommand cmd = - new RegisterUserCommand( - event.getAggregateId(), (int) event.getPayload().get("riskLevel")); - this.commandHandler.handle(cmd); - log.info( - "Decision registered for userId={} riskLevel={} msgId={}", - cmd.userId(), - cmd.riskLevel(), - msgId); - } else { - log.debug("Ignored unsupported event type={} msgId={}", event.getType(), msgId); - } + @Override + protected void handleEvent(PubsubEvent event, String msgId, AckReplyConsumer consumer) { + Map payload = event.getPayload(); + if (event.getAggregateId() == null || !payload.containsKey("riskLevel")) { + log.warn( + "Malformed event: Skipping USER_CREATED event with missing aggregateId or riskLevel msgId={}", + msgId); consumer.ack(); - } catch (Exception ex) { - log.error("Failed to handle msgId={} — nacking", msgId, ex); - consumer.nack(); + return; } + + RegisterUserCommand cmd = + new RegisterUserCommand(event.getAggregateId(), (int) payload.get("riskLevel")); + this.commandHandler.handle(cmd); + + log.info( + "User registered userId={} riskLevel={} msgId={}", + cmd.userId(), + cmd.riskLevel(), + msgId); + consumer.ack(); } }