From a71a34709819bc67a1a728268b87f3b4e7da13cc Mon Sep 17 00:00:00 2001 From: Daniel Date: Wed, 31 Dec 2025 09:21:50 +1100 Subject: [PATCH 1/5] Add kafka integration for external events --- build.gradle.kts | 8 ++- docker-compose.yml | 30 ++++++++++ ...ner.java => ApplicationEventListener.java} | 10 +++- .../application/KafkaEventListener.java | 57 +++++++++++++++++++ .../application/UserEventListener.java | 20 ------- .../domain/event/OtpRequestedEvent.java | 4 +- .../domain/event/UserRegisteredEvent.java | 2 + src/main/resources/application-local.yml | 2 + src/main/resources/application.yml | 17 ++++++ .../integration/OAuth2IntegrationTest.java | 56 +++++++++--------- 10 files changed, 154 insertions(+), 52 deletions(-) rename src/main/java/org/nkcoder/notification/application/{OtpEventListener.java => ApplicationEventListener.java} (63%) create mode 100644 src/main/java/org/nkcoder/notification/application/KafkaEventListener.java delete mode 100644 src/main/java/org/nkcoder/notification/application/UserEventListener.java diff --git a/build.gradle.kts b/build.gradle.kts index a212468..962e125 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -42,6 +42,7 @@ dependencies { implementation("org.springdoc:springdoc-openapi-starter-webmvc-ui:3.0.0") implementation("org.springframework.modulith:spring-modulith-starter-core") implementation("org.springframework.modulith:spring-modulith-starter-jpa") + implementation("org.springframework.modulith:spring-modulith-events-kafka") // Database implementation("org.springframework.boot:spring-boot-starter-flyway") @@ -58,6 +59,9 @@ dependencies { annotationProcessor("org.springframework.boot:spring-boot-configuration-processor") developmentOnly("org.springframework.boot:spring-boot-docker-compose") + // Messaging + implementation("org.springframework.kafka:spring-kafka") + // Testing testImplementation("org.springframework.boot:spring-boot-starter-webmvc-test") testImplementation("org.springframework.boot:spring-boot-starter-webflux-test") // For WebTestClient @@ -67,7 +71,9 @@ dependencies { testImplementation("org.junit.jupiter:junit-jupiter:5.13.3") testImplementation("org.testcontainers:junit-jupiter") testImplementation("org.testcontainers:postgresql") + testImplementation("org.testcontainers:kafka") testImplementation("org.springframework.modulith:spring-modulith-starter-test") + testImplementation("org.springframework.kafka:spring-kafka-test") // gRPC and Protobuf implementation("io.grpc:grpc-netty-shaded:1.77.0") @@ -124,7 +130,7 @@ tasks.register("runLocal") { // JVM optimization for microservices tasks.named("bootRun") { jvmArgs = listOf( - "-Xms256m", "-Xmx512m", "-XX:+UseG1GC", "-XX:+UseStringDeduplication" + "-Xms512m", "-Xmx1024m", "-XX:+UseG1GC", "-XX:+UseStringDeduplication" ) // Pass environment variables at execution time (not configuration time) // This ensures .env variables sourced by auto/run are available diff --git a/docker-compose.yml b/docker-compose.yml index 05e50a4..31f6445 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -35,6 +35,36 @@ services: timeout: 5s retries: 5 + kafka: + image: apache/kafka:4.1.1 + container_name: user-service-kafka + hostname: kafka + ports: + - "29092:29092" + environment: + KAFKA_NODE_ID: 1 + KAFKA_PROCESS_ROLES: broker,controller + KAFKA_LISTENERS: PLAINTEXT://:29092,CONTROLLER://:9093 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092 + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT + 
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_LOG_DIRS: /var/lib/kafka/data + volumes: + - kafka_data:/var/lib/kafka/data + healthcheck: + test: ["CMD-SHELL", "/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server localhost:29092 > /dev/null 2>&1"] + interval: 10s + timeout: 10s + retries: 5 + start_period: 30s + volumes: postgres_data: name: user-service-local-postgres-data + kafka_data: + name: user-service-local-kafka-data diff --git a/src/main/java/org/nkcoder/notification/application/OtpEventListener.java b/src/main/java/org/nkcoder/notification/application/ApplicationEventListener.java similarity index 63% rename from src/main/java/org/nkcoder/notification/application/OtpEventListener.java rename to src/main/java/org/nkcoder/notification/application/ApplicationEventListener.java index 87c3601..66cf23d 100644 --- a/src/main/java/org/nkcoder/notification/application/OtpEventListener.java +++ b/src/main/java/org/nkcoder/notification/application/ApplicationEventListener.java @@ -2,15 +2,16 @@ import org.nkcoder.notification.NotificationService; import org.nkcoder.shared.kernel.domain.event.OtpRequestedEvent; +import org.nkcoder.shared.kernel.domain.event.UserRegisteredEvent; import org.springframework.modulith.events.ApplicationModuleListener; import org.springframework.stereotype.Component; @Component -public class OtpEventListener { +public class ApplicationEventListener { private final NotificationService notificationService; - public OtpEventListener(NotificationService notificationService) { + public ApplicationEventListener(NotificationService notificationService) { this.notificationService = notificationService; } @@ -18,4 +19,9 @@ public OtpEventListener(NotificationService notificationService) { public void onOtpRequested(OtpRequestedEvent event) { notificationService.sendOtpEmail(event.email(), event.userName(), event.otpCode(), event.expirationMinutes()); } + + @ApplicationModuleListener + public void onUserRegistered(UserRegisteredEvent event) { + notificationService.sendWelcomeEmail(event.email(), event.userName()); + } } diff --git a/src/main/java/org/nkcoder/notification/application/KafkaEventListener.java b/src/main/java/org/nkcoder/notification/application/KafkaEventListener.java new file mode 100644 index 0000000..44f3679 --- /dev/null +++ b/src/main/java/org/nkcoder/notification/application/KafkaEventListener.java @@ -0,0 +1,57 @@ +package org.nkcoder.notification.application; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import java.util.Base64; +import org.nkcoder.shared.kernel.domain.event.OtpRequestedEvent; +import org.nkcoder.shared.kernel.domain.event.UserRegisteredEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +@Component +public class KafkaEventListener { + + private static final Logger logger = LoggerFactory.getLogger(KafkaEventListener.class); + private final ObjectMapper objectMapper; + + public KafkaEventListener() { + this.objectMapper = new ObjectMapper(); + this.objectMapper.registerModule(new JavaTimeModule()); + } + + @KafkaListener(topics = "user.otp.requested", groupId = "notification-service") + public void onOtpRequested(String message) { + 
logger.info("Received OTP event from Kafka: {}", message); + + try { + String json = new String(Base64.getDecoder().decode(message)); + logger.debug("Decoded JSON: {}", json); + + OtpRequestedEvent event = objectMapper.readValue(json, OtpRequestedEvent.class); + logger.info( + "Processed OTP event - email: {}, userName: {}, otpCode: {}", + event.email(), + event.userName(), + event.otpCode()); + } catch (Exception e) { + logger.error("Failed to process OTP event: {}", e.getMessage(), e); + } + } + + @KafkaListener(topics = "user.registered", groupId = "notification-service") + public void onUserRegistered(String message) { + logger.info("Received UserRegistered event from Kafka: {}", message); + + try { + String json = new String(Base64.getDecoder().decode(message)); + logger.debug("Decoded JSON: {}", json); + + UserRegisteredEvent event = objectMapper.readValue(json, UserRegisteredEvent.class); + logger.info("Processed UserRegistered event - email: {}, userName: {}", event.email(), event.userName()); + } catch (Exception e) { + logger.error("Failed to process UserRegistered event: {}", e.getMessage(), e); + } + } +} diff --git a/src/main/java/org/nkcoder/notification/application/UserEventListener.java b/src/main/java/org/nkcoder/notification/application/UserEventListener.java deleted file mode 100644 index 046f8e4..0000000 --- a/src/main/java/org/nkcoder/notification/application/UserEventListener.java +++ /dev/null @@ -1,20 +0,0 @@ -package org.nkcoder.notification.application; - -import org.nkcoder.notification.NotificationService; -import org.nkcoder.shared.kernel.domain.event.UserRegisteredEvent; -import org.springframework.modulith.events.ApplicationModuleListener; -import org.springframework.stereotype.Component; - -@Component -public class UserEventListener { - private final NotificationService notificationService; - - public UserEventListener(NotificationService notificationService) { - this.notificationService = notificationService; - } - - @ApplicationModuleListener - public void onUserRegistered(UserRegisteredEvent event) { - notificationService.sendWelcomeEmail(event.email(), event.userName()); - } -} diff --git a/src/main/java/org/nkcoder/shared/kernel/domain/event/OtpRequestedEvent.java b/src/main/java/org/nkcoder/shared/kernel/domain/event/OtpRequestedEvent.java index 6044523..c1130ae 100644 --- a/src/main/java/org/nkcoder/shared/kernel/domain/event/OtpRequestedEvent.java +++ b/src/main/java/org/nkcoder/shared/kernel/domain/event/OtpRequestedEvent.java @@ -1,7 +1,9 @@ package org.nkcoder.shared.kernel.domain.event; import java.time.LocalDateTime; +import org.springframework.modulith.events.Externalized; +@Externalized("user.otp.requested") public record OtpRequestedEvent( String email, String userName, String otpCode, int expirationMinutes, LocalDateTime occurredOn) implements DomainEvent { @@ -12,7 +14,7 @@ public OtpRequestedEvent(String email, String userName, String otpCode, int expi @Override public String eventType() { - return "otp.requested"; + return "user.otp.requested"; } @Override diff --git a/src/main/java/org/nkcoder/shared/kernel/domain/event/UserRegisteredEvent.java b/src/main/java/org/nkcoder/shared/kernel/domain/event/UserRegisteredEvent.java index 0b88cf5..47b65eb 100644 --- a/src/main/java/org/nkcoder/shared/kernel/domain/event/UserRegisteredEvent.java +++ b/src/main/java/org/nkcoder/shared/kernel/domain/event/UserRegisteredEvent.java @@ -2,7 +2,9 @@ import java.time.LocalDateTime; import java.util.UUID; +import 
org.springframework.modulith.events.Externalized; +@Externalized("user.registered") public record UserRegisteredEvent(UUID userId, String email, String userName, LocalDateTime occurredOn) implements DomainEvent { public UserRegisteredEvent(UUID userId, String email, String userName) { diff --git a/src/main/resources/application-local.yml b/src/main/resources/application-local.yml index 477f1c6..a3c964c 100644 --- a/src/main/resources/application-local.yml +++ b/src/main/resources/application-local.yml @@ -12,6 +12,8 @@ # ============================================================================= spring: + kafka: + bootstrap-servers: localhost:29092 # Auto-start PostgreSQL via Docker Compose docker: compose: diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 95b8a2c..32bf4a0 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -25,6 +25,23 @@ spring: virtual: enabled: true + # Spring Modulith Event Externalization + modulith: + events: + externalization: + enabled: true + + # Kafka Configuration + kafka: + producer: + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.springframework.kafka.support.serializer.JsonSerializer + consumer: + group-id: notification-service + auto-offset-reset: earliest + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.apache.kafka.common.serialization.StringDeserializer + # Flyway migrations flyway: enabled: true diff --git a/src/test/java/org/nkcoder/user/integration/OAuth2IntegrationTest.java b/src/test/java/org/nkcoder/user/integration/OAuth2IntegrationTest.java index f319e5f..970ea87 100644 --- a/src/test/java/org/nkcoder/user/integration/OAuth2IntegrationTest.java +++ b/src/test/java/org/nkcoder/user/integration/OAuth2IntegrationTest.java @@ -63,13 +63,13 @@ void returnsOAuth2ConnectionsForUser() { .uri("/api/auth/register") .contentType(MediaType.APPLICATION_JSON) .bodyValue(""" - { - "email": "oauth2-test@example.com", - "password": "Password123", - "name": "OAuth2 Test User", - "role": "MEMBER" - } - """) + { + "email": "oauth2-test@example.com", + "password": "Password123", + "name": "OAuth2 Test User", + "role": "MEMBER" + } + """) .exchange() .expectStatus() .is2xxSuccessful() @@ -134,13 +134,13 @@ void unlinksProviderWhenUserHasPassword() { .uri("/api/auth/register") .contentType(MediaType.APPLICATION_JSON) .bodyValue(""" - { - "email": "oauth2-unlink@example.com", - "password": "Password123", - "name": "OAuth2 Unlink User", - "role": "MEMBER" - } - """) + { + "email": "oauth2-unlink@example.com", + "password": "Password123", + "name": "OAuth2 Unlink User", + "role": "MEMBER" + } + """) .exchange() .expectStatus() .is2xxSuccessful() @@ -247,13 +247,13 @@ void canLinkMultipleProviders() { .uri("/api/auth/register") .contentType(MediaType.APPLICATION_JSON) .bodyValue(""" - { - "email": "oauth2-multi@example.com", - "password": "Password123", - "name": "Multi Provider User", - "role": "MEMBER" - } - """) + { + "email": "oauth2-multi@example.com", + "password": "Password123", + "name": "Multi Provider User", + "role": "MEMBER" + } + """) .exchange() .expectStatus() .is2xxSuccessful() @@ -306,13 +306,13 @@ private String registerAndGetToken(String email, String password, String name) { .uri("/api/auth/register") .contentType(MediaType.APPLICATION_JSON) .bodyValue(""" - { - "email": "%s", - "password": "%s", - "name": "%s", - "role": "MEMBER" - } - """.formatted(email, password, 
name)) + { + "email": "%s", + "password": "%s", + "name": "%s", + "role": "MEMBER" + } + """.formatted(email, password, name)) .exchange() .expectStatus() .is2xxSuccessful() From b76593ae41a1d3b0fb7a1dca180b5bcef8c46019 Mon Sep 17 00:00:00 2001 From: Daniel Date: Wed, 31 Dec 2025 09:38:26 +1100 Subject: [PATCH 2/5] Add kafka to docker-compose-all --- docker-compose-all.yml | 62 ++++++++++++++++++++++++++++++++++++++---- docker-compose.yml | 14 ++++------ 2 files changed, 61 insertions(+), 15 deletions(-) diff --git a/docker-compose-all.yml b/docker-compose-all.yml index 4f5a3d0..fb4fdde 100644 --- a/docker-compose-all.yml +++ b/docker-compose-all.yml @@ -3,14 +3,15 @@ # ============================================================================= # Simulates dev/prod environment with all services running in containers. # -# Usage: -# Start: docker compose -f docker-compose-all.yml up -d -# Stop: docker compose -f docker-compose-all.yml down -# Logs: docker compose -f docker-compose-all.yml logs -f user-service -# Rebuild: docker compose -f docker-compose-all.yml up -d --build -# # For production, use external secrets management (Vault, AWS Secrets Manager) # instead of environment variables in this file. +# +# For container communications: App, kafka and PostgreSQL are all running inside Docker. +# Kafka has two listeners: 9092 (internal) and 29092 (external) +# Container app connects via `kafka:9092` (Docker network) +# Host debugging via `localhost:29092` +# KAFKA_LISTENERS: PLAINTEXT://:9092,EXTERNAL://:29092 +# KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,EXTERNAL://localhost:29092 # ============================================================================= services: @@ -28,6 +29,8 @@ services: depends_on: postgres: condition: service_healthy + kafka: + condition: service_healthy environment: # Profile: use 'dev' for development simulation, 'prod' for production - SPRING_PROFILES_ACTIVE=dev @@ -37,6 +40,9 @@ services: - DATABASE_USERNAME=app_user - DATABASE_PASSWORD=${DB_PASSWORD:-changeme_in_production} + # Kafka connection + - SPRING_KAFKA_BOOTSTRAP_SERVERS=kafka:9092 + # JWT secrets - MUST be overridden in production! 
# Generate with: openssl rand -base64 64 - JWT_ACCESS_SECRET=${JWT_ACCESS_SECRET:-dev-only-access-secret-key-must-be-at-least-64-bytes-long-for-hs512} @@ -65,6 +71,48 @@ services: networks: - app-network + # --------------------------------------------------------------------------- + # Apache Kafka + # --------------------------------------------------------------------------- + kafka: + image: apache/kafka:4.1.1 + container_name: user-application-kafka + hostname: kafka + ports: + - "29092:29092" # External port for debugging (remove in production) + environment: + KAFKA_NODE_ID: 1 + KAFKA_PROCESS_ROLES: broker,controller + KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:29092 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,EXTERNAL://localhost:29092 + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT + KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_LOG_DIRS: /var/lib/kafka/data + volumes: + - kafka_data:/var/lib/kafka/data + healthcheck: + test: ["CMD-SHELL", "/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092 > /dev/null 2>&1"] + interval: 10s + timeout: 10s + retries: 5 + start_period: 30s + restart: unless-stopped + deploy: + resources: + limits: + cpus: '1' + memory: 1G + reservations: + cpus: '0.25' + memory: 512M + networks: + - app-network + # --------------------------------------------------------------------------- # PostgreSQL Database # --------------------------------------------------------------------------- @@ -102,6 +150,8 @@ services: volumes: postgres_data: name: user-service-postgres-data + kafka_data: + name: user-service-kafka-data networks: app-network: diff --git a/docker-compose.yml b/docker-compose.yml index 31f6445..6f805e1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -5,16 +5,12 @@ # spring.docker.compose.enabled=true (in application-local.yml) # # This file is auto-detected and managed by Spring Boot. -# The application runs on your host machine, only PostgreSQL runs in Docker. +# The application runs on your host machine, only PostgreSQL and Kafka runs in Docker. 
# -# Usage: -# ./gradlew bootRun --args='--spring.profiles.active=local' -# (Spring Boot automatically starts/stops this compose file) -# -# Manual control: -# Start: docker compose up -d -# Stop: docker compose down -# Reset: docker compose down -v (removes data volume) +# For Kafka: +# Kafka has one listener on port 29092, host app connects via `localhost:29092` +# AFKA_LISTENERS: PLAINTEXT://:29092 +# KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092 # ============================================================================= services: From 5a7086eeb169ebf2a1dc25bd142973111ed04146 Mon Sep 17 00:00:00 2001 From: Daniel Date: Wed, 31 Dec 2025 10:11:55 +1100 Subject: [PATCH 3/5] Fix the dev config --- auto/docker_logs | 2 +- docker-compose-all.yml | 14 ++++++++++++-- src/main/resources/application-dev.yml | 4 ++++ 3 files changed, 17 insertions(+), 3 deletions(-) diff --git a/auto/docker_logs b/auto/docker_logs index 5a913ae..ca5cab2 100755 --- a/auto/docker_logs +++ b/auto/docker_logs @@ -1,3 +1,3 @@ #!/usr/bin/env sh -docker compose -f docker-compose-all.yml logs -f --tail 100 +docker compose -f docker-compose-all.yml logs -f --tail 50 diff --git a/docker-compose-all.yml b/docker-compose-all.yml index fb4fdde..d4769c2 100644 --- a/docker-compose-all.yml +++ b/docker-compose-all.yml @@ -24,7 +24,7 @@ services: dockerfile: Dockerfile container_name: user-application ports: - - "3001:3001" # REST API + - "8080:8080" # REST API - "9090:9090" # gRPC API depends_on: postgres: @@ -53,8 +53,18 @@ services: # JVM options for container environment - JAVA_OPTS=-XX:+UseContainerSupport -XX:MaxRAMPercentage=75.0 + + # Mail + - MAIL_USERNAME=${MAIL_USERNAME} + - MAIL_PASSWORD=${MAIL_PASSWORD} + + # OAuth 2 + - GOOGLE_CLIENT_ID=${GOOGLE_CLIENT_ID} + - GOOGLE_CLIENT_SECRET=${GOOGLE_CLIENT_SECRET} + - GITHUB_CLIENT_ID=${GITHUB_CLIENT_ID} + - GITHUB_CLIENT_SECRET=${GITHUB_CLIENT_SECRET} healthcheck: - test: [ "CMD", "wget", "--no-verbose", "--tries=1", "--spider", "http://localhost:3001/actuator/health" ] + test: [ "CMD", "wget", "--no-verbose", "--tries=1", "--spider", "http://localhost:8080/actuator/health" ] interval: 30s timeout: 10s retries: 3 diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index d4bde96..bc91662 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -43,6 +43,10 @@ spring: reflection: enabled: true +app: + mail: + from: noreply@daniel-guo.com + # Enable Swagger UI for dev/staging springdoc: api-docs: From 93a8ac17bb93db3a0810bd4f49524b23f025f314 Mon Sep 17 00:00:00 2001 From: Daniel Date: Wed, 31 Dec 2025 11:08:33 +1100 Subject: [PATCH 4/5] Update docs about Kafka --- CLAUDE.md | 57 ++++++++++++++++++++++++++++++++++--------- README.md | 3 ++- docs/architecture.md | 22 +++++++++++++++-- docs/configuration.md | 14 ++++++++--- docs/development.md | 41 +++++++++++++++++++++++++------ 5 files changed, 110 insertions(+), 27 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 31ddbb5..731525a 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -20,6 +20,7 @@ role-based access control (ADMIN/MEMBER), and PostgreSQL persistence with Flyway - **Java 25** with virtual threads (Project Loom) - **Spring Boot 4.0** with Spring Framework 7.0 - **Spring Modulith 2.0** for modular architecture +- **Apache Kafka** for event externalization (Spring Modulith integration) - **PostgreSQL** with Flyway migrations - **gRPC** alongside REST APIs @@ -187,7 +188,8 @@ notification ──→ shared ←── user **Notification 
Module** (`org.nkcoder.notification`): - `NotificationService` - Public API for sending notifications -- `application/UserEventListener` - Listens to UserRegisteredEvent +- `application/ApplicationEventListener` - In-process listener for domain events (sends emails) +- `application/KafkaEventListener` - Kafka consumer for externalized events **Shared Module** (`org.nkcoder.shared`): @@ -329,34 +331,54 @@ PATCH /api/users/{userId}/password - Reset password (admin only) ### Event-Driven Communication -Modules communicate via domain events using Spring Modulith's event infrastructure: +Modules communicate via domain events using Spring Modulith's event infrastructure with **Kafka externalization**. + +**Event Externalization**: Domain events marked with `@Externalized` are automatically published to Kafka topics: + +| Event | Kafka Topic | Description | +|-------|-------------|-------------| +| `UserRegisteredEvent` | `user.registered` | Published when user completes registration | +| `OtpRequestedEvent` | `user.otp.requested` | Published when user requests OTP | **Publishing Events** (in User module): ```java // In AuthApplicationService after registration -domainEventPublisher.publish(new UserRegisteredEvent(user.getId(),user. +domainEventPublisher.publish(new UserRegisteredEvent(user.getId(), user.getEmail(), user.getName())); +``` -getEmail(),user. +**Event Definition with Kafka Externalization**: -getName())); +```java +@Externalized("user.registered") // Kafka topic name +public record UserRegisteredEvent(UUID userId, String email, String userName, LocalDateTime occurredOn) + implements DomainEvent {} ``` **Listening to Events** (in Notification module): ```java - +// In-process listener (immediate, same JVM) @Component -public class UserEventListener { +public class ApplicationEventListener { @ApplicationModuleListener public void onUserRegistered(UserRegisteredEvent event) { notificationService.sendWelcomeEmail(event.email(), event.userName()); } } + +// Kafka consumer (for external/distributed processing) +@Component +public class KafkaEventListener { + @KafkaListener(topics = "user.registered", groupId = "notification-service") + public void onUserRegistered(String message) { + // Decode Base64 and deserialize JSON + } +} ``` **Event Publication Table**: Spring Modulith persists events to `event_publication` table for reliable delivery ( -transactional outbox pattern). +transactional outbox pattern). Events are stored before being sent to Kafka, ensuring at-least-once delivery. ### Configuration Management @@ -378,6 +400,7 @@ JWT_REFRESH_SECRET= JWT_ACCESS_EXPIRES_IN=15m JWT_REFRESH_EXPIRES_IN=7d CLIENT_URL=http://localhost:3000 +SPRING_KAFKA_BOOTSTRAP_SERVERS=kafka:9092 ``` **Configuration Binding**: @@ -564,9 +587,10 @@ class ModulithArchitectureTest { 1. Create event record in `shared/kernel/domain/event/` (if cross-module) or `{module}/domain/event/` (if module-internal) -2. Inject `DomainEventPublisher` in your service -3. Call `domainEventPublisher.publish(event)` after business logic -4. Create `@ApplicationModuleListener` in consuming module +2. Add `@Externalized("topic-name")` annotation to publish to Kafka +3. Inject `DomainEventPublisher` in your service +4. Call `domainEventPublisher.publish(event)` after business logic +5. 
Create `@ApplicationModuleListener` in consuming module (in-process) and/or `@KafkaListener` (Kafka consumer) **Database Schema Change**: @@ -604,11 +628,20 @@ class ModulithArchitectureTest { - Cross-module events go in `shared.kernel.domain.event/` - Use `@ApplicationModuleListener` for reliable event handling (auto-retry, persistence) +**Kafka Integration**: + +- Events with `@Externalized` annotation are automatically published to Kafka topics +- Consumer group: `notification-service` +- Messages are Base64-encoded JSON +- Kafka ports: `9092` (internal Docker), `29092` (external/host) +- Topics are auto-created on first publish + **Future Microservice Extraction**: When ready to extract a module as a microservice: -1. Events become messages (Kafka/RabbitMQ) +1. Events are already externalized to Kafka - no change needed 2. REST/gRPC calls replace direct method calls 3. Module's `infrastructure/` adapters change, domain stays the same 4. Database can be separated per module +5. Kafka consumers in extracted service continue to receive events diff --git a/README.md b/README.md index c08428c..3820351 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,7 @@ A comprehensive user authentication and management service featuring OAuth2, OTP | **Passwordless** | One-Time Password (OTP) login flow via email | | **Governance** | Role-based access control (MEMBER, ADMIN), profile management | | **Architecture** | **Modular Monolith** (Spring Modulith), DDD, Event-driven communication | +| **Messaging** | **Apache Kafka** for event externalization (Spring Modulith integration)| | **Performance** | **Java 25 Virtual Threads**, gRPC for high-speed communication | ## Documentation Hub @@ -28,7 +29,7 @@ A comprehensive user authentication and management service featuring OAuth2, OTP ### Prerequisites - **Java 25** & Gradle 8+ -- PostgreSQL 17 or Docker +- PostgreSQL 17, Apache Kafka, or Docker ### Running Locally 1. Copy `.env.example` to `.env` and configure secrets (JWT, Mail, OAuth2). diff --git a/docs/architecture.md b/docs/architecture.md index 3aa9ca1..f8a510a 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -15,16 +15,33 @@ The application uses **Spring Modulith** to enforce a clean, modular structure w ### Communication Pattern Modules communicate primarily via **Domain Events** to ensure loose coupling: - When a user registers or requests an OTP, a domain event is published. -- The `NotificationModule` listens for these events (`UserEventListener`, `OtpEventListener`) and processes them (e.g., sends an email via SMTP). +- Events are **externalized to Apache Kafka** via Spring Modulith's event externalization. +- The `NotificationModule` listens for these events via both in-process listeners (`ApplicationEventListener`) and Kafka consumers (`KafkaEventListener`). 
```mermaid graph TD - UserModule[User Module] -- "Domain Event" --> NotificationModule[Notification Module] + UserModule[User Module] -- "Domain Event" --> EventPublisher[Spring Event Publisher] + EventPublisher -- "In-Process" --> ApplicationListener[ApplicationEventListener] + EventPublisher -- "Kafka" --> KafkaTopic[Kafka Topics] + KafkaTopic -- "Subscribe" --> KafkaListener[KafkaEventListener] + ApplicationListener --> NotificationModule[Notification Module] + KafkaListener --> NotificationModule UserModule --> SharedKernel[Shared Kernel] NotificationModule --> SharedKernel[Shared Kernel] Infrastructure --> SharedKernel[Shared Kernel] ``` +### Kafka Event Externalization + +Domain events marked with `@Externalized` are automatically published to Kafka topics: + +| Event | Kafka Topic | Description | +|-------|-------------|-------------| +| `UserRegisteredEvent` | `user.registered` | Published when a user completes registration | +| `OtpRequestedEvent` | `user.otp.requested` | Published when a user requests an OTP code | + +**Transactional Outbox Pattern**: Events are first persisted to the `event_publication` database table, ensuring at-least-once delivery even if Kafka is temporarily unavailable. + ## Security Design ### JWT & Token Strategy @@ -44,6 +61,7 @@ graph TD - **Virtual Threads (Project Loom)**: Enabled to handle high concurrency with minimal resource overhead. - **PostgreSQL 17**: Leverages modern database features with UUID-based primary keys. - **gRPC Support**: High-performance binary communication for internal service-to-service calls. +- **Apache Kafka**: Reliable, scalable event streaming for cross-service communication and future microservice extraction. ## Observability - **Actuator**: Comprehensive health and metrics exposure. diff --git a/docs/configuration.md b/docs/configuration.md index d429552..b68cda5 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -40,6 +40,12 @@ This document details the configuration options and environment variables for th | `MAIL_FROM` | Sender email address | `noreply@example.com` | | `MAIL_FROM_NAME` | Sender display name | `User Service` | +### Kafka +| Variable | Description | Default | +| ---------------------------------- | -------------------------------- | ----------------- | +| `SPRING_KAFKA_BOOTSTRAP_SERVERS` | Kafka broker addresses | `localhost:29092` | +| `SPRING_KAFKA_CONSUMER_GROUP_ID` | Consumer group ID | `notification-service` | + ### System & Ports | Variable | Description | Default | | ---------------------- | -------------------- | ----------------------- | @@ -48,7 +54,7 @@ This document details the configuration options and environment variables for th | `CORS_ALLOWED_ORIGINS` | Allowed CORS origins | `http://localhost:3000` | ## Application Profiles -- `local`: Enables Docker Compose for local PostgreSQL and gRPC reflection. -- `dev`: Standard development settings. -- `prod`: Optimized for production (caching, production database). -- `test`: Used for integration tests with Testcontainers. +- `local`: Enables Docker Compose for local PostgreSQL, Kafka, and gRPC reflection. +- `dev`: Standard development settings with external services. +- `prod`: Optimized for production (caching, production database, external Kafka). +- `test`: Used for integration tests with Testcontainers (PostgreSQL and Kafka). 
diff --git a/docs/development.md b/docs/development.md index 323b907..648a168 100644 --- a/docs/development.md +++ b/docs/development.md @@ -16,11 +16,11 @@ The project includes a set of scripts in the `auto/` directory to simplify commo | `./auto/clean` | Cleans the build directory. | ### Docker Operations -| Script | Description | -| --------------------- | -------------------------------------------------------- | -| `./auto/docker_start` | Starts the entire stack (App + DB) using Docker Compose. | -| `./auto/docker_stop` | Stops all running containers for the project. | -| `./auto/docker_logs` | Tails the logs for the application container. | +| Script | Description | +| --------------------- | ------------------------------------------------------------------ | +| `./auto/docker_start` | Starts the entire stack (App + DB + Kafka) using Docker Compose. | +| `./auto/docker_stop` | Stops all running containers for the project. | +| `./auto/docker_logs` | Tails the logs for the application container. | ### Advanced | Script | Description | @@ -31,7 +31,7 @@ The project includes a set of scripts in the `auto/` directory to simplify commo ## Detailed Workflows ### Running Locally -To start the application with local environment variables and a PostgreSQL database (via Docker Compose): +To start the application with local environment variables, PostgreSQL, and Kafka (via Docker Compose): ```bash ./auto/run ``` @@ -48,7 +48,7 @@ We use JUnit 5 and Testcontainers for testing. # Run all tests and generate coverage ./auto/test ``` -*Stack: JUnit 5, Mockito, Testcontainers (PostgreSQL), AssertJ, WebTestClient.* +*Stack: JUnit 5, Mockito, Testcontainers (PostgreSQL, Kafka), AssertJ, WebTestClient.* ### Code Quality Maintain code style and verify architecture boundaries: @@ -61,5 +61,30 @@ Maintain code style and verify architecture boundaries: ``` ## Local Testing Environment -- `spring-boot-docker-compose`: Automatically manages the `docker-compose.yml` for local development. +- `spring-boot-docker-compose`: Automatically manages the `docker-compose.yml` for local development (PostgreSQL + Kafka). - `application-test.yml`: Test-specific configuration. 
+ +## Kafka Development + +### Local Kafka Access +When running locally with Docker Compose: +- **Internal (container-to-container)**: `kafka:9092` +- **External (host machine)**: `localhost:29092` + +### Debugging Kafka Topics +```bash +# List all topics +docker exec user-application-kafka /opt/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092 + +# Consume messages from a topic +docker exec user-application-kafka /opt/kafka/bin/kafka-console-consumer.sh \ + --bootstrap-server localhost:9092 \ + --topic user.registered \ + --from-beginning +``` + +### Event Topics +| Topic | Event | Published When | +|-------|-------|----------------| +| `user.registered` | `UserRegisteredEvent` | User completes registration | +| `user.otp.requested` | `OtpRequestedEvent` | User requests an OTP code | From 8763fe66946309c88d254b07cbccb9e6301fe23b Mon Sep 17 00:00:00 2001 From: Daniel Date: Wed, 31 Dec 2025 11:27:37 +1100 Subject: [PATCH 5/5] Fix docker build failure --- Dockerfile | 8 ++++---- settings.gradle.kts | 1 + 2 files changed, 5 insertions(+), 4 deletions(-) create mode 100644 settings.gradle.kts diff --git a/Dockerfile b/Dockerfile index 2ba3ad1..e3c5e96 100644 --- a/Dockerfile +++ b/Dockerfile @@ -2,7 +2,7 @@ # Multi-stage Dockerfile for Spring Boot Application # ============================================================================= # Build: docker build -t user-service . -# Run: docker run -p 3001:3001 -p 9090:9090 user-service +# Run: docker run -p 8080:8080 -p 9090:9090 user-service # ============================================================================= # ----------------------------------------------------------------------------- @@ -14,7 +14,7 @@ WORKDIR /app # Copy Gradle wrapper and build files first (for layer caching) COPY gradle/ gradle/ -COPY gradlew build.gradle.kts ./ +COPY gradlew build.gradle.kts settings.gradle.kts gradle.properties ./ # Make gradlew executable RUN chmod +x ./gradlew @@ -49,11 +49,11 @@ RUN chown -R appuser:appgroup /app USER appuser # Expose REST and gRPC ports -EXPOSE 3001 9090 +EXPOSE 8080 9090 # Health check HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \ - CMD wget --no-verbose --tries=1 --spider http://localhost:3001/actuator/health || exit 1 + CMD wget --no-verbose --tries=1 --spider http://localhost:8080/actuator/health || exit 1 # JVM optimizations for containers ENV JAVA_OPTS="-XX:+UseContainerSupport \ diff --git a/settings.gradle.kts b/settings.gradle.kts new file mode 100644 index 0000000..3e0e9bc --- /dev/null +++ b/settings.gradle.kts @@ -0,0 +1 @@ +rootProject.name = "user-service"
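A minimal sketch of the event-externalization pattern this series introduces, following the "Adding a New Event" steps documented in CLAUDE.md above. The event `PasswordChangedEvent`, the topic `user.password.changed`, and the handler bodies are hypothetical illustrations only; the annotations and APIs (`@Externalized`, `@ApplicationModuleListener`, `@KafkaListener`) are the ones already used in these patches, and the publishing call mirrors the documented `domainEventPublisher.publish(...)` usage.

```java
package org.nkcoder.shared.kernel.domain.event;

import java.time.LocalDateTime;
import java.util.UUID;
import org.springframework.modulith.events.Externalized;

// Hypothetical event, shaped like UserRegisteredEvent/OtpRequestedEvent above.
// @Externalized routes the serialized event to the named Kafka topic once the
// surrounding transaction commits (via the event_publication outbox table).
@Externalized("user.password.changed")
public record PasswordChangedEvent(UUID userId, String email, LocalDateTime occurredOn) {

    public PasswordChangedEvent(UUID userId, String email) {
        this(userId, email, LocalDateTime.now());
    }
}
```

The consuming side would then mirror `ApplicationEventListener` (in-process) and `KafkaEventListener` (externalized), for example:

```java
package org.nkcoder.notification.application;

import org.nkcoder.shared.kernel.domain.event.PasswordChangedEvent;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.modulith.events.ApplicationModuleListener;
import org.springframework.stereotype.Component;

// Hypothetical listener; published from the user module via
// domainEventPublisher.publish(new PasswordChangedEvent(user.getId(), user.getEmail())).
@Component
class PasswordChangedListeners {

    // In-process path: same JVM, retried/persisted via the event_publication table.
    @ApplicationModuleListener
    void onPasswordChanged(PasswordChangedEvent event) {
        // e.g. notificationService.sendPasswordChangedEmail(event.email());
    }

    // Externalized path: consumes the payload Spring Modulith published to Kafka;
    // decode and deserialize the message as KafkaEventListener does for the existing topics.
    @KafkaListener(topics = "user.password.changed", groupId = "notification-service")
    void onPasswordChangedFromKafka(String message) {
        // deserialize and handle, or hand off to NotificationService
    }
}
```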