Manual assignment of consumer group offset
zhilingc committed Aug 26, 2019
1 parent e525c74 commit 1059b66
Showing 2 changed files with 85 additions and 4 deletions.
5 changes: 5 additions & 0 deletions core/src/test/java/feast/core/ImportJobTest.java
@@ -0,0 +1,5 @@
package feast.core;

public class ImportJobTest {

}
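The test class is added as an empty stub in this commit. As a hedged sketch only (assuming JUnit 4 on the test classpath, which this diff does not show), a first test could pin down the consumer-group naming convention that the offset setup in ImportJob depends on:

package feast.core;

import static org.junit.Assert.assertEquals;

import org.junit.Test;

public class ImportJobTest {

  // Sketch only: mirrors the group id format hard-coded in
  // ImportJob.setupConsumerGroupOffset. Offsets committed there are visible
  // solely to consumers that use the same group id, so the format matters.
  @Test
  public void consumerGroupIdShouldEmbedJobId() {
    String jobId = "test-job";
    assertEquals(
        "feast-import-job-test-job", String.format("feast-import-job-%s", jobId));
  }
}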
84 changes: 80 additions & 4 deletions ingestion/src/main/java/feast/ingestion/ImportJob.java
@@ -1,21 +1,37 @@
package feast.ingestion;

import static feast.specs.ImportJobSpecsProto.SourceSpec.SourceType.KAFKA;

import com.google.cloud.bigquery.BigQueryOptions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import feast.ingestion.options.ImportJobPipelineOptions;
import feast.ingestion.transform.ReadFeaturesTransform;
import feast.ingestion.transform.ToFeatureRowExtended;
import feast.ingestion.transform.WriteFeaturesTransform;
import feast.ingestion.util.ProtoUtil;
import feast.ingestion.util.StorageUtil;
import feast.specs.ImportJobSpecsProto.ImportJobSpecs;
import feast.specs.ImportJobSpecsProto.SourceSpec;
import feast.specs.ImportJobSpecsProto.SourceSpec.SourceType;
import feast.specs.StorageSpecProto.StorageSpec;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

@SuppressWarnings("WeakerAccess")
@Slf4j
@@ -27,8 +43,8 @@ public class ImportJob {
* <p>The arguments will be passed to Beam {@code PipelineOptionsFactory} to create {@code
* ImportJobPipelineOptions}.
*
- * <p>The returned PipelineResult object can be used to check the state of the pipeline e.g. if it
- * is running, done or cancelled.
+ * <p>The returned PipelineResult object can be used to check the state of the pipeline e.g. if
+ * it is running, done or cancelled.
*
* @param args command line arguments, typically come from the main() method
* @return PipelineResult
@@ -43,8 +59,8 @@ public static PipelineResult runPipeline(String[] args) throws IOException, URIS
/**
* Create and run a Beam pipeline from {@code ImportJobPipelineOptions}.
*
- * <p>The returned PipelineResult object can be used to check the state of the pipeline e.g. if it
- * is running, done or cancelled.
+ * <p>The returned PipelineResult object can be used to check the state of the pipeline e.g. if
+ * it is running, done or cancelled.
*
* @param pipelineOptions configuration for the pipeline
* @return PipelineResult
@@ -61,6 +77,7 @@ public static PipelineResult runPipeline(ImportJobPipelineOptions pipelineOption
ImportJobSpecs.class);
pipelineOptions.setJobName(importJobSpecs.getJobId());
setupStorage(importJobSpecs);
setupConsumerGroupOffset(importJobSpecs);
Pipeline pipeline = Pipeline.create(pipelineOptions);
pipeline
.apply("Read FeatureRow", new ReadFeaturesTransform(importJobSpecs))
@@ -103,4 +120,63 @@ private static void setupStorage(ImportJobSpecs importJobSpecs) {
storageSpecType));
}
}

/**
* Manually sets the offsets for this job's consumer group to the offsets current at the
* time the ingestion job is provisioned.
*
* <p>This is necessary because the setup time for certain runners (e.g. Dataflow) might
* cause the workers to miss messages emitted into the stream before the workers are ready.
*
* @param importJobSpecs import job specification, refer to {@code ImportJobSpecs.proto}
*/
private static void setupConsumerGroupOffset(ImportJobSpecs importJobSpecs) {
SourceType sourceType = importJobSpecs.getSourceSpec().getType();
switch (sourceType) {
case KAFKA:
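// Build a short-lived consumer in this job's consumer group solely to commit
// offsets; it is not the consumer the pipeline itself will read with.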
String consumerGroupId = String.format("feast-import-job-%s", importJobSpecs.getJobId());
Properties consumerProperties = new Properties();
consumerProperties.setProperty("group.id", consumerGroupId);
consumerProperties.setProperty("bootstrap.servers",
importJobSpecs.getSourceSpec().getOptionsOrThrow("bootstrapServers"));
consumerProperties.setProperty(
"key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
consumerProperties.setProperty(
"value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(consumerProperties);

String[] topics = importJobSpecs.getSourceSpec().getOptionsOrThrow("topics").split(",");
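// Map every partition of every source topic to the provisioning timestamp, so that
// offsetsForTimes can resolve each partition's offset as of this moment.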
long timestamp = System.currentTimeMillis();
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
for (String topic : topics) {
List<PartitionInfo> partitionInfos = kafkaConsumer.partitionsFor(topic);
for (PartitionInfo partitionInfo : partitionInfos) {
TopicPartition topicPartition = new TopicPartition(topic, partitionInfo.partition());
timestampsToSearch.put(topicPartition, timestamp);
}
}
Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer
.offsetsForTimes(timestampsToSearch);

kafkaConsumer.assign(offsets.keySet());

// Seek each partition to the offset in effect at provisioning time, falling back to
// the beginning of the partition when no offset exists for the given timestamp.
offsets.forEach((topicPartition, offset) -> {
if (offset != null) {
kafkaConsumer.seek(topicPartition, offset.offset());
} else {
kafkaConsumer.seekToBeginning(Sets.newHashSet(topicPartition));
}
});

// Committing after the seek (rather than before it) ensures the searched offsets,
// not the auto-reset positions, become the group's committed offsets for the
// pipeline to resume from.
kafkaConsumer.commitSync();
kafkaConsumer.close();
return;
default:
throw new IllegalArgumentException(
String.format(
"Unsupported type of sourceSpec: \"%s\". Only KAFKA is supported in Feast 0.2",
sourceType));
}

}
}
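For the committed offsets to take effect, the pipeline's Kafka source has to consume under the same group id. What follows is a hedged sketch, not code from this commit: the real wiring lives in ReadFeaturesTransform (not shown in this diff), it assumes a Beam release that provides KafkaIO#withConsumerConfigUpdates, and the class and method names are illustrative.

package feast.ingestion.transform;

import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

// Sketch only: illustrates how a Kafka read joining the same consumer group would
// resume from the offsets committed by ImportJob.setupConsumerGroupOffset.
public class KafkaReadSketch {

  public static KafkaIO.Read<byte[], byte[]> kafkaRead(
      String bootstrapServers, String topicsCsv, String consumerGroupId) {
    List<String> topics = Arrays.asList(topicsCsv.split(","));
    return KafkaIO.<byte[], byte[]>read()
        .withBootstrapServers(bootstrapServers)
        .withTopics(topics)
        .withKeyDeserializer(ByteArrayDeserializer.class)
        .withValueDeserializer(ByteArrayDeserializer.class)
        // Reading under the same group.id makes the source start from the group's
        // committed offsets instead of the latest offsets.
        .withConsumerConfigUpdates(
            ImmutableMap.<String, Object>of("group.id", consumerGroupId))
        // Keep the group's committed offsets advancing as bundles are finalized.
        .commitOffsetsInFinalize();
  }
}

The sketch also shows why the group id format chosen in setupConsumerGroupOffset has to line up exactly with whatever the source uses.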
