Skip to content

Commit

Permalink
Merge pull request #13 from jianwan2016/MAYH-5750-expose-commitOffsets
Browse files Browse the repository at this point in the history
MAYH-5750 add commitPartitionsOffsets
  • Loading branch information
AlexeyRaga authored Jul 12, 2017
2 parents f22fa7d + 822807e commit 9ab9ded
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 19 deletions.
30 changes: 20 additions & 10 deletions src/Kafka/Consumer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ module Kafka.Consumer
, pollMessage
, commitOffsetMessage
, commitAllOffsets
, commitPartitionsOffsets
, closeConsumer

-- ReExport Types
Expand All @@ -20,24 +21,24 @@ module Kafka.Consumer
where

import Control.Exception
import Control.Monad (forM_)
import Control.Monad (forM_)
import Control.Monad.IO.Class
import qualified Data.ByteString as BS
import qualified Data.Map as M
import Foreign hiding (void)
import qualified Data.ByteString as BS
import qualified Data.Map as M
import Foreign hiding (void)
import Kafka.Consumer.Convert
import Kafka.Internal.RdKafka
import Kafka.Internal.RdKafkaEnum
import Kafka.Internal.Setup
import Kafka.Internal.Shared

import qualified Kafka.Consumer.Types as CIT
import qualified Kafka.Internal.RdKafkaEnum as RDE
import qualified Kafka.Consumer.Types as CIT
import qualified Kafka.Internal.RdKafkaEnum as RDE

import Kafka.Types as X
import Kafka.Consumer.Types as X
import Kafka.Consumer.Subscription as X
import Kafka.Consumer.ConsumerProperties as X
import Kafka.Consumer.ConsumerProperties as X
import Kafka.Consumer.Subscription as X
import Kafka.Consumer.Types as X
import Kafka.Types as X

-- | Runs high-level kafka consumer.
--
Expand Down Expand Up @@ -107,6 +108,15 @@ commitAllOffsets :: MonadIO m
commitAllOffsets o k =
liftIO $ newForeignPtr_ nullPtr >>= commitOffsets o k

-- | Commit offsets for all currently assigned partitions.
commitPartitionsOffsets :: MonadIO m
=> OffsetCommit
-> KafkaConsumer
-> [TopicPartition]
-> m (Maybe KafkaError)
commitPartitionsOffsets o k ps =
liftIO $ toNativeTopicPartitionList ps >>= commitOffsets o k

-- | Assigns specified partitions to a current consumer.
-- Assigning an empty list means unassigning from all partitions that are currently assigned.
assign :: MonadIO m => KafkaConsumer -> [TopicPartition] -> m KafkaError
Expand Down
19 changes: 10 additions & 9 deletions src/Kafka/Consumer/Types.hs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
{-# LANGUAGE DeriveDataTypeable, GeneralizedNewtypeDeriving #-}
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
module Kafka.Consumer.Types

where

import Data.Bifunctor
import Data.Bifoldable
import Data.Bitraversable
import Data.Int
import Data.Typeable
import Kafka.Types
import Kafka.Internal.RdKafka
import Data.Bifoldable
import Data.Bifunctor
import Data.Bitraversable
import Data.Int
import Data.Typeable
import Kafka.Internal.RdKafka
import Kafka.Types

data KafkaConsumer = KafkaConsumer { kcKafkaPtr :: !RdKafkaTPtr, kcKafkaConf :: !RdKafkaConfTPtr} deriving (Show)

Expand All @@ -18,7 +19,7 @@ newtype OffsetsCommitCallback = OffsetsCommitCallback (KafkaConsumer -> KafkaErr

newtype ConsumerGroupId = ConsumerGroupId String deriving (Show, Eq)
newtype Offset = Offset Int64 deriving (Show, Eq, Read)
newtype PartitionId = PartitionId Int deriving (Show, Eq, Read)
newtype PartitionId = PartitionId Int deriving (Show, Eq, Read, Ord)
newtype Millis = Millis Int deriving (Show, Eq, Ord, Num)
newtype ClientId = ClientId String deriving (Show, Eq, Ord)
data OffsetReset = Earliest | Latest deriving (Show, Eq)
Expand Down

0 comments on commit 9ab9ded

Please sign in to comment.