Skip to content

Commit

Permalink
Fixed key/partitioning, fixed stats reporting, added unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
starzia committed Sep 18, 2014
1 parent c006d87 commit 000d4b3
Show file tree
Hide file tree
Showing 3 changed files with 363 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

import java.io.IOException;
Expand All @@ -29,6 +30,7 @@
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Kafka 0.8.2 Sink, using the new Java-native producer rather than the Scala producer.
Expand All @@ -48,6 +50,7 @@ public class KafkaSinkV2 extends ThreadPoolQueuedSink implements Sink {

private final KafkaProducer producer;
private long msgId = 0;
private AtomicInteger failureCount = new AtomicInteger(0);

@JsonCreator
public KafkaSinkV2(
Expand Down Expand Up @@ -132,19 +135,35 @@ protected void beforePolling() throws IOException { /*do nothing */}
protected void write(List<Message> msgList) {
log.trace( "KafkaSink write() with {} messages", msgList.size() );
// prepare "final" copies of the messages to be used in the anonymous class below
final ArrayList<Message> msgCopies = new ArrayList<Message>( msgList.size() );
final ArrayList<SuroKeyedMessage> msgCopies =
new ArrayList<SuroKeyedMessage>( msgList.size() );
for( Message m : msgList ){
msgCopies.add( new Message( m.getRoutingKey(), m.getPayload() ) );
SuroKeyedMessage sKeyedMsg = (SuroKeyedMessage) m;
msgCopies.add( new SuroKeyedMessage( sKeyedMsg.getKey(),
new Message( m.getRoutingKey(), m.getPayload() )));
}

// :( The new KafkaProducer does not have interface for sending multiple messages
for( final Message m : msgCopies ) {
// The new KafkaProducer does not have an interface for sending multiple messages,
// so we loop and create one Runnable per message -- this seems inefficient.
for( final SuroKeyedMessage m : msgCopies ) {
senders.submit(new Runnable() {
@Override
public void run() {
// TODO: somehow use keyForTopic map in SinkConfig
ProducerRecord r = new ProducerRecord( m.getRoutingKey(), // Kafka topic
m.getPayload() ); // Kafka payload
// TODO: somehow use keyForTopic map in SinkConfig, or is this already handled elsewhere?
String topic = m.getRoutingKey();

// calculate the kafka partition, with backward compatibility with old kafka producer
int numPartitions = producer.partitionsFor(topic).size();
int partition = (int) (m.getKey() % numPartitions);

// Suro's message key is an Integer type but Kafka stores it as a byte[].
// For storage purposes, we convert the number to its hexadecimal string form and store that string's bytes.
// For partitioning purposes, it is treated as an integer.
byte[] keyBytes = Long.toHexString( m.getKey() ).getBytes();
ProducerRecord r = new ProducerRecord( topic,
partition,
keyBytes,
m.getPayload() );
log.trace( "Will send message to Kafka" );
long startTimeMs = System.currentTimeMillis();
Future<RecordMetadata> responseFtr = producer.send( r );
Expand Down Expand Up @@ -179,13 +198,12 @@ public void run() {
}
long durationMs = System.currentTimeMillis() - startTimeMs;
if( !success ){
// sending the request failed.
log.warn( "Kafka producer send failed after {} milliseconds", durationMs );
failureCount.incrementAndGet();
enqueue( m );
}else{
log.trace( "Kafka producer send succeeded after {} milliseconds", durationMs );
}
// TODO: do we need to do something to set statistics queried in getStat(), below?
}
});
}
Expand All @@ -205,16 +223,15 @@ public String recvNotice() {

/**
 * Builds a plain-text statistics report for this sink, one "name: value" entry per line.
 *
 * NOTE(review): this listing contains both the registry-based counters
 * (ProducerStats / ProducerTopicStats) and the producer.metrics() loop; they look like
 * the before and after sides of a single change -- confirm which set is meant to remain.
 *
 * @return the assembled report text
 */
@Override
public String getStat() {
// Stats objects looked up by clientId from the two registries.
ProducerStats stats = ProducerStatsRegistry.getProducerStats(clientId);
ProducerTopicStats topicStats = ProducerTopicStatsRegistry.getProducerTopicStats(clientId);

// Metrics exposed by the producer itself (String key -> Metric).
Map<String,? extends Metric> metrics = producer.metrics();
StringBuilder sb = new StringBuilder();
// Registry-based counters: each line reports the count() of the corresponding rate object.
sb.append("resend rate: ").append(stats.resendRate().count()).append('\n');
sb.append("serialization error rate: " ).append(stats.serializationErrorRate().count()).append('\n');
sb.append("failed send rate: ").append(stats.failedSendRate().count()).append('\n');
sb.append("message rate: ").append(topicStats.getProducerAllTopicsStats().messageRate().count()).append('\n');
sb.append("byte rate: " ).append(topicStats.getProducerAllTopicsStats().byteRate().count()).append('\n');
sb.append("dropped message rate: " ).append(topicStats.getProducerAllTopicsStats().droppedMessageRate().count()).append('\n');
// Append every producer metric as "kafka.<key>: <value>".
for( Map.Entry<String,? extends Metric> e : metrics.entrySet() ){
sb.append("kafka.").append(e.getKey()).append(": ").append(e.getValue().value()).append('\n');
}
// Also report the current size of queue4Sink.
sb.append("messages-in-queue4sink: ").append( this.queue4Sink.size() ).append('\n');
// Current value of the failureCount counter.
sb.append("failures: ").append( this.failureCount.get() ).append('\n');

return sb.toString();
}
Expand Down
Loading

0 comments on commit 000d4b3

Please sign in to comment.