
Commit

Use default partitioner and don't store key. Added unit test.
starzia committed Sep 24, 2014
1 parent fa42389 commit 846cddc
Showing 3 changed files with 106 additions and 12 deletions.
8 changes: 4 additions & 4 deletions README.md
@@ -26,14 +26,14 @@ NetflixGraph is built via Gradle (www.gradle.org). To build from the command line:

./gradlew build

To build a jar that includes all dependencies run:

./gradlew installApp
See the `build.gradle` file for other gradle targets, like `distTar`, `distZip`, `installApp`, and `runServer`.

Running the server
------------------

After running `./gradlew installApp` the basic run command is:
You can run the server locally by just running `./gradlew runServer`.

For more advanced usage, you may wish to run `./gradlew installApp` and then:

cd suro-server
java -cp "build/install/suro-server/lib/*" com.netflix.suro.SuroServer -m conf/routingmap.json -s conf/sink.json -i conf/input.json
@@ -144,28 +144,28 @@ protected void write(List<Message> msgList) {
}

// The new KafkaProducer does not have an interface for sending multiple messages,
// so we loop and create lots of Runnables -- this seems inefficient.
// so we loop and create lots of Runnables -- this seems inefficient, but the alternative
// has its own problems. If we create one "big Runnable" that loops over messages we'll
// drain the queue4sink too quickly -- all the messages will be queued in the in-memory
// job queue storing the Runnables.
for( final SuroKeyedMessage m : msgCopies ) {
senders.submit(new Runnable() {
@Override
public void run() {
// TODO: somehow use keyForTopic map in SinkConfig, or is this already handled elsewhere?
String topic = m.getRoutingKey();

// calculate the kafka partition, with backward compatibility with old kafka producer
int numPartitions = producer.partitionsFor(topic).size();
int partition = (int) (m.getKey() % numPartitions);
DefaultPartitioner partitioner = new DefaultPartitioner(null); // old Scala partitioner
int partition = partitioner.partition(m.getKey(), numPartitions);

// Suro's message key is an Integer type but Kafka stores it as a byte[].
// For the storage purpose, we are converting the number to ASCII.
// For the partitioning purpose, it will be treated as an Integer.
byte[] keyBytes = Long.toHexString( m.getKey() ).getBytes();
ProducerRecord r = new ProducerRecord( topic,
partition,
keyBytes,
null, // don't store the key
m.getPayload() );
log.trace( "Will send message to Kafka" );
long startTimeMs = System.currentTimeMillis();
// send
Future<RecordMetadata> responseFtr = producer.send( r );
log.trace( "Started aysnc producer" );
boolean success = true;
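
For context on the change above: the old Scala `kafka.producer.DefaultPartitioner` that the sink now delegates to simply hashes the key and takes the result modulo the partition count. Below is a minimal plain-Java illustration of that rule as we understand it from Kafka 0.8; the `LegacyPartitionRule`/`legacyPartition` names are ours and are not part of the sink.

    // Illustration only: Kafka 0.8's Scala DefaultPartitioner computes
    // abs(key.hashCode()) % numPartitions, with Integer.MIN_VALUE mapped to 0.
    // Routing the new KafkaProducer's records through the same rule is what keeps
    // a given Suro key on the partition the old producer would have chosen.
    public final class LegacyPartitionRule {
        static int legacyPartition(Object key, int numPartitions) {
            int hash = key.hashCode();
            int nonNegative = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
            return nonNegative % numPartitions;
        }

        public static void main(String[] args) {
            // e.g. a boxed long key, since SuroKeyedMessage keys are numeric
            System.out.println(legacyPartition(Long.valueOf(42L), 9));  // prints 6
        }
    }
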
@@ -52,6 +52,7 @@ public class TestKafkaSinkV2 {
private static final String TOPIC_NAME = "routingKey";
private static final String TOPIC_NAME_MULTITHREAD = "routingKeyMultithread";
private static final String TOPIC_NAME_PARTITION_BY_KEY = "routingKey_partitionByKey";
private static final String TOPIC_NAME_BACKWARD_COMPAT = "routingKey_backwardCompat";

@Test
public void testDefaultParameters() throws IOException {
@@ -248,6 +249,99 @@ public void testFileBasedQueuePartitionByKey() throws Exception {
}
}


/** Tests backward compatibility with the old Kafka sink. */
@Test
public void testBackwardCompatability() throws Exception {
int numPartitions = 9;

TopicCommand.createTopic(zk.getZkClient(),
new TopicCommand.TopicCommandOptions(new String[]{
"--zookeeper", "dummy", "--create", "--topic", TOPIC_NAME_BACKWARD_COMPAT,
"--replication-factor", "2", "--partitions", Integer.toString(numPartitions)}));
String keyTopicMap = String.format(" \"keyTopicMap\": {\n" +
" \"%s\": \"key\"\n" +
" }", TOPIC_NAME_BACKWARD_COMPAT);

String description1 = "{\n" +
" \"type\": \"kafkaV1\",\n" +
" \"client.id\": \"kafkasink\",\n" +
" \"metadata.broker.list\": \"" + kafkaServer.getBrokerListStr() + "\",\n" +
" \"request.required.acks\": 1,\n" +
keyTopicMap + "\n" +
"}";
String description2 = "{\n" +
" \"type\": \"kafkaV2\",\n" +
" \"client.id\": \"kafkasink\",\n" +
" \"metadata.broker.list\": \"" + kafkaServer.getBrokerListStr() + "\",\n" +
" \"request.required.acks\": 1,\n" +
keyTopicMap + "\n" +
"}";

// setup sinks, both old and new versions
ObjectMapper jsonMapper = new DefaultObjectMapper();
jsonMapper.registerSubtypes(new NamedType(KafkaSink.class, "kafkaV1"));
jsonMapper.registerSubtypes(new NamedType(KafkaSinkV2.class, "kafkaV2"));
KafkaSink sinkV1 = jsonMapper.readValue(description1, new TypeReference<Sink>(){});
KafkaSinkV2 sinkV2 = jsonMapper.readValue(description2, new TypeReference<Sink>(){});
sinkV1.open();
sinkV2.open();
List<Sink> sinks = new ArrayList<Sink>();
sinks.add(sinkV1);
sinks.add(sinkV2);

// setup Kafka consumer (to read back messages)
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
createConsumerConfig("localhost:" + zk.getServerPort(), "gropuid"));
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(TOPIC_NAME_BACKWARD_COMPAT, 1);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
consumer.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get(TOPIC_NAME_BACKWARD_COMPAT).get(0);

// Send 20 test messages, using both the old and new Kafka sinks.
// Retrieve the messages and ensure that they are identical and sent to the same partition.
Random rand = new Random();
int messageCount = 20;
for (int i = 0; i < messageCount; ++i) {
Map<String, Object> msgMap = new ImmutableMap.Builder<String, Object>()
.put("key", new Long( rand.nextLong() ) )
.put("value", "message:" + i).build();

// send message to both sinks
for( Sink sink : sinks ){
sink.writeTo(new DefaultMessageContainer(
new Message(TOPIC_NAME_BACKWARD_COMPAT, jsonMapper.writeValueAsBytes(msgMap)),
jsonMapper));
}

// read two copies of message back from Kafka and check that partitions and data match
MessageAndMetadata<byte[], byte[]> msgAndMeta1 = stream.iterator().next();
MessageAndMetadata<byte[], byte[]> msgAndMeta2 = stream.iterator().next();
System.out.println( "iteration: "+i+" partition1: "+msgAndMeta1.partition() );
System.out.println( "iteration: "+i+" partition2: "+msgAndMeta2.partition() );
assertEquals( msgAndMeta1.partition(), msgAndMeta2.partition() );
String msg1Str = new String( msgAndMeta1.message() );
String msg2Str = new String( msgAndMeta2.message() );
System.out.println( "iteration: "+i+" message1: "+msg1Str );
System.out.println( "iteration: "+i+" message2: "+msg2Str );
assertEquals( msg1Str, msg2Str );
}

// close sinks
sinkV1.close();
sinkV2.close();
// close consumer
try {
stream.iterator().next();
fail(); // there should be no data left to consume
} catch (ConsumerTimeoutException e) {
//this is expected
consumer.shutdown();
}
}


@Test
public void testBlockingThreadPoolExecutor() {
int jobQueueSize = 5;
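
One note for readers following the new test: the `createConsumerConfig` helper it calls is not part of this diff. A minimal sketch of what such a helper for Kafka 0.8's high-level consumer could look like follows; the property values are illustrative assumptions, and `consumer.timeout.ms` is what lets the final `stream.iterator().next()` call fail with `ConsumerTimeoutException` once the topic has been drained.

    import java.util.Properties;
    import kafka.consumer.ConsumerConfig;

    // Illustrative sketch only -- the real helper lives elsewhere in TestKafkaSinkV2.
    public class ConsumerConfigSketch {
        static ConsumerConfig createConsumerConfig(String zkConnect, String groupId) {
            Properties props = new Properties();
            props.put("zookeeper.connect", zkConnect);   // ZooKeeper endpoint of the test harness
            props.put("group.id", groupId);              // consumer group for this test run
            props.put("auto.offset.reset", "smallest");  // read the topic from the beginning
            props.put("consumer.timeout.ms", "3000");    // iterator().next() throws
                                                         // ConsumerTimeoutException after this idle time
            return new ConsumerConfig(props);
        }
    }
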
