From fa875dcfaf39ba305520415d7b2bdd538f3dafe1 Mon Sep 17 00:00:00 2001 From: Jamshid Date: Fri, 22 May 2020 03:54:08 +0000 Subject: [PATCH 1/4] Changing the version of commit/fetchOffset to use metadata --- Network/Kafka/Protocol.hs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/Network/Kafka/Protocol.hs b/Network/Kafka/Protocol.hs index e0964ce..a8fc72c 100644 --- a/Network/Kafka/Protocol.hs +++ b/Network/Kafka/Protocol.hs @@ -508,7 +508,9 @@ requestBytes x = runPut $ do where mr = runPut $ serialize x apiVersion :: RequestMessage -> ApiVersion -apiVersion _ = ApiVersion 0 -- everything is at version 0 right now +apiVersion OffsetFetchRequest{} = 1 -- have to be V1 to use kafka storage to allow metadata +apiVersion OffsetCommitRequest{} = 1 -- have to be V1 to use kafka storage to allow metadata +apiVersion _ = ApiVersion 0 -- everything else is at version 0 right now apiKey :: RequestMessage -> ApiKey apiKey ProduceRequest{} = ApiKey 0 From 2efa48d2a10456a46da24090ffb6e92e28eabf04 Mon Sep 17 00:00:00 2001 From: Jamshid Date: Tue, 26 May 2020 00:44:37 +0000 Subject: [PATCH 2/4] Adding code to perform a GroupCoordinatorRequest, and modifying the OffsetCommitRequest to comply with the apiVersion=2 --- Network/Kafka.hs | 8 ++++---- Network/Kafka/Protocol.hs | 12 ++++++++++-- test/tests.hs | 6 +++--- 3 files changed, 17 insertions(+), 9 deletions(-) diff --git a/Network/Kafka.hs b/Network/Kafka.hs index 3635963..467edfd 100644 --- a/Network/Kafka.hs +++ b/Network/Kafka.hs @@ -3,7 +3,6 @@ module Network.Kafka where import Prelude -- base -import Control.Applicative import Control.Exception (Exception, IOException) import qualified Data.List.NonEmpty as NE import Data.List.NonEmpty (NonEmpty(..)) @@ -70,6 +69,7 @@ data KafkaClientError = -- | A response did not contain an offset. -- | Could not find a cached broker for the found leader. | KafkaInvalidBroker Leader | KafkaFailedToFetchMetadata + | KafkaFailedToFetchGroupCoordinator KafkaError | KafkaIOException IOException deriving (Eq, Generic, Show) @@ -127,9 +127,9 @@ defaultRequestTimeout = 10000 defaultMinBytes :: MinBytes defaultMinBytes = MinBytes 0 --- | Default: @1024 * 1024@ +-- | Default: @32 * 1024 * 1024@ defaultMaxBytes :: MaxBytes -defaultMaxBytes = 1024 * 1024 +defaultMaxBytes = 32 * 1024 * 1024 -- | Default: @0@ defaultMaxWaitTime :: MaxWaitTime @@ -253,7 +253,7 @@ commitOffsetRequest consumerGroup topic partition offset = let time = -1 metadata_ = Metadata "milena" in OffsetCommitReq - (consumerGroup, [(topic, [(partition, offset, time, metadata_)])]) + (consumerGroup, -1, "", time, [(topic, [(partition, offset, metadata_)])]) getTopicPartitionLeader :: Kafka m => TopicName -> Partition -> m Broker diff --git a/Network/Kafka/Protocol.hs b/Network/Kafka/Protocol.hs index a8fc72c..a34aa3f 100644 --- a/Network/Kafka/Protocol.hs +++ b/Network/Kafka/Protocol.hs @@ -1,3 +1,5 @@ +{-# LANGUAGE NoDeriveAnyClass #-} + module Network.Kafka.Protocol ( module Network.Kafka.Protocol ) where @@ -39,6 +41,7 @@ data ReqResp a where DeleteTopicsRR :: MonadIO m => DeleteTopicsRequest -> ReqResp (m DeleteTopicsResponse) OffsetCommitRR :: MonadIO m => OffsetCommitRequest -> ReqResp (m OffsetCommitResponse) OffsetFetchRR :: MonadIO m => OffsetFetchRequest -> ReqResp (m OffsetFetchResponse) + GroupCoordinatorRR :: MonadIO m => GroupCoordinatorRequest -> ReqResp (m GroupCoordinatorResponse) doRequest' :: (Deserializable a, MonadIO m) => CorrelationId -> Handle -> Request -> m (Either String a) doRequest' correlationId h r = do @@ -65,6 +68,7 @@ doRequest clientId correlationId h (TopicsRR req) = doRequest' correlationId h doRequest clientId correlationId h (DeleteTopicsRR req) = doRequest' correlationId h $ Request (correlationId, clientId, DeleteTopicsRequest req) doRequest clientId correlationId h (OffsetCommitRR req) = doRequest' correlationId h $ Request (correlationId, clientId, OffsetCommitRequest req) doRequest clientId correlationId h (OffsetFetchRR req) = doRequest' correlationId h $ Request (correlationId, clientId, OffsetFetchRequest req) +doRequest clientId correlationId h (GroupCoordinatorRR req) = doRequest' correlationId h $ Request (correlationId, clientId, GroupCoordinatorRequest req) class Serializable a where serialize :: a -> Put @@ -218,7 +222,11 @@ newtype GroupCoordinatorRequest = GroupCoordinatorReq ConsumerGroup deriving (Sh newtype CreateTopicsRequest = CreateTopicsReq ([(TopicName, Partition, ReplicationFactor, [(Partition, Replicas)], [(KafkaString, Metadata)])], Timeout) deriving (Show, Eq, Serializable, Generic) newtype DeleteTopicsRequest = DeleteTopicsReq ([TopicName], Timeout) deriving (Show, Eq, Serializable, Generic) -newtype OffsetCommitRequest = OffsetCommitReq (ConsumerGroup, [(TopicName, [(Partition, Offset, Time, Metadata)])]) deriving (Show, Eq, Serializable, Generic) +newtype OffsetCommitRequest = OffsetCommitReq (ConsumerGroup, ConsumerGroupGeneration, ConsumerId, Time, [(TopicName, [(Partition, Offset, Metadata)])]) deriving (Show, Eq, Serializable, Generic) +newtype ConsumerGroupGeneration = ConsumerGroupGeneration Int32 deriving (Show, Eq, Deserializable, Serializable, Num, Integral, Ord, Real, Enum) + +newtype ConsumerId = ConsumerId KafkaString deriving (Show, Eq, Serializable, Deserializable, IsString) + newtype OffsetFetchRequest = OffsetFetchReq (ConsumerGroup, [(TopicName, [Partition])]) deriving (Show, Eq, Serializable, Generic) newtype ConsumerGroup = ConsumerGroup KafkaString deriving (Show, Eq, Serializable, Deserializable, IsString, Generic) newtype Metadata = Metadata KafkaString deriving (Show, Eq, Serializable, Deserializable, IsString, Generic) @@ -509,7 +517,7 @@ requestBytes x = runPut $ do apiVersion :: RequestMessage -> ApiVersion apiVersion OffsetFetchRequest{} = 1 -- have to be V1 to use kafka storage to allow metadata -apiVersion OffsetCommitRequest{} = 1 -- have to be V1 to use kafka storage to allow metadata +apiVersion OffsetCommitRequest{} = 2 -- use V2 commit to not deal with Timestamps, and get stored in Kafka apiVersion _ = ApiVersion 0 -- everything else is at version 0 right now apiKey :: RequestMessage -> ApiKey diff --git a/test/tests.hs b/test/tests.hs index e4328df..42c9913 100644 --- a/test/tests.hs +++ b/test/tests.hs @@ -54,9 +54,9 @@ specs = do let cleanup :: TopicName -> IO () cleanup topicName = do - run $ do - stateAddresses %= NE.cons ("localhost", 9092) - deleteTopic (deleteTopicsRequest topicName) + _ <- run $ do + stateAddresses %= NE.cons ("localhost", 9092) + deleteTopic (deleteTopicsRequest topicName) pure () describe "can talk to local Kafka server" $ do From 295359c6fd1e60d632f33e8d6b4f0c6a7cf28de6 Mon Sep 17 00:00:00 2001 From: Jamshid Date: Tue, 26 May 2020 01:56:08 +0000 Subject: [PATCH 3/4] Adding a function to get the Group Coordinator --- Network/Kafka.hs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/Network/Kafka.hs b/Network/Kafka.hs index 467edfd..ff60597 100644 --- a/Network/Kafka.hs +++ b/Network/Kafka.hs @@ -3,6 +3,7 @@ module Network.Kafka where import Prelude -- base +import Control.Concurrent (threadDelay) import Control.Exception (Exception, IOException) import qualified Data.List.NonEmpty as NE import Data.List.NonEmpty (NonEmpty(..)) @@ -255,6 +256,16 @@ commitOffsetRequest consumerGroup topic partition offset = in OffsetCommitReq (consumerGroup, -1, "", time, [(topic, [(partition, offset, metadata_)])]) +getConsumerGroupCoordinator :: Kafka m => ConsumerGroup -> m Broker +getConsumerGroupCoordinator group = do + let theReq = GroupCoordinatorRR $ GroupCoordinatorReq group + (GroupCoordinatorResp (err, broker)) <- withAnyHandle $ flip makeRequest theReq + case err of + ConsumerCoordinatorNotAvailableCode -> do -- coordinator not ready, must retry with backoff + liftIO $ threadDelay 100000 -- todo something better than threadDelay? + getConsumerGroupCoordinator group + NoError -> return broker + other -> throwError $ KafkaFailedToFetchGroupCoordinator other getTopicPartitionLeader :: Kafka m => TopicName -> Partition -> m Broker getTopicPartitionLeader t p = do From 39dad725837f8d67721f3ac0f87a047ce36b2889 Mon Sep 17 00:00:00 2001 From: Jamshid Date: Tue, 26 May 2020 02:14:34 +0000 Subject: [PATCH 4/4] Running commit/fetchOffset at the correct machine. --- Network/Kafka.hs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/Network/Kafka.hs b/Network/Kafka.hs index ff60597..4f63456 100644 --- a/Network/Kafka.hs +++ b/Network/Kafka.hs @@ -219,7 +219,9 @@ deleteTopicsRequest topic = DeleteTopicsReq ([topic], defaultRequestTimeout) fetchOffset :: Kafka m => OffsetFetchRequest -> m OffsetFetchResponse -fetchOffset request = withAnyHandle $ flip fetchOffset' request +fetchOffset request@(OffsetFetchReq (group, _)) = do + coordinator <- getConsumerGroupCoordinator group + withBrokerHandle coordinator $ flip fetchOffset' request fetchOffset' :: Kafka m => Handle -> OffsetFetchRequest -> m OffsetFetchResponse fetchOffset' h request = makeRequest h $ OffsetFetchRR request @@ -242,7 +244,9 @@ heartbeatRequest :: GroupId -> GenerationId -> MemberId -> HeartbeatRequest heartbeatRequest genId gId memId = HeartbeatReq (genId, gId, memId) commitOffset :: Kafka m => OffsetCommitRequest -> m OffsetCommitResponse -commitOffset request = withAnyHandle $ flip commitOffset' request +commitOffset request@(OffsetCommitReq (group, _, _, _, _)) = do + coordinator <- getConsumerGroupCoordinator group + withBrokerHandle coordinator $ flip commitOffset' request commitOffset' :: Kafka m => Handle -> OffsetCommitRequest -> m OffsetCommitResponse