From 4f8f0a69109ae1d7fe30f36b15fbfe0a7cbf33fe Mon Sep 17 00:00:00 2001 From: baojizhong Date: Wed, 27 Feb 2019 10:53:44 +0800 Subject: [PATCH] add createConsumers() --- .../client/BaseCarreraConsumerPool.java | 55 ++++++++----------- 1 file changed, 23 insertions(+), 32 deletions(-) diff --git a/carrera-sdk/consumer/java/carrera-consumer-sdk/src/main/java/com/xiaojukeji/carrera/consumer/thrift/client/BaseCarreraConsumerPool.java b/carrera-sdk/consumer/java/carrera-consumer-sdk/src/main/java/com/xiaojukeji/carrera/consumer/thrift/client/BaseCarreraConsumerPool.java index f89e1fc..03252ec 100644 --- a/carrera-sdk/consumer/java/carrera-consumer-sdk/src/main/java/com/xiaojukeji/carrera/consumer/thrift/client/BaseCarreraConsumerPool.java +++ b/carrera-sdk/consumer/java/carrera-consumer-sdk/src/main/java/com/xiaojukeji/carrera/consumer/thrift/client/BaseCarreraConsumerPool.java @@ -82,47 +82,38 @@ protected void baseStartConsume(BaseMessageProcessor processor, int concurrency, } private void startConsume(BaseMessageProcessor processor, int concurrency, Map extraConcurrency, List servers) { - int totalThreads = concurrency > 0 ? Math.max(concurrency, servers.size()) : 0; + int serverCnt = servers.size(); + int totalThreads = concurrency > 0 ? Math.max(concurrency, serverCnt) : 0; for (Integer topicConcurrency : extraConcurrency.values()) { - totalThreads += topicConcurrency > 0 ? Math.max(topicConcurrency, servers.size()) : 0; + totalThreads += topicConcurrency > 0 ? Math.max(topicConcurrency, serverCnt) : 0; } if (totalThreads == 0) { throw new RuntimeException("concurrency is too small, at least one for each server."); } executorService = Executors.newFixedThreadPool(totalThreads, new ThreadFactoryBuilder().setNameFormat("MessageProcess-%d").build()); + createConsumers(processor, concurrency, servers, null); + for (Map.Entry entry : extraConcurrency.entrySet()) { + int c = entry.getValue(); + String topic = entry.getKey(); + createConsumers(processor, c, servers, topic); + } + } - Collections.shuffle(servers); - + protected void createConsumers(BaseMessageProcessor processor, int concurrency, List servers, String topic) { + if (concurrency <= 0) { + return; + } int serverCnt = servers.size(); - if (concurrency > 0) { - if (concurrency < serverCnt) { - LOGGER.warn("concurrency({}) entry : extraConcurrency.entrySet()) { - int c = entry.getValue(); - if (c == 0) continue; - if (c < serverCnt) { - LOGGER.warn("concurrency({})