diff --git a/README.md b/README.md index a5bb51a..5b031a7 100644 --- a/README.md +++ b/README.md @@ -119,6 +119,7 @@ PCM uses a centralized configuration model via **Spring Cloud Config**. All core ## 📚 Documentation - [**Quick Start Guide**](docs/QUICKSTART.md) - Get PCM running locally in 5 minutes. +- [**API Reference**](docs/API_REFERENCE.md) - Endpoints, payloads, and Examples. - [**Architecture Decision Records**](docs/architecture/) - Design decisions and rationale. --- diff --git a/config-service/src/main/resources/config/application.yml b/config-service/src/main/resources/config/application.yml index 5f4bd11..1a3ff48 100644 --- a/config-service/src/main/resources/config/application.yml +++ b/config-service/src/main/resources/config/application.yml @@ -32,21 +32,50 @@ spring: port: ${REDIS_PORT:6779} timeout: 2000ms - # Shared Kafka - kafka: - bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092} - producer: - key-serializer: org.apache.kafka.common.serialization.StringSerializer - value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer - acks: all - retries: 3 - consumer: - group-id: ${spring.application.name}-group - key-deserializer: org.apache.kafka.common.serialization.StringDeserializer - value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer - properties: - schema.registry.url: ${SCHEMA_REGISTRY_URL:http://localhost:8081} - specific.avro.reader: true + # Shared Spring Cloud Stream (Messaging Abstraction) + cloud: + function: + definition: ${PCM_STREAM_FUNCTIONS:} # To be overridden in service-specific configs if needed + stream: + kafka: + binder: + brokers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092} + configuration: + schema.registry.url: ${SCHEMA_REGISTRY_URL:http://localhost:8081} + specific.avro.reader: true + bindings: + # Producers (profile-service) + profileCreated-out-0: + destination: profile-events + content-type: application/*+avro + profileUpdated-out-0: + destination: profile-events + content-type: 
application/*+avro + profileErased-out-0: + destination: profile-events + content-type: application/*+avro + + # Consumers + profileCreated-in-0: + destination: profile-events + group: ${spring.application.name}-group + content-type: application/*+avro + profileUpdated-in-0: + destination: profile-events + group: ${spring.application.name}-group + content-type: application/*+avro + profileErased-in-0: + destination: profile-events + group: ${spring.application.name}-group + content-type: application/*+avro + + # Producers (consent-service) + consentGranted-out-0: + destination: consent-events + content-type: application/*+avro + consentRevoked-out-0: + destination: consent-events + content-type: application/*+avro # Shared Vault cloud: diff --git a/config-service/src/main/resources/config/preference-service.yml b/config-service/src/main/resources/config/preference-service.yml index 48bdfb3..ca1757d 100644 --- a/config-service/src/main/resources/config/preference-service.yml +++ b/config-service/src/main/resources/config/preference-service.yml @@ -7,6 +7,9 @@ spring: kafka: consumer: group-id: preference-service-group + cloud: + function: + definition: profileErased grpc: server: diff --git a/config-service/src/main/resources/config/segment-service.yml b/config-service/src/main/resources/config/segment-service.yml index a7e960b..f3461b2 100644 --- a/config-service/src/main/resources/config/segment-service.yml +++ b/config-service/src/main/resources/config/segment-service.yml @@ -9,6 +9,9 @@ spring: kafka: consumer: group-id: segment-service-group + cloud: + function: + definition: profileCreated;profileUpdated grpc: server: diff --git a/consent-service/pom.xml b/consent-service/pom.xml index 2762018..608e8c3 100644 --- a/consent-service/pom.xml +++ b/consent-service/pom.xml @@ -61,10 +61,10 @@ 2.15.0.RELEASE - + - org.springframework.kafka - spring-kafka + org.springframework.cloud + spring-cloud-starter-stream-kafka diff --git 
a/consent-service/src/main/java/dev/vibeafrika/pcm/consent/infrastructure/messaging/KafkaConsentEventPublisher.java b/consent-service/src/main/java/dev/vibeafrika/pcm/consent/infrastructure/messaging/KafkaConsentEventPublisher.java index 112fb58..9be9815 100644 --- a/consent-service/src/main/java/dev/vibeafrika/pcm/consent/infrastructure/messaging/KafkaConsentEventPublisher.java +++ b/consent-service/src/main/java/dev/vibeafrika/pcm/consent/infrastructure/messaging/KafkaConsentEventPublisher.java @@ -6,8 +6,8 @@ import dev.vibeafrika.pcm.events.ConsentPurpose; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.kafka.core.KafkaTemplate; + +import org.springframework.cloud.stream.function.StreamBridge; import org.springframework.stereotype.Component; /** @@ -19,53 +19,50 @@ @RequiredArgsConstructor public class KafkaConsentEventPublisher implements ConsentEventPublisher { - private final KafkaTemplate kafkaTemplate; - - @Value("${pcm.topics.consent-events:consent-events}") - private String consentEventsTopic; + private final StreamBridge streamBridge; @Override public void publish(ConsentGrantedEvent domainEvent) { - dev.vibeafrika.pcm.events.ConsentGrantedEvent avroEvent = dev.vibeafrika.pcm.events.ConsentGrantedEvent.newBuilder() - .setEventId(domainEvent.getEventId()) - .setEventType(domainEvent.getEventType()) - .setOccurredAt(domainEvent.getOccurredAt().toEpochMilli()) - .setVersion(1) - .setTenantId(domainEvent.getTenantId()) - .setConsentId(domainEvent.getAggregateId()) - .setProfileId(domainEvent.getProfileId()) - .setPurpose(ConsentPurpose.valueOf(domainEvent.getPurpose())) - .setConsentVersion(domainEvent.getConsentVersion()) - .setProofHash(domainEvent.getProofHash()) - .build(); + dev.vibeafrika.pcm.events.ConsentGrantedEvent avroEvent = dev.vibeafrika.pcm.events.ConsentGrantedEvent + .newBuilder() + .setEventId(domainEvent.getEventId()) + 
.setEventType(domainEvent.getEventType()) + .setOccurredAt(domainEvent.getOccurredAt().toEpochMilli()) + .setVersion(1) + .setTenantId(domainEvent.getTenantId()) + .setConsentId(domainEvent.getAggregateId()) + .setProfileId(domainEvent.getProfileId()) + .setPurpose(ConsentPurpose.valueOf(domainEvent.getPurpose())) + .setConsentVersion(domainEvent.getConsentVersion()) + .setProofHash(domainEvent.getProofHash()) + .build(); - publish(avroEvent.getProfileId().toString(), avroEvent); + send("consentGranted-out-0", avroEvent.getProfileId().toString(), avroEvent); } @Override public void publish(ConsentRevokedEvent domainEvent) { - dev.vibeafrika.pcm.events.ConsentRevokedEvent avroEvent = dev.vibeafrika.pcm.events.ConsentRevokedEvent.newBuilder() - .setEventId(domainEvent.getEventId()) - .setEventType(domainEvent.getEventType()) - .setOccurredAt(domainEvent.getOccurredAt().toEpochMilli()) - .setVersion(1) - .setTenantId(domainEvent.getTenantId()) - .setConsentId(domainEvent.getAggregateId()) - .setProfileId(domainEvent.getProfileId()) - .setPurpose(ConsentPurpose.valueOf(domainEvent.getPurpose())) - .build(); + dev.vibeafrika.pcm.events.ConsentRevokedEvent avroEvent = dev.vibeafrika.pcm.events.ConsentRevokedEvent + .newBuilder() + .setEventId(domainEvent.getEventId()) + .setEventType(domainEvent.getEventType()) + .setOccurredAt(domainEvent.getOccurredAt().toEpochMilli()) + .setVersion(1) + .setTenantId(domainEvent.getTenantId()) + .setConsentId(domainEvent.getAggregateId()) + .setProfileId(domainEvent.getProfileId()) + .setPurpose(ConsentPurpose.valueOf(domainEvent.getPurpose())) + .build(); - publish(avroEvent.getProfileId().toString(), avroEvent); + send("consentRevoked-out-0", avroEvent.getProfileId().toString(), avroEvent); } - private void publish(String key, Object event) { - kafkaTemplate.send(consentEventsTopic, key, event) - .whenComplete((result, ex) -> { - if (ex == null) { - log.debug("Published consent event to topic {}: {}", consentEventsTopic, event); - } 
else { - log.error("Failed to publish consent event to topic {}: {}", consentEventsTopic, ex.getMessage()); - } - }); + private void send(String bindingName, String key, Object event) { + log.debug("Sending consent event to binding {}: {}", bindingName, event); + streamBridge.send(bindingName, + org.springframework.messaging.support.MessageBuilder + .withPayload(event) + .setHeader("partitionKey", key) + .build()); } } diff --git a/docs/API_REFERENCE.md b/docs/API_REFERENCE.md new file mode 100644 index 0000000..28c689b --- /dev/null +++ b/docs/API_REFERENCE.md @@ -0,0 +1,178 @@ +# API Reference & Curl Examples + +This document provides a comprehensive reference for the PCM (Profile & Consent Manager) REST APIs, including expected payloads and `curl` examples. + +## Base Infrastructure +| Service | Port | Description | +| :--- | :--- | :--- | +| **API Gateway** | `9880` | Entry point (Auth & Aggregation) | +| **Profile Service** | `18081` | PII & Identity Management | +| **Preference Service** | `18082` | UX & Application Settings | +| **Consent Service** | `18083` | Consent Ledger & GDPR Compliance | +| **Segment Service** | `18084` | User Classification | + +--- + +## Authentication & Headers +- **X-Tenant-Id**: Required for all requests. Default is `default`. +- **Authorization**: Bearer token required for Gateway endpoints (Keycloak JWT). + +--- + +## 1. Profile Service (`:18081`) + +### Create a Profile +Used to initialize a new user record. 
+ +**Endpoint**: `POST /api/v1/profiles` + +**Payload**: +```json +{ + "id": "550e8400-e29b-41d4-a716-446655440000", + "handle": "jdoe", + "attributes": { + "fullName": "John Doe", + "email": "john.doe@example.com", + "country": "FR" + } +} +``` + +**Curl**: +```bash +curl -X POST http://localhost:18081/api/v1/profiles \ + -H "Content-Type: application/json" \ + -H "X-Tenant-Id: default" \ + -d '{ + "id": "550e8400-e29b-41d4-a716-446655440000", + "handle": "jdoe", + "attributes": { + "fullName": "John Doe", + "email": "john.doe@example.com", + "country": "FR" + } + }' +``` + +### Get Profile +Retrieve a profile. Sensitive attributes are decrypted automatically if Vault is enabled. + +**Endpoint**: `GET /api/v1/profiles/{id}` + +**Curl**: +```bash +curl http://localhost:18081/api/v1/profiles/550e8400-e29b-41d4-a716-446655440000 \ + -H "X-Tenant-Id: default" +``` + +--- + +## 2. Consent Service (`:18083`) + +### Grant Consent +Records a positive consent action in the ledger. + +**Endpoint**: `POST /api/v1/consents/{profileId}/grant` + +**Payload**: +```json +{ + "purpose": "MARKETING", + "version": "v1.2", + "consentText": "I agree to receive marketing emails.", + "metadata": { + "source": "web-form-footer" + } +} +``` + +**Curl**: +```bash +curl -X POST http://localhost:18083/api/v1/consents/550e8400-e29b-41d4-a716-446655440000/grant \ + -H "Content-Type: application/json" \ + -H "X-Tenant-ID: default" \ + -d '{ + "purpose": "MARKETING", + "version": "v1.2", + "consentText": "I agree to receive marketing emails." + }' +``` + +### Verify Consent +Check if a user currently has granted permission for a specific purpose. + +**Endpoint**: `GET /api/v1/consents/{profileId}/verify?purpose=MARKETING` + +**Curl**: +```bash +curl "http://localhost:18083/api/v1/consents/550e8400-e29b-41d4-a716-446655440000/verify?purpose=MARKETING" \ + -H "X-Tenant-ID: default" +``` + +--- + +## 3. Preference Service (`:18082`) + +### Update Preferences +Update key-value settings for a user. 
+ +**Endpoint**: `PATCH /api/v1/preferences/{profileId}` + +**Payload**: +```json +{ + "theme": "dark", + "language": "fr", + "notifications_enabled": "true" +} +``` + +**Curl**: +```bash +curl -X PATCH http://localhost:18082/api/v1/preferences/550e8400-e29b-41d4-a716-446655440000 \ + -H "Content-Type: application/json" \ + -H "X-Tenant-Id: default" \ + -d '{"theme": "dark", "language": "fr"}' +``` + +--- + +## 4. Segment Service (`:18084`) + +### Get User Segments +Retrieve computed segments (classification) for a user. + +**Endpoint**: `GET /api/v1/segments/{profileId}` + +**Curl**: +```bash +curl http://localhost:18084/api/v1/segments/550e8400-e29b-41d4-a716-446655440000 +``` + +--- + +## 5. API Gateway (Aggregation & Auth) (`:9880`) + +### Unified "Me" Endpoint +Returns an aggregated view of the authenticated user (Profile + Preferences + Segments). + +**Endpoint**: `GET /api/v1/users/me` (or `GET /api/v1/me`) + +**Curl**: +```bash +curl http://localhost:9880/api/v1/users/me \ + -H "Authorization: Bearer " \ + -H "X-Tenant-Id: default" +``` + +--- + +## Purpose Reference +Standard values for `purpose` in Consent & Segments: +- `MARKETING` +- `ANALYTICS` +- `PERSONALIZATION` +- `THIRD_PARTY_SHARING` +- `TERMS_AND_CONDITIONS` +- `PRIVACY_POLICY` diff --git a/preference-service/pom.xml b/preference-service/pom.xml index 1754a95..6f171bc 100644 --- a/preference-service/pom.xml +++ b/preference-service/pom.xml @@ -40,10 +40,10 @@ spring-boot-starter-data-redis - + - org.springframework.kafka - spring-kafka + org.springframework.cloud + spring-cloud-starter-stream-kafka dev.vibe-afrika diff --git a/preference-service/src/main/java/dev/vibeafrika/pcm/preference/infrastructure/messaging/KafkaProfileEventConsumer.java b/preference-service/src/main/java/dev/vibeafrika/pcm/preference/infrastructure/messaging/KafkaProfileEventConsumer.java index 5cc3284..cfbee96 100644 --- 
a/preference-service/src/main/java/dev/vibeafrika/pcm/preference/infrastructure/messaging/KafkaProfileEventConsumer.java +++ b/preference-service/src/main/java/dev/vibeafrika/pcm/preference/infrastructure/messaging/KafkaProfileEventConsumer.java @@ -4,13 +4,14 @@ import dev.vibeafrika.pcm.preference.domain.repository.PreferenceRepository; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import java.util.UUID; +import java.util.function.Consumer; /** - * Kafka consumer for profile erasure events. + * Consumer for profile erasure events using Spring Cloud Stream. * Ensures user preferences are deleted when a profile is erased (GDPR). */ @Component @@ -20,17 +21,19 @@ public class KafkaProfileEventConsumer { private final PreferenceRepository preferenceRepository; - @KafkaListener(topics = "profile-events") - public void handleProfileErased(ProfileErasedEvent event) { - log.info("Received ProfileErasedEvent for profile: {}. Cleaning up preferences.", event.getProfileId()); - try { - UUID profileId = UUID.fromString(event.getProfileId().toString()); - preferenceRepository.findByProfileId(profileId).ifPresent(preference -> { - log.info("Deleting preferences for profile: {}", profileId); - preferenceRepository.delete(preference); - }); - } catch (Exception e) { - log.error("Failed to process profile erasure for preferences: {}", e.getMessage()); - } + @Bean + public Consumer profileErased() { + return event -> { + log.info("Received ProfileErasedEvent for profile: {}. 
Cleaning up preferences.", event.getProfileId()); + try { + UUID profileId = UUID.fromString(event.getProfileId().toString()); + preferenceRepository.findByProfileId(profileId).ifPresent(preference -> { + log.info("Deleting preferences for profile: {}", profileId); + preferenceRepository.delete(preference); + }); + } catch (Exception e) { + log.error("Failed to process profile erasure for preferences: {}", e.getMessage()); + } + }; } } diff --git a/profile-service/pom.xml b/profile-service/pom.xml index e9379e2..c8ea113 100644 --- a/profile-service/pom.xml +++ b/profile-service/pom.xml @@ -65,10 +65,10 @@ 2.15.0.RELEASE - + - org.springframework.kafka - spring-kafka + org.springframework.cloud + spring-cloud-starter-stream-kafka diff --git a/profile-service/src/main/java/dev/vibeafrika/pcm/profile/infrastructure/messaging/KafkaProfileEventPublisher.java b/profile-service/src/main/java/dev/vibeafrika/pcm/profile/infrastructure/messaging/KafkaProfileEventPublisher.java index 930d87d..99f2481 100644 --- a/profile-service/src/main/java/dev/vibeafrika/pcm/profile/infrastructure/messaging/KafkaProfileEventPublisher.java +++ b/profile-service/src/main/java/dev/vibeafrika/pcm/profile/infrastructure/messaging/KafkaProfileEventPublisher.java @@ -8,8 +8,7 @@ import dev.vibeafrika.pcm.profile.domain.event.ProfileEventPublisher; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.cloud.stream.function.StreamBridge; import org.springframework.stereotype.Component; import java.util.Map; @@ -23,12 +22,9 @@ @RequiredArgsConstructor public class KafkaProfileEventPublisher implements ProfileEventPublisher { - private final KafkaTemplate kafkaTemplate; + private final StreamBridge streamBridge; private final ObjectMapper objectMapper; - @Value("${pcm.topics.profile-events:profile-events}") - private String 
profileEventsTopic; - @Override public void publish(ProfileCreatedEvent domainEvent) { dev.vibeafrika.pcm.events.ProfileCreatedEvent avroEvent = dev.vibeafrika.pcm.events.ProfileCreatedEvent @@ -43,7 +39,7 @@ public void publish(ProfileCreatedEvent domainEvent) { .setAttributes(toJson(domainEvent.getAttributes())) .build(); - publish(avroEvent.getProfileId(), avroEvent); + send("profileCreated-out-0", avroEvent.getProfileId(), avroEvent); } @Override @@ -59,7 +55,7 @@ public void publish(ProfileUpdatedEvent domainEvent) { .setUpdatedAttributes(toJson(domainEvent.getUpdatedAttributes())) .build(); - publish(avroEvent.getProfileId(), avroEvent); + send("profileUpdated-out-0", avroEvent.getProfileId(), avroEvent); } @Override @@ -74,18 +70,22 @@ public void publish(ProfileErasedEvent domainEvent) { .setProfileId(domainEvent.getAggregateId()) .build(); - publish(avroEvent.getProfileId(), avroEvent); + send("profileErased-out-0", avroEvent.getProfileId(), avroEvent); } - private void publish(String key, Object event) { - kafkaTemplate.send(profileEventsTopic, key, event) - .whenComplete((result, ex) -> { - if (ex == null) { - log.debug("Published event to topic {}: {}", profileEventsTopic, event); - } else { - log.error("Failed to publish event to topic {}: {}", profileEventsTopic, ex.getMessage()); - } - }); + private void send(String bindingName, String key, Object event) { + log.debug("Sending event to binding {}: {}", bindingName, event); + boolean sent = streamBridge.send(bindingName, + org.springframework.messaging.support.MessageBuilder + .withPayload(event) + .setHeader("partitionKey", key) + .build()); + + if (sent) { + log.debug("Successfully sent event to binding {}", bindingName); + } else { + log.error("Failed to send event to binding {}", bindingName); + } } private String toJson(Map map) { diff --git a/profile-service/src/test/java/dev/vibeafrika/pcm/profile/application/usecase/CreateProfileUseCaseTest.java 
b/profile-service/src/test/java/dev/vibeafrika/pcm/profile/application/usecase/CreateProfileUseCaseTest.java index ad62a34..079ffa1 100644 --- a/profile-service/src/test/java/dev/vibeafrika/pcm/profile/application/usecase/CreateProfileUseCaseTest.java +++ b/profile-service/src/test/java/dev/vibeafrika/pcm/profile/application/usecase/CreateProfileUseCaseTest.java @@ -6,7 +6,6 @@ import dev.vibeafrika.pcm.profile.application.service.PIIProtectionService; import dev.vibeafrika.pcm.profile.domain.event.ProfileCreatedEvent; import dev.vibeafrika.pcm.profile.domain.event.ProfileEventPublisher; -import dev.vibeafrika.pcm.profile.domain.model.Handle; import dev.vibeafrika.pcm.profile.domain.model.Profile; import dev.vibeafrika.pcm.profile.domain.repository.ProfileRepository; import org.junit.jupiter.api.BeforeEach; diff --git a/segment-service/pom.xml b/segment-service/pom.xml index b9a2bf5..718d4ab 100644 --- a/segment-service/pom.xml +++ b/segment-service/pom.xml @@ -44,10 +44,10 @@ spring-boot-starter-data-elasticsearch - + - org.springframework.kafka - spring-kafka + org.springframework.cloud + spring-cloud-starter-stream-kafka dev.vibe-afrika diff --git a/segment-service/src/main/java/dev/vibeafrika/pcm/segment/infrastructure/messaging/KafkaProfileEventConsumer.java b/segment-service/src/main/java/dev/vibeafrika/pcm/segment/infrastructure/messaging/KafkaProfileEventConsumer.java index 8630b9a..e3d9ea1 100644 --- a/segment-service/src/main/java/dev/vibeafrika/pcm/segment/infrastructure/messaging/KafkaProfileEventConsumer.java +++ b/segment-service/src/main/java/dev/vibeafrika/pcm/segment/infrastructure/messaging/KafkaProfileEventConsumer.java @@ -5,14 +5,17 @@ import dev.vibeafrika.pcm.segment.application.usecase.ClassifyUserUseCase; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; 
+import com.fasterxml.jackson.core.type.TypeReference; + import java.util.Map; import java.util.UUID; +import java.util.function.Consumer; /** - * Kafka consumer for profile-related events. + * Consumer for profile-related events using Spring Cloud Stream. * Triggers re-classification whenever a profile is created or updated. */ @Component @@ -23,37 +26,41 @@ public class KafkaProfileEventConsumer { private final ClassifyUserUseCase classifyUserUseCase; private final com.fasterxml.jackson.databind.ObjectMapper objectMapper; - @KafkaListener(topics = "${pcm.topics.profile-events:profile-events}") - public void handleProfileCreated(ProfileCreatedEvent event) { - log.info("Received ProfileCreatedEvent for profile: {}", event.getProfileId()); - try { - Map attributes = objectMapper.readValue(event.getAttributes().toString(), - new com.fasterxml.jackson.core.type.TypeReference>() {}); - - classifyUserUseCase.execute(new ClassifyUserUseCase.Input( - event.getTenantId().toString(), - UUID.fromString(event.getProfileId().toString()), - attributes - )); - } catch (Exception e) { - log.error("Failed to parse profile attributes for classification: {}", e.getMessage()); - } + @Bean + public Consumer profileCreated() { + return event -> { + log.info("Received ProfileCreatedEvent for profile: {}", event.getProfileId()); + try { + Map attributes = objectMapper.readValue(event.getAttributes().toString(), + new TypeReference>() { + }); + + classifyUserUseCase.execute(new ClassifyUserUseCase.Input( + event.getTenantId().toString(), + UUID.fromString(event.getProfileId().toString()), + attributes)); + } catch (Exception e) { + log.error("Failed to parse profile attributes for classification: {}", e.getMessage()); + } + }; } - @KafkaListener(topics = "${pcm.topics.profile-events:profile-events}") - public void handleProfileUpdated(ProfileUpdatedEvent event) { - log.info("Received ProfileUpdatedEvent for profile: {}", event.getProfileId()); - try { - Map attributes = 
objectMapper.readValue(event.getUpdatedAttributes().toString(), - new com.fasterxml.jackson.core.type.TypeReference>() {}); - - classifyUserUseCase.execute(new ClassifyUserUseCase.Input( - event.getTenantId().toString(), - UUID.fromString(event.getProfileId().toString()), - attributes - )); - } catch (Exception e) { - log.error("Failed to parse updated profile attributes for classification: {}", e.getMessage()); - } + @Bean + public Consumer profileUpdated() { + return event -> { + log.info("Received ProfileUpdatedEvent for profile: {}", event.getProfileId()); + try { + Map attributes = objectMapper.readValue(event.getUpdatedAttributes().toString(), + new TypeReference>() { + }); + + classifyUserUseCase.execute(new ClassifyUserUseCase.Input( + event.getTenantId().toString(), + UUID.fromString(event.getProfileId().toString()), + attributes)); + } catch (Exception e) { + log.error("Failed to parse updated profile attributes for classification: {}", e.getMessage()); + } + }; } }