AbstractVeniceProducer.java
@@ -66,6 +66,7 @@ public abstract class AbstractVeniceProducer<K, V> implements VeniceProducer<K, V> {

private SchemaReader schemaReader;
private ThreadPoolExecutor producerExecutor;
private ThreadPoolExecutor writerExecutor; // Single-threaded executor to maintain write order
private VeniceWriter<byte[], byte[], byte[]> veniceWriter;

private RecordSerializer<Object> keySerializer;
@@ -103,6 +104,9 @@ protected void configure(
if (metricsRepository != null) {
new ThreadPoolStats(metricsRepository, producerExecutor, "client_producer_thread_pool");
}
// Single-threaded executor to ensure write operations maintain order
this.writerExecutor = ThreadPoolFactory
.createThreadPool(1, "ClientProducerWriter", Integer.MAX_VALUE, BlockingQueueType.LINKED_BLOCKING_QUEUE);
this.keySerializer = getSerializer(schemaReader.getKeySchema());

VersionCreationResponse versionCreationResponse = requestTopic();
@@ -198,15 +202,19 @@ private CompletableFuture<DurableWrite> asyncPutInternal(long logicalTime, K key
}

producerMetrics.recordPutRequest();
return CompletableFuture.supplyAsync(() -> {

// Step 1: Pre-process in parallel (schema fetching and serialization)
// This can happen concurrently across multiple threads for performance
CompletableFuture<PreparedWriteData> preprocessFuture = CompletableFuture.supplyAsync(() -> {
final Instant preProcessingStartTime = Instant.now();
Schema valueSchema;
try {
valueSchema = getSchemaFromObject(value);
} catch (Exception e) {
producerMetrics.recordFailedRequest();
throw e;
}
// Might block
// Might block - this is the expensive part we want to parallelize
int valueSchemaId;
Exception schemaReadException = null;
try {
@@ -222,39 +230,37 @@ private CompletableFuture<DurableWrite> asyncPutInternal(long logicalTime, K key
+ ". This might be transient if the schema has been registered recently.",
schemaReadException);
}
final CompletableFuture<Void> completableFuture = new CompletableFuture<>();
final Instant sendStartTime = Instant.now();
final PubSubProducerCallback callback = getPubSubProducerCallback(
sendStartTime,
completableFuture,
"Failed to write the requested data to the PubSub system");

byte[] keyBytes = keySerializer.serialize(key);
byte[] valueBytes = getSerializer(valueSchema).serialize(value);

try {
veniceWriter.put(keyBytes, valueBytes, valueSchemaId, logicalTime, callback);
} catch (Exception e) {
callback.onCompletion(null, e);
throw e;
}

try {
completableFuture.get();
} catch (InterruptedException | ExecutionException e) {
throw new VeniceException(e);
}
// Record preprocessing latency
Duration preprocessingDuration = Duration.between(preProcessingStartTime, Instant.now());
producerMetrics.recordPreprocessingLatency(preprocessingDuration.toMillis());

return DURABLE_WRITE;
return new PreparedWriteData(keyBytes, valueBytes, valueSchemaId, -1, logicalTime);
}, producerExecutor);

// Step 2: Submit write task to single-threaded executor IMMEDIATELY to maintain order
// The write task waits for preprocessing to complete, then performs the write
// This ensures veniceWriter.put() calls happen in the order requests were received
return executeWrite(
preprocessFuture,
(preparedData, callback) -> veniceWriter.put(
preparedData.keyBytes,
preparedData.payloadBytes,
preparedData.valueSchemaId,
preparedData.logicalTime,
callback),
"Failed to write the PUT record to the PubSub system");
}

private PubSubProducerCallback getPubSubProducerCallback(
Instant sendStartTime,
CompletableFuture<Void> completableFuture,
String errorMessage) {
final AtomicBoolean callbackTriggered = new AtomicBoolean();
final PubSubProducerCallback callback = (PubSubProduceResult produceResult, Exception exception) -> {
return (PubSubProduceResult produceResult, Exception exception) -> {
boolean firstInvocation = callbackTriggered.compareAndSet(false, true);
// We do not expect this to be triggered multiple times, but we still handle the case for defensive reasons
if (!firstInvocation) {
@@ -271,7 +277,6 @@ private PubSubProducerCallback getPubSubProducerCallback(
completableFuture.completeExceptionally(exception);
}
};
return callback;
}

@Override
@@ -295,31 +300,24 @@ private CompletableFuture<DurableWrite> asyncDeleteInternal(long logicalTime, K key
}

producerMetrics.recordDeleteRequest();
return CompletableFuture.supplyAsync(() -> {
final CompletableFuture<Void> completableFuture = new CompletableFuture<>();
final Instant sendStartTime = Instant.now();
final PubSubProducerCallback callback = getPubSubProducerCallback(
sendStartTime,
completableFuture,
"Failed to write the delete operation to the PubSub system");

// Step 1: Pre-process in parallel (key serialization)
CompletableFuture<PreparedWriteData> preprocessFuture = CompletableFuture.supplyAsync(() -> {
final Instant preProcessingStartTime = Instant.now();
byte[] keyBytes = keySerializer.serialize(key);

try {
veniceWriter.delete(keyBytes, logicalTime, callback);
} catch (Exception e) {
callback.onCompletion(null, e);
throw e;
}

try {
completableFuture.get();
} catch (InterruptedException | ExecutionException e) {
throw new VeniceException(e);
}
// Record preprocessing latency
Duration preprocessingDuration = Duration.between(preProcessingStartTime, Instant.now());
producerMetrics.recordPreprocessingLatency(preprocessingDuration.toMillis());

return DURABLE_WRITE;
return new PreparedWriteData(keyBytes, null, -1, -1, logicalTime);
}, producerExecutor);

// Step 2: Submit write task to single-threaded executor IMMEDIATELY to maintain order
return executeWrite(
preprocessFuture,
(preparedData, callback) -> veniceWriter.delete(preparedData.keyBytes, preparedData.logicalTime, callback),
"Failed to write the DELETE record to the PubSub system");
}

@Override
@@ -346,7 +344,10 @@ private CompletableFuture<DurableWrite> asyncUpdateInternal(
}

producerMetrics.recordUpdateRequest();
return CompletableFuture.supplyAsync(() -> {

// Step 1: Pre-process in parallel (schema fetching, building update record, serialization)
CompletableFuture<PreparedWriteData> preprocessFuture = CompletableFuture.supplyAsync(() -> {
final Instant preProcessingStartTime = Instant.now();
// Caching to avoid race conditions during processing of the function
DerivedSchemaEntry updateSchemaEntry = schemaReader.getLatestUpdateSchema();

@@ -371,40 +372,32 @@ private CompletableFuture<DurableWrite> asyncUpdateInternal(
updateFunction.accept(updateBuilder);
GenericRecord updateRecord = updateBuilder.build();

final CompletableFuture<Void> completableFuture = new CompletableFuture<>();
final Instant sendStartTime = Instant.now();
final AtomicBoolean callbackTriggered = new AtomicBoolean();
final PubSubProducerCallback callback = getPubSubProducerCallback(
sendStartTime,
completableFuture,
"Failed to write the partial update record to the PubSub system");

byte[] keyBytes = keySerializer.serialize(key);
byte[] updateBytes = getSerializer(updateSchema).serialize(updateRecord);

try {
veniceWriter.update(
keyBytes,
updateBytes,
updateSchemaEntry.getValueSchemaID(),
updateSchemaEntry.getId(),
callback,
logicalTime);
} catch (Exception e) {
if (!callbackTriggered.get()) {
callback.onCompletion(null, e);
}
throw e;
}
// Record preprocessing latency
Duration preprocessingDuration = Duration.between(preProcessingStartTime, Instant.now());
producerMetrics.recordPreprocessingLatency(preprocessingDuration.toMillis());

try {
completableFuture.get();
} catch (InterruptedException | ExecutionException e) {
throw new VeniceException(e);
}

return DURABLE_WRITE;
return new PreparedWriteData(
keyBytes,
updateBytes,
updateSchemaEntry.getValueSchemaID(),
updateSchemaEntry.getId(),
logicalTime);
}, producerExecutor);

// Step 2: Submit write task to single-threaded executor IMMEDIATELY to maintain order
return executeWrite(
preprocessFuture,
(preparedData, callback) -> veniceWriter.update(
preparedData.keyBytes,
preparedData.payloadBytes,
preparedData.valueSchemaId,
preparedData.derivedSchemaId,
callback,
preparedData.logicalTime),
"Failed to write the UPDATE record to the PubSub system");
}

/**
Expand Down Expand Up @@ -442,11 +435,75 @@ public void close() throws IOException {
LOGGER.warn("Caught InterruptedException while closing the Venice producer ExecutorService", e);
}
}
if (writerExecutor != null) {
writerExecutor.shutdownNow();
try {
writerExecutor.awaitTermination(60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOGGER.warn("Caught InterruptedException while closing the Venice producer writer ExecutorService", e);
}
}

Utils.closeQuietlyWithErrorLogged(veniceWriter);
}

protected boolean isClosed() {
return closed;
}

@FunctionalInterface
private interface WriteOperation {
void execute(PreparedWriteData preparedData, PubSubProducerCallback callback) throws Exception;
}

private CompletableFuture<DurableWrite> executeWrite(
CompletableFuture<PreparedWriteData> preprocessFuture,
WriteOperation writeOperation,
String errorMessage) {
return CompletableFuture.supplyAsync(() -> {
PreparedWriteData preparedData;
try {
// Wait for preprocessing to complete
preparedData = preprocessFuture.get();
} catch (InterruptedException | ExecutionException e) {
throw new VeniceException(e);
}

final CompletableFuture<Void> writeFuture = new CompletableFuture<>();
final Instant sendStartTime = Instant.now();
final PubSubProducerCallback callback = getPubSubProducerCallback(sendStartTime, writeFuture, errorMessage);

try {
writeOperation.execute(preparedData, callback);
} catch (Exception e) {
callback.onCompletion(null, e);
throw new VeniceException(errorMessage, e);
}

try {
writeFuture.get();
} catch (InterruptedException | ExecutionException e) {
throw new VeniceException(e);
}

return DURABLE_WRITE;
}, writerExecutor);
}

// Helper class to hold prepared data from preprocessing
private static class PreparedWriteData {
final byte[] keyBytes;
final byte[] payloadBytes; // null for delete operations, valueBytes for put, updateBytes for update
final int valueSchemaId; // -1 for delete operations
final int derivedSchemaId; // -1 for put/delete operations, used only for update
final long logicalTime;

PreparedWriteData(byte[] keyBytes, byte[] payloadBytes, int valueSchemaId, int derivedSchemaId, long logicalTime) {
this.keyBytes = keyBytes;
this.payloadBytes = payloadBytes;
this.valueSchemaId = valueSchemaId;
this.derivedSchemaId = derivedSchemaId;
this.logicalTime = logicalTime;
}
}
}
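
To make the ordering argument concrete, here is a minimal standalone sketch of the same two-stage pattern, assuming nothing from Venice (all class and method names below are hypothetical): preprocessing fans out across a pool and may finish out of order, while each write task is enqueued on a single-threaded executor at call time, so write tasks execute in request order, each blocking on its own preprocessing future first.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class OrderedWriteSketch {
  private final ExecutorService preprocessPool = Executors.newFixedThreadPool(4);
  private final ExecutorService writerExecutor = Executors.newSingleThreadExecutor();

  public CompletableFuture<Void> write(String key, String value) {
    // Stage 1: CPU-bound work; may complete out of order across requests.
    CompletableFuture<byte[]> prepared =
        CompletableFuture.supplyAsync(() -> serialize(key, value), preprocessPool);
    // Stage 2: enqueued immediately, so the single writer thread picks tasks
    // up in call order; join() waits for this request's stage-1 result.
    return CompletableFuture.runAsync(() -> doWrite(prepared.join()), writerExecutor);
  }

  private byte[] serialize(String key, String value) {
    return (key + "=" + value).getBytes(); // stand-in for Avro serialization
  }

  private void doWrite(byte[] payload) {
    System.out.println("writing " + new String(payload)); // stand-in for veniceWriter.put()
  }
}

The design choice to note is head-of-line blocking: because the writer thread also waits on each request's preparation (and, in executeWrite above, on the PubSub ack), one slow request stalls every write behind it. That serialization is exactly what buys the ordering guarantee.
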
VeniceProducerMetrics.java
@@ -19,6 +19,7 @@ public class VeniceProducerMetrics extends AbstractVeniceStats {
private Sensor successOperationSensor = null;
private Sensor failedOperationSensor = null;
private Sensor produceLatencySensor = null;
private Sensor preprocessingLatencySensor = null;
private Sensor pendingOperationSensor = null;

private final AtomicInteger pendingOperationCounter = new AtomicInteger(0);
@@ -39,6 +40,10 @@ public VeniceProducerMetrics(MetricsRepository metricsRepository, String storeName
produceLatencySensor = registerSensor(
produceLatencySensorName,
TehutiUtils.getPercentileStat(getName() + AbstractVeniceStats.DELIMITER + produceLatencySensorName));
String preprocessingLatencySensorName = "preprocessing_latency";
preprocessingLatencySensor = registerSensor(
preprocessingLatencySensorName,
TehutiUtils.getPercentileStat(getName() + AbstractVeniceStats.DELIMITER + preprocessingLatencySensorName));

pendingOperationSensor = registerSensor("pending_write_operation", new Min(), new Max());
} else {
@@ -88,4 +93,10 @@ public void recordFailedRequest() {
pendingOperationSensor.record(pendingOperationCounter.decrementAndGet());
}
}

public void recordPreprocessingLatency(long latencyMs) {
if (enableMetrics) {
preprocessingLatencySensor.record(latencyMs);
}
}
}
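
On the metrics side, the new preprocessing_latency sensor follows the usual Tehuti pattern. As a rough sketch, assuming the standard io.tehuti API (the stat and metric names here are illustrative, not the percentile stats Venice registers via TehutiUtils.getPercentileStat):

import io.tehuti.metrics.MetricsRepository;
import io.tehuti.metrics.Sensor;
import io.tehuti.metrics.stats.Avg;
import io.tehuti.metrics.stats.Max;

public class MetricsSketch {
  public static void main(String[] args) {
    MetricsRepository repo = new MetricsRepository();
    Sensor latency = repo.sensor("preprocessing_latency");
    // Attach simple stats; Venice attaches percentile stats instead.
    latency.add("producer.preprocessing_latency.avg", new Avg());
    latency.add("producer.preprocessing_latency.max", new Max());
    latency.record(12.0); // one observation, in milliseconds
  }
}

Note that the wrapper guards every record() call behind the enableMetrics flag, as recordPreprocessingLatency does above, so producers configured without a MetricsRepository pay essentially nothing per operation.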