Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 22 additions & 7 deletions Network/Kafka.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module Network.Kafka where
import Prelude

-- base
import Control.Applicative
import Control.Concurrent (threadDelay)
import Control.Exception (Exception, IOException)
import qualified Data.List.NonEmpty as NE
import Data.List.NonEmpty (NonEmpty(..))
Expand Down Expand Up @@ -70,6 +70,7 @@ data KafkaClientError = -- | A response did not contain an offset.
-- | Could not find a cached broker for the found leader.
| KafkaInvalidBroker Leader
| KafkaFailedToFetchMetadata
| KafkaFailedToFetchGroupCoordinator KafkaError
| KafkaIOException IOException
deriving (Eq, Generic, Show)

Expand Down Expand Up @@ -127,9 +128,9 @@ defaultRequestTimeout = 10000
-- | Default: @0@
defaultMinBytes :: MinBytes
defaultMinBytes = MinBytes 0

-- | Default: @1024 * 1024@
-- | Default: @32 * 1024 * 1024@
defaultMaxBytes :: MaxBytes
defaultMaxBytes = 32 * 1024 * 1024

-- | Default: @0@
defaultMaxWaitTime :: MaxWaitTime
Expand Down Expand Up @@ -218,7 +219,9 @@ deleteTopicsRequest topic = DeleteTopicsReq ([topic], defaultRequestTimeout)


-- | Fetch the committed offsets named in the request.
--
-- Offset fetch requests must be addressed to the consumer group's
-- coordinator broker, so we resolve the coordinator first rather than
-- sending the request over an arbitrary handle.
fetchOffset :: Kafka m => OffsetFetchRequest -> m OffsetFetchResponse
fetchOffset request@(OffsetFetchReq (group, _)) = do
  coordinator <- getConsumerGroupCoordinator group
  withBrokerHandle coordinator $ flip fetchOffset' request

-- | Send an offset-fetch request over an already-open broker handle.
fetchOffset' :: Kafka m => Handle -> OffsetFetchRequest -> m OffsetFetchResponse
fetchOffset' hdl req = makeRequest hdl (OffsetFetchRR req)
Expand All @@ -241,7 +244,9 @@ heartbeatRequest :: GroupId -> GenerationId -> MemberId -> HeartbeatRequest
heartbeatRequest genId gId memId = HeartbeatReq (genId, gId, memId)

-- | Commit the offsets named in the request.
--
-- Like offset fetches, offset commits must be addressed to the consumer
-- group's coordinator broker, so we resolve the coordinator first.
commitOffset :: Kafka m => OffsetCommitRequest -> m OffsetCommitResponse
commitOffset request@(OffsetCommitReq (group, _, _, _, _)) = do
  coordinator <- getConsumerGroupCoordinator group
  withBrokerHandle coordinator $ flip commitOffset' request

commitOffset' ::
Kafka m => Handle -> OffsetCommitRequest -> m OffsetCommitResponse
Expand All @@ -253,8 +258,18 @@ commitOffsetRequest consumerGroup topic partition offset =
let time = -1
metadata_ = Metadata "milena"
in OffsetCommitReq
(consumerGroup, [(topic, [(partition, offset, time, metadata_)])])

(consumerGroup, -1, "", time, [(topic, [(partition, offset, metadata_)])])

-- | Look up the broker currently acting as coordinator for a consumer group.
--
-- Kafka answers 'ConsumerCoordinatorNotAvailableCode' while a coordinator
-- is still being elected, so we retry with a short fixed delay — but only
-- a bounded number of times, instead of looping forever, so a permanently
-- unavailable coordinator surfaces as an error rather than a hang.
getConsumerGroupCoordinator :: Kafka m => ConsumerGroup -> m Broker
getConsumerGroupCoordinator group = go (20 :: Int)
  where
    theReq = GroupCoordinatorRR $ GroupCoordinatorReq group
    -- One request attempt; retries at most @attemptsLeft - 1@ more times.
    go attemptsLeft = do
      GroupCoordinatorResp (err, broker) <- withAnyHandle $ flip makeRequest theReq
      case err of
        NoError -> return broker
        ConsumerCoordinatorNotAvailableCode
          | attemptsLeft > 1 -> do
              -- Coordinator not ready yet; back off briefly, then retry.
              liftIO $ threadDelay 100000 -- 100 ms
              go (attemptsLeft - 1)
          | otherwise -> throwError $ KafkaFailedToFetchGroupCoordinator err
        other -> throwError $ KafkaFailedToFetchGroupCoordinator other

