diff --git a/devops-boot-project/devops-boot-core/devops-pulsar/src/main/kotlin/com/tencent/devops/stream/binder/pulsar/integration/inbound/PulsarAcknowledgmentCallback.kt b/devops-boot-project/devops-boot-core/devops-pulsar/src/main/kotlin/com/tencent/devops/stream/binder/pulsar/integration/inbound/PulsarAcknowledgmentCallback.kt new file mode 100644 index 0000000..330b3a7 --- /dev/null +++ b/devops-boot-project/devops-boot-core/devops-pulsar/src/main/kotlin/com/tencent/devops/stream/binder/pulsar/integration/inbound/PulsarAcknowledgmentCallback.kt @@ -0,0 +1,114 @@ +/* + * Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-CI 蓝鲸持续集成平台 is licensed under the MIT license. + * + * A copy of the MIT License is included in this file. + * + * + * Terms of the MIT License: + * --------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT + * LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN + * NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package com.tencent.devops.stream.binder.pulsar.integration.inbound + +import org.apache.pulsar.client.api.Consumer +import org.apache.pulsar.client.api.Message +import org.slf4j.LoggerFactory +import org.springframework.integration.acks.AcknowledgmentCallback +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean + +/** + * Pulsar 消息确认回调实现 + * 用于支持业务层手动控制消息确认 + */ +class PulsarAcknowledgmentCallback( + private val consumer: Consumer<*>, + private val message: Message<*> +) : AcknowledgmentCallback { + + private val acknowledged = AtomicBoolean(false) + private val autoAckEnabled = AtomicBoolean(false) + + companion object { + private val log = LoggerFactory.getLogger(PulsarAcknowledgmentCallback::class.java) + } + + /** + * 确认消息 + * @return true 如果确认成功,false 如果已经确认过 + */ + override fun acknowledge(status: AcknowledgmentCallback.Status) { + if (acknowledged.compareAndSet(false, true)) { + try { + when (status) { + AcknowledgmentCallback.Status.ACCEPT -> { + // 正常确认消息 + consumer.acknowledge(message) + if (log.isDebugEnabled) { + log.debug("Message acknowledged: ${message.messageId}") + } + } + + AcknowledgmentCallback.Status.REJECT -> { + // 拒绝消息,不重试 + consumer.negativeAcknowledge(message) + if (log.isDebugEnabled) { + log.debug("Message rejected: ${message.messageId}") + } + } + + AcknowledgmentCallback.Status.REQUEUE -> { + // 重新入队,触发重试 + consumer.reconsumeLater(message, 1000, TimeUnit.MILLISECONDS) // 延迟1秒重新消费 + if (log.isDebugEnabled) { + log.debug("Message requeued: ${message.messageId}") + } + } + } + } catch (e: Exception) { + log.error("Error while acknowledging message ${message.messageId}", e) + } + } else { + log.warn("Message ${message.messageId} has already been acknowledged") + } + } + + /** + * 判断消息是否已确认 + */ + override fun isAcknowledged(): Boolean { + return acknowledged.get() + } + + /** + * 不支持自动确认 + */ + override fun noAutoAck() { + autoAckEnabled.set(false) + } + + /** + * 判断是否自动确认 + */ + override fun isAutoAck(): Boolean { + return autoAckEnabled.get() + } +} + diff --git a/devops-boot-project/devops-boot-core/devops-pulsar/src/main/kotlin/com/tencent/devops/stream/binder/pulsar/integration/inbound/PulsarInboundChannelAdapter.kt b/devops-boot-project/devops-boot-core/devops-pulsar/src/main/kotlin/com/tencent/devops/stream/binder/pulsar/integration/inbound/PulsarInboundChannelAdapter.kt index 313a72d..4ba1e65 100644 --- a/devops-boot-project/devops-boot-core/devops-pulsar/src/main/kotlin/com/tencent/devops/stream/binder/pulsar/integration/inbound/PulsarInboundChannelAdapter.kt +++ b/devops-boot-project/devops-boot-core/devops-pulsar/src/main/kotlin/com/tencent/devops/stream/binder/pulsar/integration/inbound/PulsarInboundChannelAdapter.kt @@ -27,6 +27,7 @@ package com.tencent.devops.stream.binder.pulsar.integration.inbound +import com.tencent.devops.stream.binder.pulsar.properties.AckMode import com.tencent.devops.stream.binder.pulsar.properties.PulsarConsumerProperties import com.tencent.devops.stream.binder.pulsar.properties.PulsarProperties import com.tencent.devops.stream.binder.pulsar.support.PulsarMessageConverterSupport @@ -130,37 +131,73 @@ class PulsarInboundChannelAdapter( } private fun createListener(): (Consumer<*>, Message<*>) -> Unit { - return { it: Consumer<*>, msg: Message<*> -> + return { consumer: Consumer<*>, msg: Message<*> -> try { if (log.isDebugEnabled) { - log.debug("Message received $msg ${msg.messageId}") + log.debug("Message received ${msg.messageId}") } - val message = PulsarMessageConverterSupport.convertMessage2Spring(msg) + + // 根据 ACK 模式决定是否创建 ACK 回调 + val ackCallback = if (extendedConsumerProperties.extension.ackMode == AckMode.MANUAL.name) { + PulsarAcknowledgmentCallback(consumer, msg) + } else { + null + } + + val message = PulsarMessageConverterSupport.convertMessage2Spring(msg, ackCallback) + if (retryTemplate != null) { retryTemplate!!.execute( RetryCallback { _: RetryContext? -> sendMessage(message) - if (log.isDebugEnabled) { - log.debug("will send acknowledge: ${msg.messageId}") + + // 自动 ACK 模式下,消息处理完成后自动确认 + if (extendedConsumerProperties.extension.ackMode == AckMode.AUTO.name) { + if (log.isDebugEnabled) { + log.debug("will send acknowledge: ${msg.messageId}") + } + consumer.acknowledge(msg) + } else { + // 手动 ACK 模式下,记录日志提示业务层需要手动确认 + if (log.isDebugEnabled) { + log.debug("Manual ACK mode, waiting for business layer to acknowledge: ${msg.messageId}") + } } - it.acknowledge(msg) message }, recoveryCallback ) } else { sendMessage(message) - if (log.isDebugEnabled) { - log.debug("will send acknowledge: ${msg.messageId}") + + // 自动 ACK 模式下,消息处理完成后自动确认 + if (extendedConsumerProperties.extension.ackMode == AckMode.AUTO.name) { + if (log.isDebugEnabled) { + log.debug("will send acknowledge: ${msg.messageId}") + } + consumer.acknowledge(msg) + } else { + // 手动 ACK 模式下,记录日志提示业务层需要手动确认 + if (log.isDebugEnabled) { + log.debug("Manual ACK mode, waiting for business layer to acknowledge: ${msg.messageId}") + } } - it.acknowledge(msg) } + if (log.isDebugEnabled) { - log.debug("Message ${msg.messageId} has been consumed") + log.debug("Message ${msg.messageId} has been processed") } } catch (e: Exception) { log.warn("Error occurred while consuming message ${msg.messageId}: $e") - it.negativeAcknowledge(msg) + + // 如果是自动 ACK 模式或者发生异常,发送 negative ack + if (extendedConsumerProperties.extension.ackMode == AckMode.AUTO.name) { + consumer.negativeAcknowledge(msg) + } else { + // 手动 ACK 模式下,即使发生异常也不自动 negative ack + // 让业务层通过 AcknowledgmentCallback 处理 + log.warn("Manual ACK mode, exception occurred but not sending negative acknowledgement automatically") + } } } } diff --git a/devops-boot-project/devops-boot-core/devops-pulsar/src/main/kotlin/com/tencent/devops/stream/binder/pulsar/properties/PulsarConsumerProperties.kt b/devops-boot-project/devops-boot-core/devops-pulsar/src/main/kotlin/com/tencent/devops/stream/binder/pulsar/properties/PulsarConsumerProperties.kt index 8727fef..b7d6f58 100644 --- a/devops-boot-project/devops-boot-core/devops-pulsar/src/main/kotlin/com/tencent/devops/stream/binder/pulsar/properties/PulsarConsumerProperties.kt +++ b/devops-boot-project/devops-boot-core/devops-pulsar/src/main/kotlin/com/tencent/devops/stream/binder/pulsar/properties/PulsarConsumerProperties.kt @@ -200,5 +200,25 @@ data class PulsarConsumerProperties( /** * If replicateSubscriptionState is enabled, a subscription state is replicated to geo-replicated clusters. */ - var replicateSubscriptionState: Boolean = false + var replicateSubscriptionState: Boolean = false, + /** + * 消息确认模式 + * MANUAL: 手动确认,业务层需要通过 AcknowledgmentCallback 手动确认消息 + * AUTO: 自动确认,消息处理完成后自动确认 + */ + var ackMode: String = AckMode.AUTO.name ) : PulsarCommonProperties() + +/** + * 消息确认模式枚举 + */ +enum class AckMode { + /** + * 自动确认模式:消息处理完成后自动确认 + */ + AUTO, + /** + * 手动确认模式:业务层通过 AcknowledgmentCallback 手动确认 + */ + MANUAL +} diff --git a/devops-boot-project/devops-boot-core/devops-pulsar/src/main/kotlin/com/tencent/devops/stream/binder/pulsar/support/PulsarMessageConverterSupport.kt b/devops-boot-project/devops-boot-core/devops-pulsar/src/main/kotlin/com/tencent/devops/stream/binder/pulsar/support/PulsarMessageConverterSupport.kt index 9508b92..41011bf 100644 --- a/devops-boot-project/devops-boot-core/devops-pulsar/src/main/kotlin/com/tencent/devops/stream/binder/pulsar/support/PulsarMessageConverterSupport.kt +++ b/devops-boot-project/devops-boot-core/devops-pulsar/src/main/kotlin/com/tencent/devops/stream/binder/pulsar/support/PulsarMessageConverterSupport.kt @@ -33,7 +33,10 @@ import com.tencent.devops.stream.binder.pulsar.constant.PUBLISH_TIME import com.tencent.devops.stream.binder.pulsar.constant.TOPIC_NAME import com.tencent.devops.stream.binder.pulsar.convert.PulsarMessageConverter import com.tencent.devops.stream.binder.pulsar.custom.PulsarBeanContainerCache +import org.apache.pulsar.client.api.Consumer import org.apache.pulsar.client.api.TypedMessageBuilder +import org.springframework.integration.IntegrationMessageHeaderAccessor +import org.springframework.integration.acks.AcknowledgmentCallback import org.springframework.messaging.Message import org.springframework.messaging.MessageHeaders import org.springframework.messaging.converter.CompositeMessageConverter @@ -45,7 +48,10 @@ import java.util.Objects object PulsarMessageConverterSupport { - fun convertMessage2Spring(message: org.apache.pulsar.client.api.Message): Message<*> { + fun convertMessage2Spring( + message: org.apache.pulsar.client.api.Message, + ackCallback: AcknowledgmentCallback? = null + ): Message<*> { val messageBuilder = MessageBuilder.withPayload(message.data) if (!message.key.isNullOrEmpty()) { messageBuilder.setHeader(toPulsarHeaderKey(TypedMessageBuilder.CONF_KEY), message.key) @@ -56,6 +62,12 @@ object PulsarMessageConverterSupport { .setHeader(toPulsarHeaderKey(PUBLISH_TIME), message.publishTime) .setHeader(toPulsarHeaderKey(PRODUCER_NAME), message.producerName) .setHeader(toPulsarHeaderKey(TOPIC_NAME), message.topicName) + + // 如果提供了 ACK 回调,则添加到消息头中 + if (ackCallback != null) { + messageBuilder.setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, ackCallback) + } + addUserProperties(message.properties, messageBuilder) return messageBuilder.build() } diff --git a/version.txt b/version.txt index a8e8241..4fb5f66 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -1.1.1-SNAPSHOT \ No newline at end of file +1.1.2-SNAPSHOT \ No newline at end of file