From c2999c580abb6671e589471ee8a3b74d14966d01 Mon Sep 17 00:00:00 2001 From: maheshnikam <55378196+nikam14@users.noreply.github.com> Date: Thu, 26 Sep 2024 14:50:55 +0530 Subject: [PATCH 1/6] improve getConsumer --- .../pulsar/functions/instance/ContextImpl.java | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index f613f749bd0fe..2b4b138f6eea2 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -122,6 +122,7 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable private static final String[] userMetricsLabelNames; private boolean exposePulsarAdminClientEnabled; + private boolean partionedTopicPresent; private List> inputConsumers; private final Map topicConsumers = new ConcurrentHashMap<>(); @@ -716,7 +717,11 @@ public void setInputConsumers(List> inputConsumers) { consumer instanceof MultiTopicsConsumerImpl ? ((MultiTopicsConsumerImpl) consumer).getConsumers().stream() : Stream.of(consumer)) - .forEach(consumer -> topicConsumers.putIfAbsent(TopicName.get(consumer.getTopic()), consumer)); + .forEach(consumer -> { + topicConsumers.putIfAbsent(TopicName.get(consumer.getTopic()), consumer) + if (consumer.getTopic().contains("-partition-")) { + partionedTopicPresent = true; + }}); } private void reloadConsumersFromMultiTopicsConsumers() { @@ -729,7 +734,11 @@ private void reloadConsumersFromMultiTopicsConsumers() { c instanceof MultiTopicsConsumerImpl ? ((MultiTopicsConsumerImpl) c).getConsumers().stream() : Stream.empty() // no changes expected in regular consumers - ).forEach(c -> topicConsumers.putIfAbsent(TopicName.get(c.getTopic()), c)); + ).forEach(c -> { + topicConsumers.putIfAbsent(TopicName.get(c.getTopic()), c) + if (c.getTopic().contains("-partition-")) { + partionedTopicPresent = true; + }}); } // returns null if consumer not found @@ -752,6 +761,11 @@ Consumer getConsumer(String topic, int partition) throws PulsarClientExceptio throw new PulsarClientException("Getting consumer is not supported"); } + if (partition != 0 && partionedTopicPresent == false) { + System.out.println("using condition"); + throw new PulsarClientException("No Partioned topic present"); + } + Consumer consumer = tryGetConsumer(topic, partition); if (consumer == null) { // MultiTopicsConsumer's list of consumers could change From 32967dbf5c5df9ed8c4e8275bb5e92bc48e5eaf2 Mon Sep 17 00:00:00 2001 From: maheshnikam <55378196+nikam14@users.noreply.github.com> Date: Thu, 26 Sep 2024 14:58:28 +0530 Subject: [PATCH 2/6] Update ContextImpl.java --- .../org/apache/pulsar/functions/instance/ContextImpl.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index 2b4b138f6eea2..2ea2cba5b5ea0 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -718,7 +718,7 @@ public void setInputConsumers(List> inputConsumers) { ? ((MultiTopicsConsumerImpl) consumer).getConsumers().stream() : Stream.of(consumer)) .forEach(consumer -> { - topicConsumers.putIfAbsent(TopicName.get(consumer.getTopic()), consumer) + topicConsumers.putIfAbsent(TopicName.get(consumer.getTopic()), consumer); if (consumer.getTopic().contains("-partition-")) { partionedTopicPresent = true; }}); @@ -735,7 +735,7 @@ private void reloadConsumersFromMultiTopicsConsumers() { ? ((MultiTopicsConsumerImpl) c).getConsumers().stream() : Stream.empty() // no changes expected in regular consumers ).forEach(c -> { - topicConsumers.putIfAbsent(TopicName.get(c.getTopic()), c) + topicConsumers.putIfAbsent(TopicName.get(c.getTopic()), c); if (c.getTopic().contains("-partition-")) { partionedTopicPresent = true; }}); @@ -762,7 +762,6 @@ Consumer getConsumer(String topic, int partition) throws PulsarClientExceptio } if (partition != 0 && partionedTopicPresent == false) { - System.out.println("using condition"); throw new PulsarClientException("No Partioned topic present"); } From 5cdb582710244cc83784e9943c303e8facd5c38a Mon Sep 17 00:00:00 2001 From: maheshnikam <55378196+nikam14@users.noreply.github.com> Date: Thu, 26 Sep 2024 15:07:44 +0530 Subject: [PATCH 3/6] Update ContextImpl.java --- .../org/apache/pulsar/functions/instance/ContextImpl.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index 2ea2cba5b5ea0..d68311d156e9b 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -721,7 +721,8 @@ public void setInputConsumers(List> inputConsumers) { topicConsumers.putIfAbsent(TopicName.get(consumer.getTopic()), consumer); if (consumer.getTopic().contains("-partition-")) { partionedTopicPresent = true; - }}); + } + }); } private void reloadConsumersFromMultiTopicsConsumers() { @@ -738,7 +739,8 @@ private void reloadConsumersFromMultiTopicsConsumers() { topicConsumers.putIfAbsent(TopicName.get(c.getTopic()), c); if (c.getTopic().contains("-partition-")) { partionedTopicPresent = true; - }}); + } + }); } // returns null if consumer not found @@ -761,7 +763,7 @@ Consumer getConsumer(String topic, int partition) throws PulsarClientExceptio throw new PulsarClientException("Getting consumer is not supported"); } - if (partition != 0 && partionedTopicPresent == false) { + if ((partition != 0) && (partionedTopicPresent == false)) { throw new PulsarClientException("No Partioned topic present"); } From 1b895a8a0e9015bd249a75c168ae3a81989fd2c8 Mon Sep 17 00:00:00 2001 From: maheshnikam <55378196+nikam14@users.noreply.github.com> Date: Thu, 26 Sep 2024 15:17:41 +0530 Subject: [PATCH 4/6] Update ContextImpl.java --- .../java/org/apache/pulsar/functions/instance/ContextImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index d68311d156e9b..63894a8066bfd 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -763,7 +763,7 @@ Consumer getConsumer(String topic, int partition) throws PulsarClientExceptio throw new PulsarClientException("Getting consumer is not supported"); } - if ((partition != 0) && (partionedTopicPresent == false)) { + if ((partition != 0) && (!partionedTopicPresent)) { throw new PulsarClientException("No Partioned topic present"); } From c6dba720d7f7237519e745325fcfac32c444f402 Mon Sep 17 00:00:00 2001 From: maheshnikam <55378196+nikam14@users.noreply.github.com> Date: Wed, 9 Oct 2024 16:43:26 +0530 Subject: [PATCH 5/6] added TopicName.isPartitioned() method --- .../pulsar/functions/instance/ContextImpl.java | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index 63894a8066bfd..d433f48d280c7 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -122,7 +122,6 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable private static final String[] userMetricsLabelNames; private boolean exposePulsarAdminClientEnabled; - private boolean partionedTopicPresent; private List> inputConsumers; private final Map topicConsumers = new ConcurrentHashMap<>(); @@ -717,12 +716,7 @@ public void setInputConsumers(List> inputConsumers) { consumer instanceof MultiTopicsConsumerImpl ? ((MultiTopicsConsumerImpl) consumer).getConsumers().stream() : Stream.of(consumer)) - .forEach(consumer -> { - topicConsumers.putIfAbsent(TopicName.get(consumer.getTopic()), consumer); - if (consumer.getTopic().contains("-partition-")) { - partionedTopicPresent = true; - } - }); + .forEach(consumer -> topicConsumers.putIfAbsent(TopicName.get(consumer.getTopic()), consumer)); } private void reloadConsumersFromMultiTopicsConsumers() { @@ -735,12 +729,7 @@ private void reloadConsumersFromMultiTopicsConsumers() { c instanceof MultiTopicsConsumerImpl ? ((MultiTopicsConsumerImpl) c).getConsumers().stream() : Stream.empty() // no changes expected in regular consumers - ).forEach(c -> { - topicConsumers.putIfAbsent(TopicName.get(c.getTopic()), c); - if (c.getTopic().contains("-partition-")) { - partionedTopicPresent = true; - } - }); + ).forEach(c -> topicConsumers.putIfAbsent(TopicName.get(c.getTopic()), c)); } // returns null if consumer not found @@ -763,7 +752,7 @@ Consumer getConsumer(String topic, int partition) throws PulsarClientExceptio throw new PulsarClientException("Getting consumer is not supported"); } - if ((partition != 0) && (!partionedTopicPresent)) { + if ((partition != 0) && (!TopicName.get(topic).isPartitioned())) { throw new PulsarClientException("No Partioned topic present"); } From 2a01dd5c10394b211a1242754328c89b45faba50 Mon Sep 17 00:00:00 2001 From: maheshnikam <55378196+nikam14@users.noreply.github.com> Date: Wed, 9 Oct 2024 16:45:35 +0530 Subject: [PATCH 6/6] covered test --- .../org/apache/pulsar/functions/instance/ContextImplTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java index cb4c93f153fd9..82daee9be5cea 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java @@ -335,6 +335,7 @@ FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(), Assert.fail("Expected exception"); } catch (PulsarClientException e) { // pass + Assert.assertTrue(e.getMessage().contains("No Partioned topic present")); } }