Skip to content

Commit 1e62187

Browse files
authored
SolaceIO data loss - remove message ack from close and advance as it may lead to data loss during work rebalancing or retry. (#37007)
1 parent 7eb6215 commit 1e62187

File tree

3 files changed

+78
-37
lines changed

3 files changed

+78
-37
lines changed

sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
package org.apache.beam.sdk.io.solace.read;
1919

2020
import com.solacesystems.jcsmp.BytesXMLMessage;
21+
import java.util.List;
2122
import java.util.Objects;
22-
import java.util.Queue;
2323
import org.apache.beam.sdk.annotations.Internal;
2424
import org.apache.beam.sdk.coders.DefaultCoder;
2525
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
@@ -38,7 +38,7 @@
3838
@VisibleForTesting
3939
public class SolaceCheckpointMark implements UnboundedSource.CheckpointMark {
4040
private static final Logger LOG = LoggerFactory.getLogger(SolaceCheckpointMark.class);
41-
private transient Queue<BytesXMLMessage> safeToAck;
41+
private transient List<BytesXMLMessage> safeToAck;
4242

4343
@SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction
4444
private SolaceCheckpointMark() {}
@@ -48,14 +48,13 @@ private SolaceCheckpointMark() {}
4848
*
4949
* @param safeToAck - a queue of {@link BytesXMLMessage} to be acknowledged.
5050
*/
51-
SolaceCheckpointMark(Queue<BytesXMLMessage> safeToAck) {
51+
SolaceCheckpointMark(List<BytesXMLMessage> safeToAck) {
5252
this.safeToAck = safeToAck;
5353
}
5454

5555
@Override
5656
public void finalizeCheckpoint() {
57-
BytesXMLMessage msg;
58-
while ((msg = safeToAck.poll()) != null) {
57+
for (BytesXMLMessage msg : safeToAck) {
5958
try {
6059
msg.ackMessage();
6160
} catch (IllegalStateException e) {

sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java

Lines changed: 4 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import java.util.NoSuchElementException;
2828
import java.util.Queue;
2929
import java.util.UUID;
30-
import java.util.concurrent.ConcurrentLinkedQueue;
3130
import java.util.concurrent.ExecutionException;
3231
import java.util.concurrent.Executors;
3332
import java.util.concurrent.ScheduledExecutorService;
@@ -42,6 +41,7 @@
4241
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
4342
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
4443
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification;
44+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
4545
import org.checkerframework.checker.nullness.qual.Nullable;
4646
import org.joda.time.Instant;
4747
import org.slf4j.Logger;
@@ -60,12 +60,6 @@ class UnboundedSolaceReader<T> extends UnboundedReader<T> {
6060
private @Nullable BytesXMLMessage solaceOriginalRecord;
6161
private @Nullable T solaceMappedRecord;
6262

63-
/**
64-
* Queue to place advanced messages before {@link #getCheckpointMark()} is called. CAUTION:
65-
* Accessed by both reader and checkpointing threads.
66-
*/
67-
private final Queue<BytesXMLMessage> safeToAckMessages = new ConcurrentLinkedQueue<>();
68-
6963
/**
7064
* Queue for messages that were ingested in the {@link #advance()} method, but not sent yet to a
7165
* {@link SolaceCheckpointMark}.
@@ -136,8 +130,6 @@ public boolean start() {
136130

137131
@Override
138132
public boolean advance() {
139-
finalizeReadyMessages();
140-
141133
BytesXMLMessage receivedXmlMessage;
142134
try {
143135
receivedXmlMessage = getSessionService().getReceiver().receive();
@@ -158,27 +150,9 @@ public boolean advance() {
158150

159151
@Override
160152
public void close() {
161-
finalizeReadyMessages();
162153
sessionServiceCache.invalidate(readerUuid);
163154
}
164155

165-
public void finalizeReadyMessages() {
166-
BytesXMLMessage msg;
167-
while ((msg = safeToAckMessages.poll()) != null) {
168-
try {
169-
msg.ackMessage();
170-
} catch (IllegalStateException e) {
171-
LOG.error(
172-
"SolaceIO.Read: failed to acknowledge the message with applicationMessageId={}, ackMessageId={}. Returning the message to queue to retry.",
173-
msg.getApplicationMessageId(),
174-
msg.getAckMessageId(),
175-
e);
176-
safeToAckMessages.add(msg); // In case the error was transient, might succeed later
177-
break; // Commit is only best effort
178-
}
179-
}
180-
}
181-
182156
@Override
183157
public Instant getWatermark() {
184158
// should be only used by a test receiver
@@ -190,9 +164,10 @@ public Instant getWatermark() {
190164

191165
@Override
192166
public UnboundedSource.CheckpointMark getCheckpointMark() {
193-
safeToAckMessages.addAll(receivedMessages);
167+
168+
ImmutableList<BytesXMLMessage> bytesXMLMessages = ImmutableList.copyOf(receivedMessages);
194169
receivedMessages.clear();
195-
return new SolaceCheckpointMark(safeToAckMessages);
170+
return new SolaceCheckpointMark(bytesXMLMessages);
196171
}
197172

198173
@Override

sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java

Lines changed: 70 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -458,13 +458,13 @@ public void testCheckpointMarkAndFinalizeSeparately() throws Exception {
458458
// mark all consumed messages as ready to be acknowledged
459459
CheckpointMark checkpointMark = reader.getCheckpointMark();
460460

461-
// consume 1 more message. This will call #ackMsg() on messages that were ready to be acked.
461+
// consume 1 more message.
462462
reader.advance();
463-
assertEquals(4, countAckMessages.get());
463+
assertEquals(0, countAckMessages.get());
464464

465465
// consume 1 more message. No change in the acknowledged messages.
466466
reader.advance();
467-
assertEquals(4, countAckMessages.get());
467+
assertEquals(0, countAckMessages.get());
468468

469469
// acknowledge from the first checkpoint
470470
checkpointMark.finalizeCheckpoint();
@@ -473,6 +473,73 @@ public void testCheckpointMarkAndFinalizeSeparately() throws Exception {
473473
assertEquals(4, countAckMessages.get());
474474
}
475475

476+
@Test
477+
public void testLateCheckpointOverlappingFlushingOfNextBundle() throws Exception {
478+
AtomicInteger countConsumedMessages = new AtomicInteger(0);
479+
AtomicInteger countAckMessages = new AtomicInteger(0);
480+
481+
// Broker that creates input data
482+
SerializableFunction<Integer, BytesXMLMessage> recordFn =
483+
index -> {
484+
List<BytesXMLMessage> messages = new ArrayList<>();
485+
for (int i = 0; i < 10; i++) {
486+
messages.add(
487+
SolaceDataUtils.getBytesXmlMessage(
488+
"payload_test" + i, "45" + i, (num) -> countAckMessages.incrementAndGet()));
489+
}
490+
countConsumedMessages.incrementAndGet();
491+
return getOrNull(index, messages);
492+
};
493+
494+
SessionServiceFactory fakeSessionServiceFactory =
495+
MockSessionServiceFactory.builder().recordFn(recordFn).minMessagesReceived(10).build();
496+
497+
Read<Record> spec =
498+
getDefaultRead()
499+
.withSessionServiceFactory(fakeSessionServiceFactory)
500+
.withMaxNumConnections(4);
501+
502+
UnboundedSolaceSource<Record> initialSource = getSource(spec, pipeline);
503+
504+
UnboundedReader<Record> reader =
505+
initialSource.createReader(PipelineOptionsFactory.create(), null);
506+
507+
// start the reader and move to the first record
508+
assertTrue(reader.start());
509+
510+
// consume 3 messages (NB: #start() already consumed the first message)
511+
for (int i = 0; i < 3; i++) {
512+
assertTrue(String.format("Failed at %d-th message", i), reader.advance());
513+
}
514+
515+
// #advance() was called, but the messages were not ready to be acknowledged.
516+
assertEquals(0, countAckMessages.get());
517+
518+
// mark all consumed messages as ready to be acknowledged
519+
CheckpointMark checkpointMark = reader.getCheckpointMark();
520+
521+
// data is flushed
522+
523+
// consume 1 more message.
524+
reader.advance();
525+
assertEquals(0, countAckMessages.get());
526+
527+
// consume 1 more message. No change in the acknowledged messages.
528+
reader.advance();
529+
assertEquals(0, countAckMessages.get());
530+
531+
CheckpointMark checkpointMark2 = reader.getCheckpointMark();
532+
// data is prepared for flushing that will be rejected
533+
534+
// acknowledge from the first checkpoint may arrive late
535+
checkpointMark.finalizeCheckpoint();
536+
537+
assertEquals(4, countAckMessages.get());
538+
539+
checkpointMark2.finalizeCheckpoint();
540+
assertEquals(6, countAckMessages.get());
541+
}
542+
476543
@Test
477544
public void testCheckpointMarkSafety() throws Exception {
478545

0 commit comments

Comments
 (0)