diff --git a/README.md b/README.md
index fc6be0f4..04906b9a 100644
--- a/README.md
+++ b/README.md
@@ -26,14 +26,14 @@ NetflixGraph is built via Gradle (www.gradle.org). To build from the command lin
 
     ./gradlew build
 
-To build a jar that includes all dependencies run:
-
-    ./gradlew installApp
+See the `build.gradle` file for other gradle targets, like `distTar`, `distZip`, `installApp`, and `runServer`.
 
 Running the server
 ------------------
 
-After running `./gradlew installApp` the basic run command is:
+You can run the server locally by just running `./gradlew runServer`.
+
+For more advanced usage you may wish to run `./gradlew installApp` and then:
 
     cd suro-server
     java -cp "build/install/suro-server/lib/*" com.netflix.suro.SuroServer -m conf/routingmap.json -s conf/sink.json -i conf/input.json
diff --git a/suro-kafka/src/main/java/com/netflix/suro/sink/kafka/KafkaSinkV2.java b/suro-kafka/src/main/java/com/netflix/suro/sink/kafka/KafkaSinkV2.java
index c248ba15..9970b356 100644
--- a/suro-kafka/src/main/java/com/netflix/suro/sink/kafka/KafkaSinkV2.java
+++ b/suro-kafka/src/main/java/com/netflix/suro/sink/kafka/KafkaSinkV2.java
@@ -144,28 +144,28 @@ protected void write(List<Message> msgList) {
         }
         // The new KafkaProducer does not have interface for sending multiple messages,
-        // so we loop and create lots of Runnables -- this seems inefficient.
+        // so we loop and create lots of Runnables -- this seems inefficient, but the alternative
+        // has its own problems.  If we create one "big Runnable" that loops over messages we'll
+        // drain the queue4sink too quickly -- all the messages will be queued in the in-memory
+        // job queue storing the Runnables.
         for( final SuroKeyedMessage m : msgCopies ) {
             senders.submit(new Runnable() {
                 @Override
                 public void run() {
-                    // TODO: somehow use keyForTopic map in SinkConfig, or is this already handled elsewhere?
                     String topic = m.getRoutingKey();
 
                     // calculate the kafka partition, with backward compatibility with old kafka producer
                     int numPartitions = producer.partitionsFor(topic).size();
-                    int partition = (int) (m.getKey() % numPartitions);
+                    DefaultPartitioner partitioner = new DefaultPartitioner(null); // old Scala partitioner
+                    int partition = partitioner.partition(m.getKey(), numPartitions);
 
-                    // Suro's message key is an Integer type but Kafka stores it as a byte[].
-                    // For the storage purpose, we are converting the number to ASCII.
-                    // For the partitioning purpose, it will be treated as an Integer.
-                    byte[] keyBytes = Long.toHexString( m.getKey() ).getBytes();
                     ProducerRecord r = new ProducerRecord(
                             topic,
                             partition,
-                            keyBytes,
+                            null,   // don't store the key
                            m.getPayload() );
                     log.trace( "Will send message to Kafka" );
                     long startTimeMs = System.currentTimeMillis();
+                    // send
                     Future<RecordMetadata> responseFtr = producer.send( r );
                     log.trace( "Started aysnc producer" );
                     boolean success = true;
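For context on the `DefaultPartitioner` change above: by delegating to the old Scala partitioner, `KafkaSinkV2` should place a keyed message on the same partition that the old producer (and therefore the V1 sink) would pick, which is what the backward-compatibility test further down asserts. The sketch below is illustrative only and not part of this patch; it assumes the 0.8-era `kafka.producer.DefaultPartitioner` reduces to hashing the key modulo the partition count, and the class and method names are made up.

    // Illustrative sketch (not Suro code): an approximation of the partition choice the
    // old Scala kafka.producer.DefaultPartitioner is assumed to make for a non-null key.
    public final class OldStylePartitionSketch {
        static int partition(Object key, int numPartitions) {
            // Mask the sign bit rather than calling Math.abs(), which stays negative
            // for Integer.MIN_VALUE.
            return (key.hashCode() & 0x7fffffff) % numPartitions;
        }
    }

Because both sink versions would then route a given key through the same computation, two copies of the same message should land on the same partition, and that is the property the new test checks.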
diff --git a/suro-kafka/src/test/java/com/netflix/suro/sink/kafka/TestKafkaSinkV2.java b/suro-kafka/src/test/java/com/netflix/suro/sink/kafka/TestKafkaSinkV2.java
index 2f9b567f..bd5e4517 100644
--- a/suro-kafka/src/test/java/com/netflix/suro/sink/kafka/TestKafkaSinkV2.java
+++ b/suro-kafka/src/test/java/com/netflix/suro/sink/kafka/TestKafkaSinkV2.java
@@ -52,6 +52,7 @@ public class TestKafkaSinkV2 {
     private static final String TOPIC_NAME = "routingKey";
     private static final String TOPIC_NAME_MULTITHREAD = "routingKeyMultithread";
     private static final String TOPIC_NAME_PARTITION_BY_KEY = "routingKey_partitionByKey";
+    private static final String TOPIC_NAME_BACKWARD_COMPAT = "routingKey_backwardCompat";
 
     @Test
     public void testDefaultParameters() throws IOException {
@@ -248,6 +249,99 @@ public void testFileBasedQueuePartitionByKey() throws Exception {
         }
     }
 
+
+    /** Tests backward compatibility with old Kafka sink. */
+    @Test
+    public void testBackwardCompatability() throws Exception {
+        int numPartitions = 9;
+
+        TopicCommand.createTopic(zk.getZkClient(),
+            new TopicCommand.TopicCommandOptions(new String[]{
+                "--zookeeper", "dummy", "--create", "--topic", TOPIC_NAME_BACKWARD_COMPAT,
+                "--replication-factor", "2", "--partitions", Integer.toString(numPartitions)}));
+        String keyTopicMap = String.format("    \"keyTopicMap\": {\n" +
+            "        \"%s\": \"key\"\n" +
+            "    }", TOPIC_NAME_BACKWARD_COMPAT);
+
+        String description1 = "{\n" +
+            "    \"type\": \"kafkaV1\",\n" +
+            "    \"client.id\": \"kafkasink\",\n" +
+            "    \"metadata.broker.list\": \"" + kafkaServer.getBrokerListStr() + "\",\n" +
+            "    \"request.required.acks\": 1,\n" +
+            keyTopicMap + "\n" +
+            "}";
+        String description2 = "{\n" +
+            "    \"type\": \"kafkaV2\",\n" +
+            "    \"client.id\": \"kafkasink\",\n" +
+            "    \"metadata.broker.list\": \"" + kafkaServer.getBrokerListStr() + "\",\n" +
+            "    \"request.required.acks\": 1,\n" +
+            keyTopicMap + "\n" +
+            "}";
+
+        // setup sinks, both old and new versions
+        ObjectMapper jsonMapper = new DefaultObjectMapper();
+        jsonMapper.registerSubtypes(new NamedType(KafkaSink.class, "kafkaV1"));
+        jsonMapper.registerSubtypes(new NamedType(KafkaSinkV2.class, "kafkaV2"));
+        KafkaSink sinkV1 = jsonMapper.readValue(description1, new TypeReference(){});
+        KafkaSinkV2 sinkV2 = jsonMapper.readValue(description2, new TypeReference(){});
+        sinkV1.open();
+        sinkV2.open();
+        List<Sink> sinks = new ArrayList<Sink>();
+        sinks.add(sinkV1);
+        sinks.add(sinkV2);
+
+        // setup Kafka consumer (to read back messages)
+        ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
+            createConsumerConfig("localhost:" + zk.getServerPort(), "gropuid"));
+        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
+        topicCountMap.put(TOPIC_NAME_BACKWARD_COMPAT, 1);
+        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
+            consumer.createMessageStreams(topicCountMap);
+        KafkaStream<byte[], byte[]> stream = consumerMap.get(TOPIC_NAME_BACKWARD_COMPAT).get(0);
+
+        // Send 20 test messages, using the old and new Kafka sinks.
+        // Retrieve the messages and ensure that they are identical and sent to the same partition.
+        Random rand = new Random();
+        int messageCount = 20;
+        for (int i = 0; i < messageCount; ++i) {
+            Map<String, Object> msgMap = new ImmutableMap.Builder<String, Object>()
+                .put("key", new Long( rand.nextLong() ) )
+                .put("value", "message:" + i).build();
+
+            // send message to both sinks
+            for( Sink sink : sinks ){
+                sink.writeTo(new DefaultMessageContainer(
+                    new Message(TOPIC_NAME_BACKWARD_COMPAT, jsonMapper.writeValueAsBytes(msgMap)),
+                    jsonMapper));
+            }
+
+            // read two copies of message back from Kafka and check that partitions and data match
+            MessageAndMetadata<byte[], byte[]> msgAndMeta1 = stream.iterator().next();
+            MessageAndMetadata<byte[], byte[]> msgAndMeta2 = stream.iterator().next();
+            System.out.println( "iteration: "+i+" partition1: "+msgAndMeta1.partition() );
+            System.out.println( "iteration: "+i+" partition2: "+msgAndMeta2.partition() );
+            assertEquals( msgAndMeta1.partition(), msgAndMeta2.partition() );
+            String msg1Str = new String( msgAndMeta1.message() );
+            String msg2Str = new String( msgAndMeta2.message() );
+            System.out.println( "iteration: "+i+" message1: "+msg1Str );
+            System.out.println( "iteration: "+i+" message2: "+msg2Str );
+            assertEquals( msg1Str, msg2Str );
+        }
+
+        // close sinks
+        sinkV1.close();
+        sinkV2.close();
+        // close consumer
+        try {
+            stream.iterator().next();
+            fail(); // there should be no data left to consume
+        } catch (ConsumerTimeoutException e) {
+            // this is expected
+            consumer.shutdown();
+        }
+    }
+
+
     @Test
     public void testBlockingThreadPoolExecutor() {
         int jobQueueSize = 5;
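A note on the "lots of small Runnables" comment in `KafkaSinkV2` and the `testBlockingThreadPoolExecutor` test whose beginning is visible above: submitting one Runnable per message only throttles the drain of `queue4sink` if the sender pool's job queue is bounded and submission pushes back once it fills. The snippet below is a generic, JDK-only illustration of that idea, not Suro's `BlockingThreadPoolExecutor`; the pool sizing and the `CallerRunsPolicy` choice are assumptions made for the example.

    // Generic illustration (not Suro's implementation): a sender pool whose in-memory job
    // queue is bounded, so a fast producer is slowed down instead of buffering without limit.
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public final class BoundedSenderPoolSketch {
        public static ThreadPoolExecutor create(int threads, int jobQueueSize) {
            return new ThreadPoolExecutor(
                    threads, threads,
                    0L, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<Runnable>(jobQueueSize),  // bounded job queue
                    // When the queue is full, the submitting thread runs the task itself,
                    // which applies backpressure to whatever is draining queue4sink.
                    new ThreadPoolExecutor.CallerRunsPolicy());
        }
    }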