getTopicPartitionLeader :: Kafka m => TopicName -> Partition -> m Broker
getTopicPartitionLeader t p = do
Expand Down
14 changes: 12 additions & 2 deletions Network/Kafka/Protocol.hs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
{-# LANGUAGE NoDeriveAnyClass #-}

module Network.Kafka.Protocol
( module Network.Kafka.Protocol
) where
Expand Down Expand Up @@ -39,6 +41,7 @@ data ReqResp a where
DeleteTopicsRR :: MonadIO m => DeleteTopicsRequest -> ReqResp (m DeleteTopicsResponse)
OffsetCommitRR :: MonadIO m => OffsetCommitRequest -> ReqResp (m OffsetCommitResponse)
OffsetFetchRR :: MonadIO m => OffsetFetchRequest -> ReqResp (m OffsetFetchResponse)
GroupCoordinatorRR :: MonadIO m => GroupCoordinatorRequest -> ReqResp (m GroupCoordinatorResponse)

doRequest' :: (Deserializable a, MonadIO m) => CorrelationId -> Handle -> Request -> m (Either String a)
doRequest' correlationId h r = do
Expand All @@ -65,6 +68,7 @@ doRequest clientId correlationId h (TopicsRR req) = doRequest' correlationId h
doRequest clientId correlationId h (DeleteTopicsRR req) = doRequest' correlationId h $ Request (correlationId, clientId, DeleteTopicsRequest req)
doRequest clientId correlationId h (OffsetCommitRR req) = doRequest' correlationId h $ Request (correlationId, clientId, OffsetCommitRequest req)
doRequest clientId correlationId h (OffsetFetchRR req) = doRequest' correlationId h $ Request (correlationId, clientId, OffsetFetchRequest req)
doRequest clientId correlationId h (GroupCoordinatorRR req) = doRequest' correlationId h $ Request (correlationId, clientId, GroupCoordinatorRequest req)

class Serializable a where
serialize :: a -> Put
Expand Down Expand Up @@ -218,7 +222,11 @@ newtype GroupCoordinatorRequest = GroupCoordinatorReq ConsumerGroup deriving (Sh
newtype CreateTopicsRequest = CreateTopicsReq ([(TopicName, Partition, ReplicationFactor, [(Partition, Replicas)], [(KafkaString, Metadata)])], Timeout) deriving (Show, Eq, Serializable, Generic)
newtype DeleteTopicsRequest = DeleteTopicsReq ([TopicName], Timeout) deriving (Show, Eq, Serializable, Generic)

-- V2 commit request: (group, generation, consumer id, retention time,
-- [(topic, [(partition, offset, metadata)])]) — no per-offset timestamps.
newtype OffsetCommitRequest = OffsetCommitReq (ConsumerGroup, ConsumerGroupGeneration, ConsumerId, Time, [(TopicName, [(Partition, Offset, Metadata)])]) deriving (Show, Eq, Serializable, Generic)

newtype ConsumerGroupGeneration = ConsumerGroupGeneration Int32 deriving (Show, Eq, Deserializable, Serializable, Num, Integral, Ord, Real, Enum)

newtype ConsumerId = ConsumerId KafkaString deriving (Show, Eq, Serializable, Deserializable, IsString)

newtype OffsetFetchRequest = OffsetFetchReq (ConsumerGroup, [(TopicName, [Partition])]) deriving (Show, Eq, Serializable, Generic)
newtype ConsumerGroup = ConsumerGroup KafkaString deriving (Show, Eq, Serializable, Deserializable, IsString, Generic)
newtype Metadata = Metadata KafkaString deriving (Show, Eq, Serializable, Deserializable, IsString, Generic)
Expand Down Expand Up @@ -508,7 +516,9 @@ requestBytes x = runPut $ do
where mr = runPut $ serialize x

-- | Protocol API version to use for each request type.
--
-- Offset fetch must be V1 so offsets (with metadata) are read from
-- Kafka-backed storage; offset commit is V2 so commits are stored in
-- Kafka without requiring per-offset timestamps. Everything else is V0.
-- The specific clauses must precede the catch-all to be reachable.
apiVersion :: RequestMessage -> ApiVersion
apiVersion OffsetFetchRequest{} = ApiVersion 1
apiVersion OffsetCommitRequest{} = ApiVersion 2
apiVersion _ = ApiVersion 0

apiKey :: RequestMessage -> ApiKey
apiKey ProduceRequest{} = ApiKey 0
Expand Down
6 changes: 3 additions & 3 deletions test/tests.hs
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ specs = do

let cleanup :: TopicName -> IO ()
    cleanup topicName = do
      -- Point the client at the local broker, then best-effort delete the
      -- topic; the result of the delete request is deliberately discarded.
      _ <- run $ do
        stateAddresses %= NE.cons ("localhost", 9092)
        deleteTopic (deleteTopicsRequest topicName)
      pure ()

describe "can talk to local Kafka server" $ do
Expand Down