From 93d9d3ee834c8e3ef96af505da880ef703fb0fc5 Mon Sep 17 00:00:00 2001 From: wadewu Date: Fri, 30 Oct 2020 00:14:47 -0700 Subject: [PATCH 1/3] throttling added into AzPubSubAclAuthorizer. --- .../azpubsub/security/auth/ThreadCounterTimerTask.java | 4 ++++ .../microsoft/azpubsub/security/auth/TopicThreadCounter.java | 4 ++++ 2 files changed, 8 insertions(+) create mode 100644 azpubsub/src/main/java/com/microsoft/azpubsub/security/auth/ThreadCounterTimerTask.java create mode 100644 azpubsub/src/main/java/com/microsoft/azpubsub/security/auth/TopicThreadCounter.java diff --git a/azpubsub/src/main/java/com/microsoft/azpubsub/security/auth/ThreadCounterTimerTask.java b/azpubsub/src/main/java/com/microsoft/azpubsub/security/auth/ThreadCounterTimerTask.java new file mode 100644 index 0000000000000..de9cfabba3d66 --- /dev/null +++ b/azpubsub/src/main/java/com/microsoft/azpubsub/security/auth/ThreadCounterTimerTask.java @@ -0,0 +1,4 @@ +package com.microsoft.azpubsub.security.auth; + +public class ThreadCounterTimerTask { +} diff --git a/azpubsub/src/main/java/com/microsoft/azpubsub/security/auth/TopicThreadCounter.java b/azpubsub/src/main/java/com/microsoft/azpubsub/security/auth/TopicThreadCounter.java new file mode 100644 index 0000000000000..e1037fb47ffcd --- /dev/null +++ b/azpubsub/src/main/java/com/microsoft/azpubsub/security/auth/TopicThreadCounter.java @@ -0,0 +1,4 @@ +package com.microsoft.azpubsub.security.auth; + +public class TopicThreadCounter { +} From eed5925632d93c9146833d0cc61dd46e5e3d7f95 Mon Sep 17 00:00:00 2001 From: wadewu Date: Fri, 30 Oct 2020 00:15:54 -0700 Subject: [PATCH 2/3] adding topic level throttling into AzPubSubAclAuthorizer. --- .../security/auth/AzPubSubConfig.java | 10 +++ .../security/auth/ThreadCounterTimerTask.java | 27 +++++++- .../security/auth/TopicThreadCounter.java | 34 ++++++++++ .../security/auth/AzPubSubAclAuthorizer.scala | 41 ++++++++++-- config/log4j.properties | 19 +++--- config/server.properties | 63 ++++++++++++++++--- 6 files changed, 171 insertions(+), 23 deletions(-) diff --git a/azpubsub/src/main/java/com/microsoft/azpubsub/security/auth/AzPubSubConfig.java b/azpubsub/src/main/java/com/microsoft/azpubsub/security/auth/AzPubSubConfig.java index eb63f1c66f646..af8577a9f3eb7 100644 --- a/azpubsub/src/main/java/com/microsoft/azpubsub/security/auth/AzPubSubConfig.java +++ b/azpubsub/src/main/java/com/microsoft/azpubsub/security/auth/AzPubSubConfig.java @@ -42,6 +42,16 @@ public class AzPubSubConfig extends AbstractConfig { "", Importance.MEDIUM, DSTS_METADATA_FILE_DOC) + .define("azpubsub.topic.max.qps", + Type.INT, + 1000, + Importance.MEDIUM, + "Topic Qps") + .define("azpubsub.enable.topic.qps.throttling", + Type.BOOLEAN, + true, + Importance.MEDIUM, + "Topic Qps throttling enabled") ; } diff --git a/azpubsub/src/main/java/com/microsoft/azpubsub/security/auth/ThreadCounterTimerTask.java b/azpubsub/src/main/java/com/microsoft/azpubsub/security/auth/ThreadCounterTimerTask.java index de9cfabba3d66..8781df3231325 100644 --- a/azpubsub/src/main/java/com/microsoft/azpubsub/security/auth/ThreadCounterTimerTask.java +++ b/azpubsub/src/main/java/com/microsoft/azpubsub/security/auth/ThreadCounterTimerTask.java @@ -1,4 +1,29 @@ package com.microsoft.azpubsub.security.auth; -public class ThreadCounterTimerTask { +import java.util.TimerTask; + +public class ThreadCounterTimerTask extends TimerTask { + private String topicName = null; + private int threadCount = 0; + private Long ioThreadId = 0L; + TopicThreadCounter topicThreadCounterInstance = TopicThreadCounter.getInstance(5000L); + + public void setTopicName(String topic) { + topicName = topic; + } + + public void setIoThreadId(Long threadId) { + ioThreadId = threadId; + } + + public int getThreadCount() { + return threadCount; + } + + @Override + public void run() { + if(null != topicName) { + threadCount = topicThreadCounterInstance.add(topicName, System.currentTimeMillis(), ioThreadId); + } + } } diff --git a/azpubsub/src/main/java/com/microsoft/azpubsub/security/auth/TopicThreadCounter.java b/azpubsub/src/main/java/com/microsoft/azpubsub/security/auth/TopicThreadCounter.java index e1037fb47ffcd..3c262f4689af0 100644 --- a/azpubsub/src/main/java/com/microsoft/azpubsub/security/auth/TopicThreadCounter.java +++ b/azpubsub/src/main/java/com/microsoft/azpubsub/security/auth/TopicThreadCounter.java @@ -1,4 +1,38 @@ package com.microsoft.azpubsub.security.auth; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + public class TopicThreadCounter { + private static TopicThreadCounter instance = null; + private static Object lock = new Object(); + private Long interval = null; + ConcurrentHashMap> topicThreadMap = new ConcurrentHashMap<>(); + + public TopicThreadCounter(Long intvl) { + interval = intvl; + } + + static TopicThreadCounter getInstance(Long interval) { + synchronized (lock) { + if(null == instance) { + instance = new TopicThreadCounter(interval); + } + } + return instance; + } + + public int add(String topic, Long currentTimeInMs, Long threadId) { + if(!topicThreadMap.containsKey(topic)) { + topicThreadMap.put(topic, new TreeMap<>()); + } + topicThreadMap.get(topic).put(currentTimeInMs, threadId); + NavigableMap subMap= topicThreadMap.get(topic).tailMap(currentTimeInMs - interval, false); + HashSet hs = new HashSet<>(); + for(Map.Entry element: subMap.entrySet()) { + hs.add((Long)element.getValue()); + } + topicThreadMap.put(topic, new TreeMap<>(subMap)); + return hs.size(); + } } diff --git a/azpubsub/src/main/scala/com/microsoft/azpubsub/security/auth/AzPubSubAclAuthorizer.scala b/azpubsub/src/main/scala/com/microsoft/azpubsub/security/auth/AzPubSubAclAuthorizer.scala index 0128b8c9e9d0c..628bd448f15df 100644 --- a/azpubsub/src/main/scala/com/microsoft/azpubsub/security/auth/AzPubSubAclAuthorizer.scala +++ b/azpubsub/src/main/scala/com/microsoft/azpubsub/security/auth/AzPubSubAclAuthorizer.scala @@ -1,19 +1,19 @@ package com.microsoft.azpubsub.security.auth +import java.util +import java.util.{Timer, TimerTask} import java.util.concurrent._ import scala.collection.JavaConverters.asScalaSetConverter - import com.yammer.metrics.core.{Meter, MetricName} - import org.apache.kafka.common.security.auth.KafkaPrincipal - import kafka.metrics.KafkaMetricsGroup import kafka.network.RequestChannel.Session -import kafka.security.auth.Operation -import kafka.security.auth.Resource -import kafka.security.auth.SimpleAclAuthorizer +import kafka.security.auth.{Operation, Resource, SimpleAclAuthorizer, Topic} import kafka.utils.Logging +import kafka.utils.timer.{SystemTimer, TimerTask} + +import scala.collection.mutable /* * AzPubSub ACL Authorizer to handle the certificate & role based principal type @@ -23,10 +23,39 @@ class AzPubSubAclAuthorizer extends SimpleAclAuthorizer with Logging with KafkaM explicitMetricName("azpubsub.security", "AuthorizerMetrics", name, metricTags) } + override def configure(javaConfigs: util.Map[String, _]): Unit = { + super.configure(javaConfigs) + config = AzPubSubConfig.fromProps(javaConfigs) + } + + private var config: AzPubSubConfig = null private val successRate: Meter = newMeter("AuthorizerSuccessPerSec", "success", TimeUnit.SECONDS) private val failureRate: Meter = newMeter("AuthorizerFailurePerSec", "failure", TimeUnit.SECONDS) + private val topicThreadCounterTimerTask = new ThreadCounterTimerTask() + private val trigger: Timer = new Timer(true) + trigger.scheduleAtFixedRate(topicThreadCounterTimerTask, 5, 5000) + var ints = new mutable.TreeSet[Long] override def authorize(session: Session, operation: Operation, resource: Resource): Boolean = { + + if(config.getBoolean("azpubsub.enable.topic.qps.throttling") && resource.resourceType.equals( Topic )) { + topicThreadCounterTimerTask.setTopicName(resource.name) + topicThreadCounterTimerTask.setIoThreadId(Thread.currentThread().getId); + + val threadCount = Math.max(topicThreadCounterTimerTask.getThreadCount, 1) + + var count = 1 + while (ints.size * threadCount > config.getInt("azpubsub.topic.max.qps")) { + val pivot = ints.minBy(x => x > System.currentTimeMillis - 1000) + val (_, after) = ints.partition(x => x > pivot) + ints = after + Thread.sleep(count) + count *= 2 + } + + ints += System.currentTimeMillis + } + val sessionPrincipal = session.principal if (classOf[AzPubSubPrincipal] == sessionPrincipal.getClass) { val principal = sessionPrincipal.asInstanceOf[AzPubSubPrincipal] diff --git a/config/log4j.properties b/config/log4j.properties index 6e7f82bd20ad6..20ca886aa7219 100644 --- a/config/log4j.properties +++ b/config/log4j.properties @@ -14,8 +14,8 @@ # limitations under the License. # Unspecified loggers and loggers with additivity=true output to server.log and stdout -# Note that INFO only applies to unspecified loggers, the log level of the child logger is used otherwise -log4j.rootLogger=INFO, stdout, kafkaAppender +# Note that DEBUG only applies to unspecified loggers, the log level of the child logger is used otherwise +log4j.rootLogger=DEBUG, stdout, kafkaAppender log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout @@ -57,12 +57,13 @@ log4j.appender.authorizerAppender.File=${kafka.logs.dir}/kafka-authorizer.log log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n -# Change the line below to adjust ZK client logging -log4j.logger.org.apache.zookeeper=INFO +# Change the two lines below to adjust ZK client logging +log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG +log4j.logger.org.apache.zookeeper=DEBUG # Change the two lines below to adjust the general broker logging level (output to server.log and stdout) -log4j.logger.kafka=INFO -log4j.logger.org.apache.kafka=INFO +log4j.logger.kafka=DEBUG +log4j.logger.org.apache.kafka=DEBUG # Change to DEBUG or TRACE to enable request logging log4j.logger.kafka.request.logger=WARN, requestAppender @@ -79,7 +80,7 @@ log4j.additivity.kafka.network.RequestChannel$=false log4j.logger.kafka.controller=TRACE, controllerAppender log4j.additivity.kafka.controller=false -log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender +log4j.logger.kafka.log.LogCleaner=DEBUG, cleanerAppender log4j.additivity.kafka.log.LogCleaner=false log4j.logger.kafka.log.SkimpyOffsetMap=INFO, cleanerAppender @@ -88,7 +89,7 @@ log4j.additivity.kafka.log.SkimpyOffsetMap=false log4j.logger.state.change.logger=TRACE, stateChangeAppender log4j.additivity.state.change.logger=false -# Access denials are logged at INFO level, change to DEBUG to also log allowed accesses -log4j.logger.kafka.authorizer.logger=INFO, authorizerAppender +# Access denials are logged at DEBUG level, change to DEBUG to also log allowed accesses +log4j.logger.kafka.authorizer.logger=DEBUG, authorizerAppender log4j.additivity.kafka.authorizer.logger=false diff --git a/config/server.properties b/config/server.properties index 46208b1523f63..abb7a3df61396 100644 --- a/config/server.properties +++ b/config/server.properties @@ -28,21 +28,65 @@ broker.id=0 # listeners = listener_name://host_name:port # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 -#listeners=PLAINTEXT://:9092 -# Hostname and port the broker will advertise to producers and consumers. If not set, +listeners=PLAINTEXT://:9092 +advertised.listeners=PLAINTEXT://:9092 +listener.security.protocol.map=PLAINTEXT:PLAINTEXT + +# SSL Settings +#ssl.protocol= TLS +#ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1 +#ssl.keystore.type=JKS +#ssl.keystore.location=D:\\Work\\AzPubSub\\SSL\\adityacerts\\certs\\broker_adinigamdev2_server.keystore.jks +#ssl.keystore.location=D:\\Work\\AzPubSub\\SSL\\scripts\\broker_WADE-Z240_server.keystore.jks +#ssl.keystore.password=abcdefgh +#ssl.key.password=abcdefgh +#ssl.truststore.type=JKS +#ssl.truststore.location=D:\\Work\\AzPubSub\\SSL\\adityacerts\\certs\\broker_adinigamdev2_server.truststore.jks +#ssl.truststore.location=D:\\Work\\AzPubSub\\SSL\\scripts\\broker_WADE-Z240_server.truststore.jks +#ssl.truststore.password=abcdefgh +#ssl.client.auth=required + +# using APPKI +ssl.protocol=TLSv1.2 +#ssl.keystore.type=Windows-MY +ssl.keymanager.algorithm=APPKIV2 +ssl.trustmanager.algorithm=AzPubSub +ssl.enabled.protocols=TLSv1.2 +# From kafka 2.0 onwards, host name verification of servers is enabled by default and getting auth failure errors were logged because, the kafka hostname (ip) didnt match the certificate CN (machinename). +# As the Kafka hostname (i.e. logical IP) and certificate CN doesnt match, disabling the hostname verification by setting the below property to empty +ssl.endpoint.identification.algorithm= +ssl.appki.provider.class=com.microsoft.autopilot.azpubsub.ssl.AzPubSubProvider +security.providers=com.microsoft.autopilot.azpubsub.ssl.AzPubSubSecurityProviderCreator +authorizer.class.name=com.microsoft.azpubsub.security.auth.AzPubSubAclAuthorizer +ssl.client.auth=required + +# Broker to Broker Settings +security.inter.broker.protocol=plaintext + +# SASL OAUTHBEARER +listener.name.sasl_plaintext.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredLoginStringClaim_sub="admin"; +sasl.enabled.mechanisms=OAUTHBEARER +listener.name.sasl_plaintext.oauthbearer.connections.max.reauth.ms=30000 + + +# backup, good settings. +#listener.name.sasl_plaintext.oauthbearer.sasl.server.callback.handler.class=com.microsoft.azpubsub.security.oauthbearer.OAuthAuthenticateValidatorCallbackHandler +#principal.builder.class=com.microsoft.azpubsub.security.authenticator.AzPubSubPrincipalBuilder +#allow.everyone.if.no.acl.found=true + + +# Hostname and port the broker will advertise to producers and consumers. If not set, # it uses the value for "listeners" if configured. Otherwise, it will use the value # returned from java.net.InetAddress.getCanonicalHostName(). -#advertised.listeners=PLAINTEXT://your.host.name:9092 +# advertised.listeners=PLAINTEXT://127.0.0.1:9092 # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details -#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL - # The number of threads that the server uses for receiving requests from the network and sending responses to the network num.network.threads=3 # The number of threads that the server uses for processing requests, which may include disk I/O -num.io.threads=8 +num.io.threads=3 # The send buffer (SO_SNDBUF) used by the socket server socket.send.buffer.bytes=102400 @@ -58,6 +102,7 @@ socket.request.max.bytes=104857600 # A comma separated list of directories under which to store log files log.dirs=/tmp/kafka-logs +log.retention.minutes=5 # The default number of log partitions per topic. More partitions allow greater # parallelism for consumption, but this will also result in more files across @@ -120,7 +165,7 @@ log.retention.check.interval.ms=300000 # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". # You can also append an optional chroot string to the urls to specify the # root directory for all kafka znodes. -zookeeper.connect=localhost:2181 +zookeeper.connect=127.0.0.1:2181 # Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=6000 @@ -134,3 +179,7 @@ zookeeper.connection.timeout.ms=6000 # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing. # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup. group.initial.rebalance.delay.ms=0 + +# leader rebalance +auto.leader.rebalance.enable=true +#leader.imbalance.check.interval.seconds=10 From ce896d2f5372abcbe1b35263a4d955de36725379 Mon Sep 17 00:00:00 2001 From: wadewu Date: Fri, 13 Nov 2020 16:39:34 -0800 Subject: [PATCH 3/3] Supporting throttling at different level/combinations: topic, clientId, topic + clientId. --- .../security/auth/AzPubSubConfig.java | 20 ++- .../security/auth/ThreadCounterTimerTask.java | 18 ++- .../security/auth/TopicThreadCounter.java | 28 ++-- .../security/auth/AzPubSubAclAuthorizer.scala | 41 +----- .../auth/AzPubSubAclAuthorizerV2.scala | 135 ++++++++++++++++++ 5 files changed, 191 insertions(+), 51 deletions(-) create mode 100644 azpubsub/src/main/scala/com/microsoft/azpubsub/security/auth/AzPubSubAclAuthorizerV2.scala diff --git a/azpubsub/src/main/java/com/microsoft/azpubsub/security/auth/AzPubSubConfig.java b/azpubsub/src/main/java/com/microsoft/azpubsub/security/auth/AzPubSubConfig.java index af8577a9f3eb7..2ab710daa2302 100644 --- a/azpubsub/src/main/java/com/microsoft/azpubsub/security/auth/AzPubSubConfig.java +++ b/azpubsub/src/main/java/com/microsoft/azpubsub/security/auth/AzPubSubConfig.java @@ -47,12 +47,22 @@ public class AzPubSubConfig extends AbstractConfig { 1000, Importance.MEDIUM, "Topic Qps") - .define("azpubsub.enable.topic.qps.throttling", - Type.BOOLEAN, - true, + .define("azpubsub.qps.throttling.level", + Type.INT, + 0, + Importance.MEDIUM, + "Topic Qps throttling level. 0: throttling is disabled; 1: throttling at topic level; 2: throttling at clientId + topic level.") + .define("azpubsub.clientid.topic.max.qps", + Type.INT, + 1000, + Importance.MEDIUM, + "Topic Qps") + .define("azpubsub.timer.task.execution.interval.in.ms", + Type.LONG, + 300000, Importance.MEDIUM, - "Topic Qps throttling enabled") - ; + "The interval of timer background task executions, in milliseconds") + ; } public static AzPubSubConfig fromProps(Map configProviderProps) { diff --git a/azpubsub/src/main/java/com/microsoft/azpubsub/security/auth/ThreadCounterTimerTask.java b/azpubsub/src/main/java/com/microsoft/azpubsub/security/auth/ThreadCounterTimerTask.java index 8781df3231325..6a2704913f2e2 100644 --- a/azpubsub/src/main/java/com/microsoft/azpubsub/security/auth/ThreadCounterTimerTask.java +++ b/azpubsub/src/main/java/com/microsoft/azpubsub/security/auth/ThreadCounterTimerTask.java @@ -4,14 +4,25 @@ public class ThreadCounterTimerTask extends TimerTask { private String topicName = null; + private String clientId = null; private int threadCount = 0; private Long ioThreadId = 0L; - TopicThreadCounter topicThreadCounterInstance = TopicThreadCounter.getInstance(5000L); + private int throttlingLevel = 0; + private Long intervalInMs = 300000L; + private TopicThreadCounter topicThreadCounterInstance = null; + + public ThreadCounterTimerTask(long interval, int level) { + intervalInMs = interval; + throttlingLevel = level; + topicThreadCounterInstance = TopicThreadCounter.getInstance(this.intervalInMs, this.throttlingLevel); + } public void setTopicName(String topic) { topicName = topic; } + public void setClientId(String client) { clientId = client; } + public void setIoThreadId(Long threadId) { ioThreadId = threadId; } @@ -20,10 +31,13 @@ public int getThreadCount() { return threadCount; } + public void setThrottlingLevel (int level) { throttlingLevel = level; } + public int getThrottlingLevel () { return throttlingLevel; } + @Override public void run() { if(null != topicName) { - threadCount = topicThreadCounterInstance.add(topicName, System.currentTimeMillis(), ioThreadId); + threadCount = topicThreadCounterInstance.add(topicName, System.currentTimeMillis(), ioThreadId, clientId); } } } diff --git a/azpubsub/src/main/java/com/microsoft/azpubsub/security/auth/TopicThreadCounter.java b/azpubsub/src/main/java/com/microsoft/azpubsub/security/auth/TopicThreadCounter.java index 3c262f4689af0..5bb7e24442e33 100644 --- a/azpubsub/src/main/java/com/microsoft/azpubsub/security/auth/TopicThreadCounter.java +++ b/azpubsub/src/main/java/com/microsoft/azpubsub/security/auth/TopicThreadCounter.java @@ -7,32 +7,42 @@ public class TopicThreadCounter { private static TopicThreadCounter instance = null; private static Object lock = new Object(); private Long interval = null; + private int throttlingLevel = 0; ConcurrentHashMap> topicThreadMap = new ConcurrentHashMap<>(); - public TopicThreadCounter(Long intvl) { + public TopicThreadCounter(Long intvl, int level) { interval = intvl; + throttlingLevel = level; } - static TopicThreadCounter getInstance(Long interval) { + static TopicThreadCounter getInstance(Long interval, int level) { synchronized (lock) { if(null == instance) { - instance = new TopicThreadCounter(interval); + instance = new TopicThreadCounter(interval, level); } } return instance; } - public int add(String topic, Long currentTimeInMs, Long threadId) { - if(!topicThreadMap.containsKey(topic)) { - topicThreadMap.put(topic, new TreeMap<>()); + public int add(String topic, Long currentTimeInMs, Long threadId, String clientId) { + String key = TopicThreadCounter.makeKey(this.throttlingLevel, topic, clientId, threadId); + if(!topicThreadMap.containsKey(key)) { + topicThreadMap.put(key, new TreeMap<>()); } - topicThreadMap.get(topic).put(currentTimeInMs, threadId); - NavigableMap subMap= topicThreadMap.get(topic).tailMap(currentTimeInMs - interval, false); + topicThreadMap.get(key).put(currentTimeInMs, threadId); + NavigableMap subMap= topicThreadMap.get(key).tailMap(currentTimeInMs - interval, false); HashSet hs = new HashSet<>(); for(Map.Entry element: subMap.entrySet()) { hs.add((Long)element.getValue()); } - topicThreadMap.put(topic, new TreeMap<>(subMap)); + topicThreadMap.put(key, new TreeMap<>(subMap)); return hs.size(); } + + public static String makeKey(int throttlingLevel, String topic, String clientId, Long threaId) { + if(1 == throttlingLevel) return String.format("ClientId:%s|ThreadId:%d|Topic:%s", clientId, threaId, topic); + else if(2 == throttlingLevel) return String.format("ClientId:%s|ThreadId:%d", clientId, threaId); + else if(3 == throttlingLevel) return String.format("Topic:%s|ThreadId:%d", topic, threaId); + return topic; + } } diff --git a/azpubsub/src/main/scala/com/microsoft/azpubsub/security/auth/AzPubSubAclAuthorizer.scala b/azpubsub/src/main/scala/com/microsoft/azpubsub/security/auth/AzPubSubAclAuthorizer.scala index 628bd448f15df..0128b8c9e9d0c 100644 --- a/azpubsub/src/main/scala/com/microsoft/azpubsub/security/auth/AzPubSubAclAuthorizer.scala +++ b/azpubsub/src/main/scala/com/microsoft/azpubsub/security/auth/AzPubSubAclAuthorizer.scala @@ -1,19 +1,19 @@ package com.microsoft.azpubsub.security.auth -import java.util -import java.util.{Timer, TimerTask} import java.util.concurrent._ import scala.collection.JavaConverters.asScalaSetConverter + import com.yammer.metrics.core.{Meter, MetricName} + import org.apache.kafka.common.security.auth.KafkaPrincipal + import kafka.metrics.KafkaMetricsGroup import kafka.network.RequestChannel.Session -import kafka.security.auth.{Operation, Resource, SimpleAclAuthorizer, Topic} +import kafka.security.auth.Operation +import kafka.security.auth.Resource +import kafka.security.auth.SimpleAclAuthorizer import kafka.utils.Logging -import kafka.utils.timer.{SystemTimer, TimerTask} - -import scala.collection.mutable /* * AzPubSub ACL Authorizer to handle the certificate & role based principal type @@ -23,39 +23,10 @@ class AzPubSubAclAuthorizer extends SimpleAclAuthorizer with Logging with KafkaM explicitMetricName("azpubsub.security", "AuthorizerMetrics", name, metricTags) } - override def configure(javaConfigs: util.Map[String, _]): Unit = { - super.configure(javaConfigs) - config = AzPubSubConfig.fromProps(javaConfigs) - } - - private var config: AzPubSubConfig = null private val successRate: Meter = newMeter("AuthorizerSuccessPerSec", "success", TimeUnit.SECONDS) private val failureRate: Meter = newMeter("AuthorizerFailurePerSec", "failure", TimeUnit.SECONDS) - private val topicThreadCounterTimerTask = new ThreadCounterTimerTask() - private val trigger: Timer = new Timer(true) - trigger.scheduleAtFixedRate(topicThreadCounterTimerTask, 5, 5000) - var ints = new mutable.TreeSet[Long] override def authorize(session: Session, operation: Operation, resource: Resource): Boolean = { - - if(config.getBoolean("azpubsub.enable.topic.qps.throttling") && resource.resourceType.equals( Topic )) { - topicThreadCounterTimerTask.setTopicName(resource.name) - topicThreadCounterTimerTask.setIoThreadId(Thread.currentThread().getId); - - val threadCount = Math.max(topicThreadCounterTimerTask.getThreadCount, 1) - - var count = 1 - while (ints.size * threadCount > config.getInt("azpubsub.topic.max.qps")) { - val pivot = ints.minBy(x => x > System.currentTimeMillis - 1000) - val (_, after) = ints.partition(x => x > pivot) - ints = after - Thread.sleep(count) - count *= 2 - } - - ints += System.currentTimeMillis - } - val sessionPrincipal = session.principal if (classOf[AzPubSubPrincipal] == sessionPrincipal.getClass) { val principal = sessionPrincipal.asInstanceOf[AzPubSubPrincipal] diff --git a/azpubsub/src/main/scala/com/microsoft/azpubsub/security/auth/AzPubSubAclAuthorizerV2.scala b/azpubsub/src/main/scala/com/microsoft/azpubsub/security/auth/AzPubSubAclAuthorizerV2.scala new file mode 100644 index 0000000000000..9b28ae7118ea0 --- /dev/null +++ b/azpubsub/src/main/scala/com/microsoft/azpubsub/security/auth/AzPubSubAclAuthorizerV2.scala @@ -0,0 +1,135 @@ +package com.microsoft.azpubsub.security.auth + +import java.net.InetAddress +import java.util +import java.util.Timer +import java.util.concurrent._ + +import com.yammer.metrics.core.{Meter, MetricName} +import kafka.metrics.KafkaMetricsGroup +import kafka.security.authorizer.AclAuthorizer +import kafka.utils.Logging +import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} +import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult} + +import scala.collection.JavaConverters.asScalaSetConverter +import scala.collection.mutable + +/* + * AzPubSub ACL Authorizer to handle the certificate & role based principal type + */ + +object AzPubSubAclAuthorizerV2 { + val configThrottlingLevel = "azpubsub.qps.throttling.level" + val configTopicThrottlingQps = "azpubsub.topic.max.qps" + val configClientidTopicThrottlingQps = "azpubsub.clientid.topic.max.qps" + val configMetterSuccessRatePerSec = "AuthorizerSuccessPerSec" + val configMetterFailureRatePerSec = "AuthorizerFailurePerSec" + val configTimerTaskExecutionInterval = "azpubsub.timer.task.execution.interval.in.ms" +} + + +class AzPubSubAclAuthorizerV2 extends AclAuthorizer with Logging with KafkaMetricsGroup { + override def metricName(name: String, metricTags: scala.collection.Map[String, String]): MetricName = { + explicitMetricName("azpubsub.security", "AuthorizerMetrics", name, metricTags) + } + + override def configure(javaConfigs: util.Map[String, _]): Unit = { + super.configure(javaConfigs) + config = AzPubSubConfig.fromProps(javaConfigs) + throttlingLevel = config.getInt(AzPubSubAclAuthorizerV2.configThrottlingLevel) + throttlingTopicQps = config.getInt(AzPubSubAclAuthorizerV2.configTopicThrottlingQps) + throttlingClientIdTopicQps = config.getInt(AzPubSubAclAuthorizerV2.configTopicThrottlingQps) + timerTaskExecutionInterval = config.getLong(AzPubSubAclAuthorizerV2.configTimerTaskExecutionInterval) + topicThreadCounterTimerTask = new ThreadCounterTimerTask(timerTaskExecutionInterval, throttlingLevel) + trigger.scheduleAtFixedRate(topicThreadCounterTimerTask, 2, timerTaskExecutionInterval) + topicThreadCounterTimerTask.setIoThreadId(Thread.currentThread().getId) + } + + private var config: AzPubSubConfig = null + private var throttlingLevel: Int = 0 + private var throttlingTopicQps: Int = 0 + private var throttlingClientIdTopicQps: Int = 0 + private var timerTaskExecutionInterval: Long = 0 + private val successRate: Meter = newMeter(AzPubSubAclAuthorizerV2.configMetterSuccessRatePerSec, "success", TimeUnit.SECONDS) + private val failureRate: Meter = newMeter(AzPubSubAclAuthorizerV2.configMetterFailureRatePerSec, "failure", TimeUnit.SECONDS) + private var topicThreadCounterTimerTask : ThreadCounterTimerTask = null + private val trigger: Timer = new Timer(true) + + var ints = new mutable.HashMap[String, mutable.TreeSet[Long]] + + override def authorize(requestContext: AuthorizableRequestContext, actions: util.List[Action]): util.List[AuthorizationResult] = { + if(throttlingLevel > 0) { + actions.forEach( a => if( a.resourcePattern().resourceType() == org.apache.kafka.common.resource.ResourceType.TOPIC) { + topicThreadCounterTimerTask.setTopicName(a.resourcePattern.name()) + topicThreadCounterTimerTask.setClientId(requestContext.clientId()) + + val key = makeKey(a.resourcePattern().name(), requestContext.clientId()) + if(!ints.contains(key)) ints.put(key, new mutable.TreeSet[Long]()) + + val threadCount = Math.max(topicThreadCounterTimerTask.getThreadCount, 1) + + var count = 1 + while (throttlingLevel == 1 && ints.get(key).size * threadCount > throttlingTopicQps || throttlingLevel == 2 && ints.get(key).size * threadCount > throttlingClientIdTopicQps) { + val pivot = ints.get(key).get.minBy(x => x > System.currentTimeMillis - 1000) + val (_, after) = ints.get(key).get.partition(x => x > pivot) + ints.put(key, after) + Thread.sleep(count) + count *= 2 + } + + ints.get(key).get += System.currentTimeMillis + } ) + } + + var res : util.List[AuthorizationResult] = null + + if (requestContext.principal().getClass == classOf[AzPubSubPrincipal]) { + val tmpPrincipal = requestContext.principal().asInstanceOf[AzPubSubPrincipal] + for(role <- tmpPrincipal.getRoles.asScala) { + val context : AuthorizableRequestContext = new AuthorizableRequestContext { + override def listenerName(): String = requestContext.listenerName() + + override def securityProtocol(): SecurityProtocol = requestContext.securityProtocol() + + override def principal(): KafkaPrincipal = { + new KafkaPrincipal(tmpPrincipal.getPrincipalType, role) + } + + override def clientAddress(): InetAddress = requestContext.clientAddress() + + override def requestType(): Int = requestContext.requestType() + + override def requestVersion(): Int = requestContext.requestVersion() + + override def clientId(): String = requestContext.clientId() + + override def correlationId(): Int = requestContext.correlationId() + } + res = super.authorize(context, actions) + if (res.contains(AuthorizationResult.ALLOWED) ) { + successRate.mark() + res + } + } + failureRate.mark() + return res + } + res = super.authorize(requestContext, actions) + if(null != res && res.contains(AuthorizationResult.ALLOWED)) { + successRate.mark() + } + else { + failureRate.mark() + } + return res + } + + private def makeKey(topic: String, clientId: String) : String = { + throttlingLevel match { + case 1 => String.format("ClientId:%s|Topic:%s", topic, clientId) + case 2 => String.format("ClientId:%s", clientId) + case _ => String.format("Topic:%s", topic) + } + } +}