diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index f613f749bd0fe..d433f48d280c7 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -752,6 +752,10 @@ Consumer getConsumer(String topic, int partition) throws PulsarClientExceptio throw new PulsarClientException("Getting consumer is not supported"); } + if ((partition != 0) && (!TopicName.get(topic).isPartitioned())) { + throw new PulsarClientException("No Partioned topic present"); + } + Consumer consumer = tryGetConsumer(topic, partition); if (consumer == null) { // MultiTopicsConsumer's list of consumers could change diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java index cb4c93f153fd9..82daee9be5cea 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java @@ -335,6 +335,7 @@ FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(), Assert.fail("Expected exception"); } catch (PulsarClientException e) { // pass + Assert.assertTrue(e.getMessage().contains("No Partioned topic present")); } }