Skip to content

Commit

Permalink
Merge pull request #111 from haskell-works/rebalance-pause
Browse files Browse the repository at this point in the history
Use assign/pause/redirect/unpause pattern
  • Loading branch information
AlexeyRaga authored Oct 21, 2019
2 parents 6db07e0 + 771ad31 commit 5901585
Show file tree
Hide file tree
Showing 11 changed files with 176 additions and 226 deletions.
1 change: 0 additions & 1 deletion hw-kafka-client.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ library
Kafka.Callbacks
Kafka.Consumer.Callbacks
Kafka.Consumer.Convert
Kafka.Internal.CancellationToken
Kafka.Internal.RdKafka
Kafka.Internal.Setup
Kafka.Internal.Shared
Expand Down
109 changes: 60 additions & 49 deletions src/Kafka/Consumer.hs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TupleSections #-}
module Kafka.Consumer
Expand All @@ -18,28 +19,27 @@ module Kafka.Consumer
)
where

import Control.Arrow (left, (&&&))
import Control.Concurrent (forkIO, rtsSupportsBoundThreads)
import Control.Exception (bracket)
import Control.Monad (forM_, void, when)
import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad.Trans.Except (ExceptT (ExceptT), runExceptT)
import Data.Bifunctor (bimap, first)
import qualified Data.ByteString as BS
import Data.IORef (readIORef, writeIORef)
import qualified Data.Map as M
import Data.Maybe (fromMaybe)
import Data.Monoid ((<>))
import Data.Set (Set)
import qualified Data.Set as Set
import qualified Data.Text as Text
import Foreign hiding (void)
import Kafka.Consumer.Convert (fromMessagePtr, fromNativeTopicPartitionList'', offsetCommitToBool, offsetToInt64, toMap, toNativeTopicPartitionList, toNativeTopicPartitionList', toNativeTopicPartitionListNoDispose, topicPartitionFromMessageForCommit)
import Kafka.Consumer.Types (KafkaConsumer (..))
import Kafka.Internal.CancellationToken as CToken
import Kafka.Internal.RdKafka (RdKafkaRespErrT (..), RdKafkaTopicPartitionListTPtr, RdKafkaTypeT (..), newRdKafkaT, newRdKafkaTopicPartitionListT, newRdKafkaTopicT, rdKafkaAssignment, rdKafkaCommit, rdKafkaCommitted, rdKafkaConfSetDefaultTopicConf, rdKafkaConsumeBatchQueue, rdKafkaConsumeQueue, rdKafkaConsumerClose, rdKafkaConsumerPoll, rdKafkaOffsetsStore, rdKafkaPausePartitions, rdKafkaPollSetConsumer, rdKafkaPosition, rdKafkaQueueDestroy, rdKafkaQueueNew, rdKafkaResumePartitions, rdKafkaSeek, rdKafkaSetLogLevel, rdKafkaSubscribe, rdKafkaSubscription, rdKafkaTopicConfDup, rdKafkaTopicPartitionListAdd)
import Kafka.Internal.Setup (Kafka (..), KafkaConf (..), KafkaProps (..), TopicConf (..), TopicProps (..), getRdKafka, kafkaConf, topicConf)
import Kafka.Internal.Shared (kafkaErrorToMaybe, maybeToLeft, rdKafkaErrorToEither)
import Control.Arrow (left, (&&&))
import Control.Concurrent (forkIO, modifyMVar, rtsSupportsBoundThreads, withMVar)
import Control.Exception (bracket)
import Control.Monad (forM_, mapM_, void, when)
import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad.Trans.Except (ExceptT (ExceptT), runExceptT)
import Data.Bifunctor (bimap, first)
import qualified Data.ByteString as BS
import Data.IORef (readIORef, writeIORef)
import qualified Data.Map as M
import Data.Maybe (fromMaybe)
import Data.Monoid ((<>))
import Data.Set (Set)
import qualified Data.Set as Set
import qualified Data.Text as Text
import Foreign hiding (void)
import Kafka.Consumer.Convert (fromMessagePtr, fromNativeTopicPartitionList'', offsetCommitToBool, offsetToInt64, toMap, toNativeTopicPartitionList, toNativeTopicPartitionList', toNativeTopicPartitionListNoDispose, topicPartitionFromMessageForCommit)
import Kafka.Consumer.Types (KafkaConsumer (..))
import Kafka.Internal.RdKafka (RdKafkaRespErrT (..), RdKafkaTopicPartitionListTPtr, RdKafkaTypeT (..), newRdKafkaT, newRdKafkaTopicPartitionListT, newRdKafkaTopicT, rdKafkaAssignment, rdKafkaCommit, rdKafkaCommitted, rdKafkaConfSetDefaultTopicConf, rdKafkaConsumeBatchQueue, rdKafkaConsumeQueue, rdKafkaConsumerClose, rdKafkaConsumerPoll, rdKafkaOffsetsStore, rdKafkaPausePartitions, rdKafkaPollSetConsumer, rdKafkaPosition, rdKafkaQueueDestroy, rdKafkaQueueNew, rdKafkaResumePartitions, rdKafkaSeek, rdKafkaSetLogLevel, rdKafkaSubscribe, rdKafkaSubscription, rdKafkaTopicConfDup, rdKafkaTopicPartitionListAdd)
import Kafka.Internal.Setup (CallbackPollStatus (..), Kafka (..), KafkaConf (..), KafkaProps (..), TopicConf (..), TopicProps (..), getKafkaConf, getRdKafka, kafkaConf, topicConf)
import Kafka.Internal.Shared (kafkaErrorToMaybe, maybeToLeft, rdKafkaErrorToEither)

import Kafka.Consumer.ConsumerProperties as X
import Kafka.Consumer.Subscription as X
Expand Down Expand Up @@ -69,17 +69,17 @@ newConsumer :: MonadIO m
-> Subscription
-> m (Either KafkaError KafkaConsumer)
newConsumer props (Subscription ts tp) = liftIO $ do
let cp = case cpUserPolls props of
CallbackModeAsync -> setCallback (rebalanceCallback (\_ _ -> return ())) <> props
CallbackModeSync -> props
kc@(KafkaConf kc' qref ct) <- newConsumerConf cp
let cp = case cpCallbackPollMode props of
CallbackPollModeAsync -> setCallback (rebalanceCallback (\_ _ -> return ())) <> props
CallbackPollModeSync -> props
kc@(KafkaConf kc' qref _) <- newConsumerConf cp
tp' <- topicConf (TopicProps tp)
_ <- setDefaultTopicConf kc tp'
rdk <- newRdKafkaT RdKafkaConsumer kc'
case rdk of
Left err -> return . Left $ KafkaError err
Right rdk' -> do
when (cpUserPolls props == CallbackModeAsync) $ do
when (cpCallbackPollMode props == CallbackPollModeAsync) $ do
msgq <- rdKafkaQueueNew rdk'
writeIORef qref (Just msgq)
let kafka = KafkaConsumer (Kafka rdk') kc
Expand All @@ -90,8 +90,8 @@ newConsumer props (Subscription ts tp) = liftIO $ do
forM_ (cpLogLevel cp) (setConsumerLogLevel kafka)
sub <- subscribe kafka ts
case sub of
Nothing -> (when (cpUserPolls props == CallbackModeAsync) $
runConsumerLoop kafka ct (Just $ Timeout 100)) >> return (Right kafka)
Nothing -> (when (cpCallbackPollMode props == CallbackPollModeAsync) $
runConsumerLoop kafka (Just $ Timeout 100)) >> return (Right kafka)
Just err -> closeConsumer kafka >> return (Left err)

pollMessage :: MonadIO m
Expand All @@ -102,9 +102,7 @@ pollMessage c@(KafkaConsumer _ (KafkaConf _ qr _)) (Timeout ms) = liftIO $ do
mbq <- readIORef qr
case mbq of
Nothing -> rdKafkaConsumerPoll (getRdKafka c) ms >>= fromMessagePtr
Just q -> do
pollConsumerEvents c Nothing
rdKafkaConsumeQueue q (fromIntegral ms) >>= fromMessagePtr
Just q -> rdKafkaConsumeQueue q (fromIntegral ms) >>= fromMessagePtr

-- | Polls up to BatchSize messages.
-- Unlike 'pollMessage' this function does not return usual "timeout" errors.
Expand Down Expand Up @@ -206,8 +204,7 @@ seek (KafkaConsumer (Kafka k) _) (Timeout timeout) tps = liftIO $
where
seekAll = runExceptT $ do
tr <- traverse (ExceptT . topicPair) tps
void $ traverse (\(kt, p, o) -> ExceptT (rdSeek kt p o)) tr
return ()
mapM_ (\(kt, p, o) -> ExceptT (rdSeek kt p o)) tr

rdSeek kt (PartitionId p) o =
rdKafkaErrorToEither <$> rdKafkaSeek kt (fromIntegral p) (offsetToInt64 o) timeout
Expand Down Expand Up @@ -253,17 +250,20 @@ position (KafkaConsumer (Kafka k) _) tps = liftIO $ do
-- frequent enough.
pollConsumerEvents :: KafkaConsumer -> Maybe Timeout -> IO ()
pollConsumerEvents k timeout =
let (Timeout tm) = fromMaybe (Timeout 0) timeout
in void $ rdKafkaConsumerPoll (getRdKafka k) tm
void . withCallbackPollEnabled k $ pollConsumerEvents' k timeout

-- | Closes the consumer.
closeConsumer :: MonadIO m => KafkaConsumer -> m (Maybe KafkaError)
closeConsumer (KafkaConsumer (Kafka k) (KafkaConf _ qr ct)) = liftIO $ do
CToken.cancel ct
mbq <- readIORef qr
void $ traverse rdKafkaQueueDestroy mbq
kafkaErrorToMaybe . KafkaResponseError <$> rdKafkaConsumerClose k

closeConsumer (KafkaConsumer (Kafka k) (KafkaConf _ qr statusVar)) = liftIO $
-- because closing the consumer will raise callbacks,
-- prevent the async loop from doing it at the same time
modifyMVar statusVar $ \_ -> do
-- librdkafka says:
-- Prior to destroying the client instance, loose your reference to the
-- background queue by calling rd_kafka_queue_destroy()
readIORef qr >>= mapM_ rdKafkaQueueDestroy
res <- kafkaErrorToMaybe . KafkaResponseError <$> rdKafkaConsumerClose k
pure (CallbackPollDisabled, res)
-----------------------------------------------------------------------------
newConsumerConf :: ConsumerProperties -> IO KafkaConf
newConsumerConf ConsumerProperties {cpProps = m, cpCallbacks = cbs} = do
Expand Down Expand Up @@ -304,13 +304,24 @@ redirectCallbacksPoll :: KafkaConsumer -> IO (Maybe KafkaError)
redirectCallbacksPoll (KafkaConsumer (Kafka k) _) =
kafkaErrorToMaybe . KafkaResponseError <$> rdKafkaPollSetConsumer k

runConsumerLoop :: KafkaConsumer -> CancellationToken -> Maybe Timeout -> IO ()
runConsumerLoop k ct timeout =
runConsumerLoop :: KafkaConsumer -> Maybe Timeout -> IO ()
runConsumerLoop k timeout =
when rtsSupportsBoundThreads $ void $ forkIO go
where
go = do
token <- CToken.status ct
case token of
Running -> pollConsumerEvents k timeout >> go
Cancelled -> return ()

st <- withCallbackPollEnabled k (pollConsumerEvents' k timeout)
case st of
CallbackPollEnabled -> go
CallbackPollDisabled -> pure ()

withCallbackPollEnabled :: KafkaConsumer -> IO () -> IO CallbackPollStatus
withCallbackPollEnabled k f = do
let statusVar = kcfgCallbackPollStatus (getKafkaConf k)
withMVar statusVar $ \case
CallbackPollEnabled -> f >> pure CallbackPollEnabled
CallbackPollDisabled -> pure CallbackPollDisabled

pollConsumerEvents' :: KafkaConsumer -> Maybe Timeout -> IO ()
pollConsumerEvents' k timeout =
let (Timeout tm) = fromMaybe (Timeout 0) timeout
in void $ rdKafkaConsumerPoll (getRdKafka k) tm
81 changes: 27 additions & 54 deletions src/Kafka/Consumer/Callbacks.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,34 +6,21 @@ module Kafka.Consumer.Callbacks
)
where

import Control.Arrow ((&&&))
import Control.Concurrent (threadDelay)
import Control.Monad (forM_, void)
import Data.Monoid ((<>))
import qualified Data.Text as Text
import Foreign.ForeignPtr (newForeignPtr_)
import Foreign.Ptr (nullPtr)
import Kafka.Callbacks as X
import Kafka.Consumer.Convert (fromNativeTopicPartitionList', fromNativeTopicPartitionList'', toNativeTopicPartitionList)
import Kafka.Consumer.Types (KafkaConsumer (..), RebalanceEvent (..), TopicPartition (..))
import Kafka.Internal.RdKafka
import Kafka.Internal.Setup (HasKafka (..), HasKafkaConf (..), Kafka (..), KafkaConf (..), getRdMsgQueue)
import Kafka.Internal.Shared (kafkaErrorToMaybe)
import Kafka.Types (KafkaError (..), PartitionId (..), TopicName (..))
import Control.Arrow ((&&&))
import Control.Monad (forM_, void)
import Data.Monoid ((<>))
import Foreign.ForeignPtr (newForeignPtr_)
import Foreign.Ptr (nullPtr)
import Kafka.Callbacks as X
import Kafka.Consumer.Convert (fromNativeTopicPartitionList', fromNativeTopicPartitionList'')
import Kafka.Consumer.Types (KafkaConsumer (..), RebalanceEvent (..), TopicPartition (..))
import Kafka.Internal.RdKafka
import Kafka.Internal.Setup (HasKafka (..), HasKafkaConf (..), Kafka (..), KafkaConf (..), getRdMsgQueue)
import Kafka.Types (KafkaError (..), PartitionId (..), TopicName (..))

import qualified Data.Text as Text

-- | Sets a callback that is called when rebalance is needed.
--
-- Callback implementations suppose to watch for 'KafkaResponseError' 'RdKafkaRespErrAssignPartitions' and
-- for 'KafkaResponseError' 'RdKafkaRespErrRevokePartitions'. Other error codes are not expected and would indicate
-- something really bad happening in a system, or bugs in @librdkafka@ itself.
--
-- A callback is expected to call 'assign' according to the error code it receives.
--
-- * When 'RdKafkaRespErrAssignPartitions' happens 'assign' should be called with all the partitions it was called with.
-- It is OK to alter partitions offsets before calling 'assign'.
--
-- * When 'RdKafkaRespErrRevokePartitions' happens 'assign' should be called with an empty list of partitions.
-- rebalanceCallback :: (KafkaConsumer -> KafkaError -> [TopicPartition] -> IO ()) -> KafkaConf -> IO ()
rebalanceCallback :: (KafkaConsumer -> RebalanceEvent -> IO ()) -> KafkaConf -> IO ()
rebalanceCallback callback kc@(KafkaConf conf _ _) = rdKafkaConfSetRebalanceCb conf realCb
where
Expand All @@ -47,8 +34,6 @@ rebalanceCallback callback kc@(KafkaConf conf _ _) = rdKafkaConfSetRebalanceCb c
-- The results of automatic or manual offset commits will be scheduled
-- for this callback and is served by `pollMessage`.
--
-- A callback is expected to call 'assign' according to the error code it receives.
--
-- If no partitions had valid offsets to commit this callback will be called
-- with `KafkaError` == `KafkaResponseError` `RdKafkaRespErrNoOffset` which is not to be considered
-- an error.
Expand All @@ -75,41 +60,29 @@ setRebalanceCallback :: (KafkaConsumer -> RebalanceEvent -> IO ())
setRebalanceCallback f k e pls = do
ps <- fromNativeTopicPartitionList'' pls
let assignment = (tpTopicName &&& tpPartition) <$> ps
let (Kafka kptr) = getKafka k

case e of
KafkaResponseError RdKafkaRespErrAssignPartitions -> do
f k (RebalanceBeforeAssign assignment)
void $ rdKafkaAssign kptr pls

mbq <- getRdMsgQueue $ getKafkaConf k
case mbq of
Nothing -> pure ()
Just mq -> do
{- Magnus Edenhill:
If you redirect after assign() it means some messages may be forwarded to the single consumer queue,
so either do it before assign() or do: assign(); pause(); redirect; resume()
-}
void $ rdKafkaPausePartitions kptr pls
forM_ ps (\tp -> redirectPartitionQueue (getKafka k) (tpTopicName tp) (tpPartition tp) mq)
-- sleep for 1 second.
-- it looks like without it there is not enough time for redirect to happen
-- or something similarly strange. I don't understand it.
-- If you know WTH is going on PLEASE let me know because the current "fix" is ugly
-- and is completely unreasonable :(
threadDelay 1000000
f k (RebalanceBeforeAssign assignment)
void $ assign' k pls -- pass as pointer to avoid possible serialisation issues
void $ rdKafkaResumePartitions kptr pls

f k (RebalanceAssign assignment)

KafkaResponseError RdKafkaRespErrRevokePartitions -> do
f k (RebalanceBeforeRevoke assignment)
void $ assign k []
void $ newForeignPtr_ nullPtr >>= rdKafkaAssign kptr
f k (RebalanceRevoke assignment)
x -> error $ "Rebalance: UNKNOWN response: " <> show x

-- | Assigns specified partitions to a current consumer.
-- Assigning an empty list means unassigning from all partitions that are currently assigned.
assign :: KafkaConsumer -> [TopicPartition] -> IO (Maybe KafkaError)
assign (KafkaConsumer (Kafka k) _) ps =
let pl = if null ps
then newForeignPtr_ nullPtr
else toNativeTopicPartitionList ps
er = KafkaResponseError <$> (pl >>= rdKafkaAssign k)
in kafkaErrorToMaybe <$> er

-- | Assigns specified partitions to a current consumer.
-- Assigning an empty list means unassigning from all partitions that are currently assigned.
assign' :: KafkaConsumer -> RdKafkaTopicPartitionListTPtr -> IO (Maybe KafkaError)
assign' (KafkaConsumer (Kafka k) _) pls =
(kafkaErrorToMaybe . KafkaResponseError) <$> rdKafkaAssign k pls

32 changes: 16 additions & 16 deletions src/Kafka/Consumer/ConsumerProperties.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

module Kafka.Consumer.ConsumerProperties
( ConsumerProperties(..)
, CallbackMode(..)
, CallbackPollMode(..)
, brokersList
, autoCommit
, noAutoCommit
Expand Down Expand Up @@ -30,18 +30,18 @@ import Data.Text (Text)
import qualified Data.Text as Text
import Kafka.Consumer.Types (ConsumerGroupId (..))
import Kafka.Internal.Setup (KafkaConf (..))
import Kafka.Types (BrokerAddress (..), ClientId (..), KafkaCompressionCodec (..), KafkaDebug (..), KafkaLogLevel (..), kafkaCompressionCodecToText, kafkaDebugToText, Millis(..))
import Kafka.Types (BrokerAddress (..), ClientId (..), KafkaCompressionCodec (..), KafkaDebug (..), KafkaLogLevel (..), Millis (..), kafkaCompressionCodecToText, kafkaDebugToText)

import Kafka.Consumer.Callbacks as X

data CallbackMode = CallbackModeSync | CallbackModeAsync deriving (Show, Eq)
data CallbackPollMode = CallbackPollModeSync | CallbackPollModeAsync deriving (Show, Eq)

-- | Properties to create 'KafkaConsumer'.
data ConsumerProperties = ConsumerProperties
{ cpProps :: Map Text Text
, cpLogLevel :: Maybe KafkaLogLevel
, cpCallbacks :: [KafkaConf -> IO ()]
, cpUserPolls :: CallbackMode
{ cpProps :: Map Text Text
, cpLogLevel :: Maybe KafkaLogLevel
, cpCallbacks :: [KafkaConf -> IO ()]
, cpCallbackPollMode :: CallbackPollMode
}

instance Sem.Semigroup ConsumerProperties where
Expand All @@ -52,10 +52,10 @@ instance Sem.Semigroup ConsumerProperties where
-- | /Right biased/ so we prefer newer properties over older ones.
instance Monoid ConsumerProperties where
mempty = ConsumerProperties
{ cpProps = M.empty
, cpLogLevel = Nothing
, cpCallbacks = []
, cpUserPolls = CallbackModeAsync
{ cpProps = M.empty
, cpLogLevel = Nothing
, cpCallbacks = []
, cpCallbackPollMode = CallbackPollModeAsync
}
{-# INLINE mempty #-}
mappend = (Sem.<>)
Expand Down Expand Up @@ -140,14 +140,14 @@ queuedMaxMessagesKBytes kBytes =

-- | Sets the callback poll mode.
--
-- The default 'CallbackModeAsync' mode handles polling rebalance
-- The default 'CallbackPollModeAsync' mode handles polling rebalance
-- and keep alive events for you
-- in a background thread.
--
-- With 'CalalcacModeSync' the user will poll the consumer
-- With 'CallbacPollModeSync' the user will poll the consumer
-- frequently to handle new messages as well as rebalance and keep alive events.
-- 'CalalcacModeSync' lets you can simplify
-- 'CallbacPollModeSync' lets you can simplify
-- hw-kafka-client's footprint and have full control over when polling
-- happens at the cost of having to manage this yourself.
callbackPollMode :: CallbackMode -> ConsumerProperties
callbackPollMode mode = mempty { cpUserPolls = mode }
callbackPollMode :: CallbackPollMode -> ConsumerProperties
callbackPollMode mode = mempty { cpCallbackPollMode = mode }
Loading

0 comments on commit 5901585

Please sign in to comment.