Skip to content

.Net implementation of the Apache Kafka Protocol that provides basic functionality through Producer/Consumer classes. The project also offers balanced consumer implementation.

License

Notifications You must be signed in to change notification settings

KunalSaini/Kafkanet

 
 

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

38 Commits
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Kafkanet

Join the chat at https://gitter.im/Microsoft/Kafkanet .Net implementation of the Apache Kafka Protocol that provides basic functionality through Producer/Consumer classes. The project also offers balanced consumer implementation. The project is a fork from ExactTarget's Kafka-net Client.

Related documentation

Build Kafkanet

  • Clone Kafkanet through git clone https://github.com/Microsoft/Kafkanet.git
  • Open src\KafkaNETLibraryAndConsole.sln in Visual Studio
  • Build Solution

Run Unit Tests

  • Open Test Window in Visual Studio: Test>Windows>Test Explorer
  • Run all

Using Console

  • Setup local Kafka and Zookeeper

Console Options

	topic                           Dump topics metadata, such as: earliest/latest offset, replica, ISR.
    consumesimple                   Consume data in single thread.
    consumegroup                    Monitor latest offset gap and speed of consumer group.
    consumegroupmonitor             Monitor latest offset gap and speed of consumer group.
    producesimple                   Produce data in single thread.
    produceperftest                 Produce data in multiple thread.
    eventserverperftest             Http Post data to event server in multiple thread.
    producemonitor                  Monitor latest offset.
    test                            Run some adhoc test cases.

Using the library

Producer

The Producer can send one message or an entire batch to Kafka. When sending a batch you can send to multiple topics at once

Producer Usage

		var brokerConfig = new BrokerConfiguration()
        {
            BrokerId = this.brokerId,
            Host = this.kafkaServerName,
            Port = this.kafkaPort
        };
        var config = new ProducerConfiguration(new List<BrokerConfiguration> { brokerConfig });
        kafkaProducer = new Producer(config);
		// here you construct you batch or single message object
		var batch=ConstructBatch();
		kafkaProducer.Send(batch);

Simple Consumer

The simple Consumer allows full control for retrieving data. You could instantiate a Consumer directly by providing a ConsumerConfiguration and then calling Fetch. Kafkanet has a higher level wrapper around Consumer which allows consumer reuse and other benefits

Consumer Usage

			// create the Consumer higher level manager
			var managerConfig = new KafkaSimpleManagerConfiguration()
            {
                FetchSize = FetchSize,
                BufferSize = BufferSize,
                Zookeeper = m_zookeeper
            };
            m_consumerManager = new KafkaSimpleManager<int, Kafka.Client.Messages.Message>(managerConfig);
			// get all available partitions for a topic through the manager
			var allPartitions = m_consumerManager.GetTopicPartitionsFromZK(m_topic);
			// Refresh metadata and grab a consumer for desired partitions
			m_consumerManager.RefreshMetadata(0, m_consumerId, 0, m_topic, true);
            var partitionConsumer = m_consumerManager.GetConsumer(m_topic, partitionId);

Balanced Consumer

The balanced consumer manages partition assignment for each instance in the same consumer group. Rebalance are triggered by zookeeper changes.

Balanced Consumer Usage

		// Here we create a balanced consumer on one consumer machine for consumerGroupId. All machines consuming for this group will get balanced together
		ConsumerConfiguration config = new ConsumerConfiguration
        {
            AutoCommit = false,
            GroupId = consumerGroupId
            ConsumerId = uniqueConsumerId
            MaxFetchBufferLength = m_BufferMaxNoOfMessages,
            FetchSize = fetchSize,
            AutoOffsetReset = OffsetRequest.LargestTime,
            NumberOfTries = 20,
            ZooKeeper = new ZooKeeperConfiguration(zookeeperString, 30000, 30000, 2000)
        };
        var balancedConsumer = new ZookeeperConsumerConnector(config, true, m_ConsumerRebalanceHandler, m_ZKDisconnectHandler, m_ZKExpireHandler);
		// grab streams for desired topics 
		var streams = m_ZooKeeperConsumerConnector.CreateMessageStreams(m_TopicMap, new DefaultDecoder());
        var KafkaMessageStream = streams[m_Topic][0];
		// start consuming stream
		foreach (Message message in m_KafkaMessageStream.GetCancellable(cancellationTokenSource.Token))
		....

Contribute

Contributions to Kafkanet are welcome. Here is how you can contribute to Kafkanet:

About

.Net implementation of the Apache Kafka Protocol that provides basic functionality through Producer/Consumer classes. The project also offers balanced consumer implementation.

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages

  • C# 100.0%