diff --git a/ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java b/ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java index 4dc53467c7..6884b86b61 100644 --- a/ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java +++ b/ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java @@ -138,12 +138,16 @@ public static class WriteDoFn extends DoFn { } public WriteDoFn withBatchSize(int batchSize) { - if (batchSize > 0) this.batchSize = batchSize; + if (batchSize > 0) { + this.batchSize = batchSize; + } return this; } public WriteDoFn withTimeout(int timeout) { - if (timeout > 0) this.timeout = timeout; + if (timeout > 0) { + this.timeout = timeout; + } return this; } @@ -155,7 +159,6 @@ public void setup() { @StartBundle public void startBundle() { pipeline = jedis.pipelined(); - pipeline.multi(); batchCount = 0; } @@ -168,9 +171,7 @@ public void processElement(ProcessContext context) { } batchCount++; if (batchCount >= batchSize) { - pipeline.exec(); pipeline.sync(); - pipeline.multi(); batchCount = 0; } } @@ -197,10 +198,7 @@ private Response writeRecord(RedisMutation mutation) { @FinishBundle public void finishBundle() { - if (pipeline.isInMulti()) { - pipeline.exec(); - pipeline.sync(); - } + pipeline.sync(); batchCount = 0; }