diff --git a/packages/Kafka/src/Configuration/KafkaModule.php b/packages/Kafka/src/Configuration/KafkaModule.php index a24edb7bd..3e1142765 100644 --- a/packages/Kafka/src/Configuration/KafkaModule.php +++ b/packages/Kafka/src/Configuration/KafkaModule.php @@ -133,6 +133,9 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO foreach ($publisherConfigurations as $publisherConfiguration) { $kafkaBrokerConfigurations[$publisherConfiguration->getBrokerConfigurationReference()] = Reference::to($publisherConfiguration->getBrokerConfigurationReference()); } + if ($kafkaConsumers !== [] && ! array_key_exists(KafkaBrokerConfiguration::class, $kafkaBrokerConfigurations)) { + $kafkaBrokerConfigurations[KafkaBrokerConfiguration::class] = Reference::to(KafkaBrokerConfiguration::class); + } foreach ($this->kafkaConsumersAnnotatedMethods as $kafkaConsumerAnnotatedMethod) { /** @var KafkaConsumer $kafkaConsumer */ diff --git a/packages/Kafka/tests/Integration/KafkaChannelAdapterTest.php b/packages/Kafka/tests/Integration/KafkaChannelAdapterTest.php index 888987c4a..d5b4f0518 100644 --- a/packages/Kafka/tests/Integration/KafkaChannelAdapterTest.php +++ b/packages/Kafka/tests/Integration/KafkaChannelAdapterTest.php @@ -5,11 +5,13 @@ namespace Test\Ecotone\Kafka\Integration; use Ecotone\Kafka\Api\KafkaHeader; +use Ecotone\Kafka\Attribute\KafkaConsumer; use Ecotone\Kafka\Configuration\KafkaBrokerConfiguration; use Ecotone\Kafka\Configuration\KafkaConsumerConfiguration; use Ecotone\Kafka\Configuration\KafkaPublisherConfiguration; use Ecotone\Kafka\Configuration\TopicConfiguration; use Ecotone\Kafka\Outbound\MessagePublishingException; +use Ecotone\Modelling\Attribute\QueryHandler; use Ecotone\Lite\EcotoneLite; use Ecotone\Lite\Test\FlowTestSupport; use Ecotone\Messaging\Channel\SimpleMessageChannelBuilder; @@ -360,4 +362,102 @@ public function test_sending_and_receiving_with_kafka_consumer_configuration(): self::assertCount(1, $messages); self::assertEquals('exampleData', $messages[0]['payload']); } + + public function test_sending_and_receiving_without_kafka_consumer_configuration(): void + { + $topicName = Uuid::uuid4()->toString(); + $ecotoneLite = EcotoneLite::bootstrapFlowTesting( + [ExampleKafkaConsumer::class], + [ + KafkaBrokerConfiguration::class => ConnectionTestCase::getConnection(), + new ExampleKafkaConsumer(), + ], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE, ModulePackageList::KAFKA_PACKAGE])) + ->withExtensionObjects([ + KafkaPublisherConfiguration::createWithDefaults($topicName), + TopicConfiguration::createWithReferenceName('exampleTopic', $topicName), + ]), + licenceKey: LicenceTesting::VALID_LICENCE, + ); + + /** @var MessagePublisher $kafkaPublisher */ + $kafkaPublisher = $ecotoneLite->getGateway(MessagePublisher::class); + + $kafkaPublisher->sendWithMetadata('exampleData', 'application/text', ['key' => 'value']); + + $ecotoneLite->run('exampleConsumer', ExecutionPollingMetadata::createWithTestingSetup( + maxExecutionTimeInMilliseconds: 30000 + )); + + $messages = $ecotoneLite->sendQueryWithRouting('getMessages'); + + self::assertCount(1, $messages); + self::assertEquals('exampleData', $messages[0]['payload']); + } + + public function test_kafka_consumer_works_without_explicit_configuration(): void + { + $topicName = 'test_topic_no_config_' . Uuid::uuid4()->toString(); + + $consumer = new class { + private array $messages = []; + + #[KafkaConsumer('ordersConsumer', 'orders')] + public function handle(string $payload): void + { + $this->messages[] = $payload; + } + + #[QueryHandler('consumer.getMessages')] + public function getMessages(): array + { + return $this->messages; + } + }; + + $ecotoneLite = EcotoneLite::bootstrapFlowTesting( + [$consumer::class], + [ + KafkaBrokerConfiguration::class => ConnectionTestCase::getConnection(), + $consumer, + ], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::KAFKA_PACKAGE])) + ->withExtensionObjects([ + TopicConfiguration::createWithReferenceName('orders', $topicName), + ]), + licenceKey: LicenceTesting::VALID_LICENCE, + ); + + $ecotoneLite->run('ordersConsumer', ExecutionPollingMetadata::createWithTestingSetup()); + + $messages = $ecotoneLite->sendQueryWithRouting('consumer.getMessages'); + + self::assertCount(0, $messages); + } + + public function test_kafka_publisher_works_without_explicit_configuration(): void + { + $topicName = 'test_topic_publisher_' . Uuid::uuid4()->toString(); + + $ecotoneLite = EcotoneLite::bootstrapFlowTesting( + [], + [ + KafkaBrokerConfiguration::class => ConnectionTestCase::getConnection(), + ], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::KAFKA_PACKAGE])) + ->withExtensionObjects([ + KafkaPublisherConfiguration::createWithDefaults($topicName), + ]), + licenceKey: LicenceTesting::VALID_LICENCE, + ); + + /** @var MessagePublisher $kafkaPublisher */ + $kafkaPublisher = $ecotoneLite->getGateway(MessagePublisher::class); + $kafkaPublisher->send('test-payload'); + + $this->assertTrue(true); + } } diff --git a/packages/PdoEventSourcing/src/Projecting/AggregateIdPartitionProvider.php b/packages/PdoEventSourcing/src/Projecting/AggregateIdPartitionProvider.php index f20af93c3..ce6d8795a 100644 --- a/packages/PdoEventSourcing/src/Projecting/AggregateIdPartitionProvider.php +++ b/packages/PdoEventSourcing/src/Projecting/AggregateIdPartitionProvider.php @@ -11,6 +11,7 @@ use Doctrine\DBAL\Platforms\MariaDBPlatform; use Doctrine\DBAL\Platforms\MySQLPlatform; use Doctrine\DBAL\Platforms\PostgreSQLPlatform; +use Ecotone\Dbal\AlreadyConnectedDbalConnectionFactory; use Ecotone\Dbal\MultiTenant\MultiTenantConnectionFactory; use Ecotone\EventSourcing\PdoStreamTableNameProvider; use Ecotone\Projecting\PartitionProvider; @@ -27,7 +28,7 @@ class AggregateIdPartitionProvider implements PartitionProvider * @param array $partitionedProjections List of projection names this provider handles */ public function __construct( - private DbalConnectionFactory|MultiTenantConnectionFactory $connectionFactory, + private DbalConnectionFactory|MultiTenantConnectionFactory|AlreadyConnectedDbalConnectionFactory $connectionFactory, private PdoStreamTableNameProvider $tableNameProvider, private array $partitionedProjections = [], ) { @@ -117,6 +118,10 @@ private function getConnection(): Connection return $this->connectionFactory->getConnection(); } + if ($this->connectionFactory instanceof AlreadyConnectedDbalConnectionFactory) { + return $this->connectionFactory->getConnection(); + } + return $this->connectionFactory->establishConnection(); } } diff --git a/packages/PdoEventSourcing/src/Projecting/PartitionState/DbalProjectionStateStorage.php b/packages/PdoEventSourcing/src/Projecting/PartitionState/DbalProjectionStateStorage.php index e4914bfb3..fec82a2fc 100644 --- a/packages/PdoEventSourcing/src/Projecting/PartitionState/DbalProjectionStateStorage.php +++ b/packages/PdoEventSourcing/src/Projecting/PartitionState/DbalProjectionStateStorage.php @@ -9,6 +9,7 @@ use Doctrine\DBAL\Connection; use Doctrine\DBAL\Platforms\MySQLPlatform; +use Ecotone\Dbal\AlreadyConnectedDbalConnectionFactory; use Ecotone\Dbal\MultiTenant\MultiTenantConnectionFactory; use Ecotone\EventSourcing\Database\ProjectionStateTableManager; use Ecotone\Projecting\NoOpTransaction; @@ -34,7 +35,7 @@ class DbalProjectionStateStorage implements ProjectionStateStorage * @param string[]|null $projectionNames */ public function __construct( - private DbalConnectionFactory|ManagerRegistryConnectionFactory|MultiTenantConnectionFactory $connectionFactory, + private DbalConnectionFactory|ManagerRegistryConnectionFactory|MultiTenantConnectionFactory|AlreadyConnectedDbalConnectionFactory $connectionFactory, private ProjectionStateTableManager $tableManager, private ?array $projectionNames = null, ) { diff --git a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSource.php b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSource.php index 5d0a9e249..a9b30c34b 100644 --- a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSource.php +++ b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSource.php @@ -12,6 +12,7 @@ use DateTimeZone; use Doctrine\DBAL\ArrayParameterType; use Doctrine\DBAL\Connection; +use Ecotone\Dbal\AlreadyConnectedDbalConnectionFactory; use Ecotone\Dbal\Compatibility\SchemaManagerCompatibility; use Ecotone\Dbal\MultiTenant\MultiTenantConnectionFactory; use Ecotone\EventSourcing\PdoStreamTableNameProvider; @@ -35,7 +36,7 @@ class EventStoreGlobalStreamSource implements StreamSource * @param string[] $handledProjectionNames */ public function __construct( - private DbalConnectionFactory|ManagerRegistryConnectionFactory|MultiTenantConnectionFactory $connectionFactory, + private DbalConnectionFactory|ManagerRegistryConnectionFactory|MultiTenantConnectionFactory|AlreadyConnectedDbalConnectionFactory $connectionFactory, private EcotoneClockInterface $clock, private PdoStreamTableNameProvider $tableNameProvider, private StreamFilterRegistry $streamFilterRegistry, diff --git a/packages/PdoEventSourcing/tests/Projecting/ProophIntegrationTest.php b/packages/PdoEventSourcing/tests/Projecting/ProophIntegrationTest.php index 1bf344fdf..d5026479e 100644 --- a/packages/PdoEventSourcing/tests/Projecting/ProophIntegrationTest.php +++ b/packages/PdoEventSourcing/tests/Projecting/ProophIntegrationTest.php @@ -7,6 +7,7 @@ namespace Test\Ecotone\EventSourcing\Projecting; +use Ecotone\Dbal\DbalConnection; use Ecotone\EventSourcing\Attribute\FromStream; use Ecotone\EventSourcing\Attribute\ProjectionInitialization; use Ecotone\Lite\EcotoneLite; @@ -451,4 +452,72 @@ public function init(): void self::assertSame(1, $projection->initCallCount, 'Init should be called once'); } + + public function test_projecting_with_already_connected_dbal_connection_factory(): void + { + $dbalConnectionFactory = self::getConnectionFactory(); + $connection = $dbalConnectionFactory->createContext()->getDbalConnection(); + $alreadyConnectedFactory = DbalConnection::create($connection); + + $projection = new #[ProjectionV2(self::NAME), FromStream(Ticket::STREAM_NAME)] class ($connection) extends DbalTicketProjection { + public const NAME = 'already_connected_projection'; + }; + + $ecotone = EcotoneLite::bootstrapFlowTestingWithEventStore( + [$projection::class, Ticket::class, TicketEventConverter::class, TicketAssigned::class], + [\Enqueue\Dbal\DbalConnectionFactory::class => $alreadyConnectedFactory, $projection, new TicketEventConverter()], + runForProductionEventStore: true, + licenceKey: LicenceTesting::VALID_LICENCE, + ); + + $ticketsCount = $ecotone->deleteEventStream(Ticket::STREAM_NAME) + ->deleteProjection($projection::NAME) + ->sendCommand(new CreateTicketCommand($ticketId = Uuid::uuid4()->toString())) + ->sendCommandWithRoutingKey(Ticket::ASSIGN_COMMAND, metadata: ['aggregate.id' => $ticketId]) + ->sendQueryWithRouting('getTicketsCount'); + + self::assertSame(1, $ticketsCount); + self::assertSame('assigned', $ecotone->sendQueryWithRouting('getTicketStatus', $ticketId)); + } + + public function test_partitioned_projection_with_already_connected_dbal_connection_factory(): void + { + $dbalConnectionFactory = self::getConnectionFactory(); + $connection = $dbalConnectionFactory->createContext()->getDbalConnection(); + $alreadyConnectedFactory = DbalConnection::create($connection); + + $projection = new #[ProjectionV2(self::NAME), Partitioned(MessageHeaders::EVENT_AGGREGATE_ID), FromStream(Ticket::STREAM_NAME, Ticket::class)] class ($connection) extends DbalTicketProjection { + public const NAME = 'already_connected_partitioned_projection'; + public int $initCallCount = 0; + + #[ProjectionInitialization] + public function init(): void + { + parent::init(); + $this->initCallCount++; + } + }; + + $ecotone = EcotoneLite::bootstrapFlowTestingWithEventStore( + [$projection::class, Ticket::class, TicketEventConverter::class, TicketAssigned::class], + [\Enqueue\Dbal\DbalConnectionFactory::class => $alreadyConnectedFactory, $projection, new TicketEventConverter()], + runForProductionEventStore: true, + licenceKey: LicenceTesting::VALID_LICENCE, + ); + + $ecotone->deleteEventStream(Ticket::STREAM_NAME) + ->deleteProjection($projection::NAME); + + $ticketId1 = Uuid::uuid4()->toString(); + $ticketId2 = Uuid::uuid4()->toString(); + + $ticketsCount = $ecotone->sendCommand(new CreateTicketCommand($ticketId1)) + ->sendCommand(new CreateTicketCommand($ticketId2)) + ->sendCommandWithRoutingKey(Ticket::ASSIGN_COMMAND, metadata: ['aggregate.id' => $ticketId1]) + ->sendCommandWithRoutingKey(Ticket::ASSIGN_COMMAND, metadata: ['aggregate.id' => $ticketId2]) + ->sendQueryWithRouting('getTicketsCount'); + + self::assertSame(2, $ticketsCount); + self::assertSame(2, $projection->initCallCount); + } }