diff --git a/Network/Kafka.hs b/Network/Kafka.hs
index 3635963..4f63456 100644
--- a/Network/Kafka.hs
+++ b/Network/Kafka.hs
@@ -3,7 +3,7 @@ module Network.Kafka where
 import Prelude
 
 -- base
-import Control.Applicative
+import Control.Concurrent (threadDelay)
 import Control.Exception (Exception, IOException)
 import qualified Data.List.NonEmpty as NE
 import Data.List.NonEmpty (NonEmpty(..))
@@ -70,6 +70,7 @@ data KafkaClientError = -- | A response did not contain an offset.
   -- | Could not find a cached broker for the found leader.
   | KafkaInvalidBroker Leader
   | KafkaFailedToFetchMetadata
+  | KafkaFailedToFetchGroupCoordinator KafkaError
   | KafkaIOException IOException
   deriving (Eq, Generic, Show)
 
@@ -127,9 +128,9 @@ defaultRequestTimeout = 10000
 defaultMinBytes :: MinBytes
 defaultMinBytes = MinBytes 0
 
--- | Default: @1024 * 1024@
+-- | Default: @32 * 1024 * 1024@
 defaultMaxBytes :: MaxBytes
-defaultMaxBytes = 1024 * 1024
+defaultMaxBytes = 32 * 1024 * 1024
 
 -- | Default: @0@
 defaultMaxWaitTime :: MaxWaitTime
@@ -218,7 +219,9 @@ deleteTopicsRequest topic = DeleteTopicsReq ([topic], defaultRequestTimeout)
 
 fetchOffset :: Kafka m => OffsetFetchRequest -> m OffsetFetchResponse
-fetchOffset request = withAnyHandle $ flip fetchOffset' request
+fetchOffset request@(OffsetFetchReq (group, _)) = do
+  coordinator <- getConsumerGroupCoordinator group
+  withBrokerHandle coordinator $ flip fetchOffset' request
 
 fetchOffset' :: Kafka m => Handle -> OffsetFetchRequest -> m OffsetFetchResponse
 fetchOffset' h request = makeRequest h $ OffsetFetchRR request
 
@@ -241,7 +244,9 @@ heartbeatRequest :: GroupId -> GenerationId -> MemberId -> HeartbeatRequest
 heartbeatRequest genId gId memId = HeartbeatReq (genId, gId, memId)
 
 commitOffset :: Kafka m => OffsetCommitRequest -> m OffsetCommitResponse
-commitOffset request = withAnyHandle $ flip commitOffset' request
+commitOffset request@(OffsetCommitReq (group, _, _, _, _)) = do
+  coordinator <- getConsumerGroupCoordinator group
+  withBrokerHandle coordinator $ flip commitOffset' request
 
 commitOffset' :: Kafka m => Handle -> OffsetCommitRequest -> m OffsetCommitResponse
 
@@ -253,8 +258,18 @@ commitOffsetRequest consumerGroup topic partition offset =
   let time = -1
       metadata_ = Metadata "milena"
   in OffsetCommitReq
-     (consumerGroup, [(topic, [(partition, offset, time, metadata_)])])
-
+     (consumerGroup, -1, "", time, [(topic, [(partition, offset, metadata_)])])
+
+getConsumerGroupCoordinator :: Kafka m => ConsumerGroup -> m Broker
+getConsumerGroupCoordinator group = do
+  let theReq = GroupCoordinatorRR $ GroupCoordinatorReq group
+  (GroupCoordinatorResp (err, broker)) <- withAnyHandle $ flip makeRequest theReq
+  case err of
+    ConsumerCoordinatorNotAvailableCode -> do -- coordinator not ready yet, retry after a short backoff
+      liftIO $ threadDelay 100000 -- TODO: something better than a fixed threadDelay?
+      getConsumerGroupCoordinator group
+    NoError -> return broker
+    other -> throwError $ KafkaFailedToFetchGroupCoordinator other
 
 getTopicPartitionLeader :: Kafka m => TopicName -> Partition -> m Broker
 getTopicPartitionLeader t p = do
diff --git a/Network/Kafka/Protocol.hs b/Network/Kafka/Protocol.hs
index e0964ce..a34aa3f 100644
--- a/Network/Kafka/Protocol.hs
+++ b/Network/Kafka/Protocol.hs
@@ -1,3 +1,5 @@
+{-# LANGUAGE NoDeriveAnyClass #-}
+
 module Network.Kafka.Protocol
   ( module Network.Kafka.Protocol
   ) where
@@ -39,6 +41,7 @@ data ReqResp a where
   DeleteTopicsRR :: MonadIO m => DeleteTopicsRequest -> ReqResp (m DeleteTopicsResponse)
   OffsetCommitRR :: MonadIO m => OffsetCommitRequest -> ReqResp (m OffsetCommitResponse)
   OffsetFetchRR :: MonadIO m => OffsetFetchRequest -> ReqResp (m OffsetFetchResponse)
+  GroupCoordinatorRR :: MonadIO m => GroupCoordinatorRequest -> ReqResp (m GroupCoordinatorResponse)
 
 doRequest' :: (Deserializable a, MonadIO m) => CorrelationId -> Handle -> Request -> m (Either String a)
 doRequest' correlationId h r = do
@@ -65,6 +68,7 @@ doRequest clientId correlationId h (TopicsRR req) = doRequest' correlationId h
 doRequest clientId correlationId h (DeleteTopicsRR req) = doRequest' correlationId h $ Request (correlationId, clientId, DeleteTopicsRequest req)
 doRequest clientId correlationId h (OffsetCommitRR req) = doRequest' correlationId h $ Request (correlationId, clientId, OffsetCommitRequest req)
 doRequest clientId correlationId h (OffsetFetchRR req) = doRequest' correlationId h $ Request (correlationId, clientId, OffsetFetchRequest req)
+doRequest clientId correlationId h (GroupCoordinatorRR req) = doRequest' correlationId h $ Request (correlationId, clientId, GroupCoordinatorRequest req)
 
 class Serializable a where
   serialize :: a -> Put
@@ -218,7 +222,11 @@ newtype GroupCoordinatorRequest = GroupCoordinatorReq ConsumerGroup deriving (Sh
 newtype CreateTopicsRequest = CreateTopicsReq ([(TopicName, Partition, ReplicationFactor, [(Partition, Replicas)], [(KafkaString, Metadata)])], Timeout) deriving (Show, Eq, Serializable, Generic)
 newtype DeleteTopicsRequest = DeleteTopicsReq ([TopicName], Timeout) deriving (Show, Eq, Serializable, Generic)
-newtype OffsetCommitRequest = OffsetCommitReq (ConsumerGroup, [(TopicName, [(Partition, Offset, Time, Metadata)])]) deriving (Show, Eq, Serializable, Generic)
+newtype OffsetCommitRequest = OffsetCommitReq (ConsumerGroup, ConsumerGroupGeneration, ConsumerId, Time, [(TopicName, [(Partition, Offset, Metadata)])]) deriving (Show, Eq, Serializable, Generic)
+newtype ConsumerGroupGeneration = ConsumerGroupGeneration Int32 deriving (Show, Eq, Deserializable, Serializable, Num, Integral, Ord, Real, Enum)
+
+newtype ConsumerId = ConsumerId KafkaString deriving (Show, Eq, Serializable, Deserializable, IsString)
+
 newtype OffsetFetchRequest = OffsetFetchReq (ConsumerGroup, [(TopicName, [Partition])]) deriving (Show, Eq, Serializable, Generic)
 newtype ConsumerGroup = ConsumerGroup KafkaString deriving (Show, Eq, Serializable, Deserializable, IsString, Generic)
 newtype Metadata = Metadata KafkaString deriving (Show, Eq, Serializable, Deserializable, IsString, Generic)
@@ -508,7 +516,9 @@ requestBytes x = runPut $ do
   where mr = runPut $ serialize x
 
 apiVersion :: RequestMessage -> ApiVersion
-apiVersion _ = ApiVersion 0 -- everything is at version 0 right now
+apiVersion OffsetFetchRequest{} = 1 -- must be V1 to fetch offsets from Kafka-based storage with metadata
+apiVersion OffsetCommitRequest{} = 2 -- use V2 commits so offsets are stored in Kafka without dealing with timestamps
+apiVersion _ = ApiVersion 0 -- everything else is at version 0 right now
 
 apiKey :: RequestMessage -> ApiKey
 apiKey ProduceRequest{} = ApiKey 0
diff --git a/test/tests.hs b/test/tests.hs
index e4328df..42c9913 100644
--- a/test/tests.hs
+++ b/test/tests.hs
@@ -54,9 +54,9 @@ specs = do
 
     let cleanup :: TopicName -> IO ()
         cleanup topicName = do
-          run $ do
-            stateAddresses %= NE.cons ("localhost", 9092)
-            deleteTopic (deleteTopicsRequest topicName)
+          _ <- run $ do
+            stateAddresses %= NE.cons ("localhost", 9092)
+            deleteTopic (deleteTopicsRequest topicName)
           pure ()
 
   describe "can talk to local Kafka server" $ do
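
Usage sketch (not part of the patch): the snippet below shows how the coordinator-aware offset path introduced above might be exercised end to end. It assumes a broker reachable at localhost:9092 and the library's existing runKafka/mkKafkaState entry points; the client id, group, topic, partition, and offset values are placeholders.

{-# LANGUAGE OverloadedStrings #-}

import Network.Kafka
import Network.Kafka.Protocol

-- Commit an offset for a consumer group and read it back. With this patch,
-- both commitOffset and fetchOffset first look up the group coordinator and
-- send their request over that broker's handle.
main :: IO ()
main = do
  let state = mkKafkaState "offset-example" ("localhost", 9092)
  result <- runKafka state $ do
    -- OffsetCommitReq now carries (group, generation, consumer id, time, partition data);
    -- commitOffsetRequest fills in the generation (-1) and consumer id ("") defaults.
    _ <- commitOffset (commitOffsetRequest "example-group" "example-topic" 0 42)
    fetchOffset (OffsetFetchReq ("example-group", [("example-topic", [0])]))
  print result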