Should DeletionVectorsMaintainer keep thread-safe? #4579

Closed
2 tasks done
stayrascal opened this issue Nov 25, 2024 · 6 comments
Labels
bug Something isn't working

Comments

@stayrascal
Contributor

Search before asking

  • I searched in the issues and found nothing similar.

Paimon version

master: 16a4058

Compute Engine

flink

Minimal reproduce step

  1. It's hard to reproduce because the problem is not deterministic. The stack trace is as follows:
Caused by: java.util.ConcurrentModificationException
	at java.util.HashMap$HashIterator.nextNode(HashMap.java:1469)
	at java.util.HashMap$EntryIterator.next(HashMap.java:1503)
	at java.util.HashMap$EntryIterator.next(HashMap.java:1501)
	at org.apache.paimon.deletionvectors.DeletionVectorIndexFileWriter.tryWriter(DeletionVectorIndexFileWriter.java:78)
	at org.apache.paimon.deletionvectors.DeletionVectorIndexFileWriter.write(DeletionVectorIndexFileWriter.java:68)
	at org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.write(DeletionVectorsIndexFile.java:154)
	at org.apache.paimon.index.IndexFileHandler.writeDeletionVectorsIndex(IndexFileHandler.java:293)
	at org.apache.paimon.deletionvectors.DeletionVectorsMaintainer.writeDeletionVectorsIndex(DeletionVectorsMaintainer.java:117)
	at org.apache.paimon.compact.CompactDeletionFile.generateFiles(CompactDeletionFile.java:45)
	at org.apache.paimon.compact.CompactDeletionFile$LazyCompactDeletionFile.getOrCompute(CompactDeletionFile.java:121)
	at org.apache.paimon.operation.AbstractFileStoreWrite.prepareCommit(AbstractFileStoreWrite.java:217)
	at org.apache.paimon.operation.MemoryFileStoreWrite.prepareCommit(MemoryFileStoreWrite.java:154)
  2. I'm trying to add a new parallel test case to DeletionVectorsMaintainerTest:
    @Test
    void testParallelNotifyNewDeletionAndWriteIndex() {
        // fileHandler is the IndexFileHandler field already set up in DeletionVectorsMaintainerTest.
        DeletionVectorsMaintainer.Factory factory =
                new DeletionVectorsMaintainer.Factory(fileHandler);
        DeletionVectorsMaintainer dvMaintainer =
                factory.createOrRestore(null, BinaryRow.EMPTY_ROW, 0);

        ThreadPoolExecutor threadPool = ThreadPoolUtils.createCachedThreadPool(2, "dv-");

        // "Writer" task: keeps adding deletion vectors for new files f0 .. f999.
        Runnable delete = () -> {
            for (int i = 0; i < 1000; i++) {
                dvMaintainer.notifyNewDeletion("f" + i, i);
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
        };

        // "Compaction" task: repeatedly iterates the deletion vectors to write the index.
        Runnable compact = () -> {
            for (int i = 0; i < 100; i++) {
                dvMaintainer.writeDeletionVectorsIndex();
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
        };

        Future<?> deleteFuture = threadPool.submit(delete);
        Future<?> compactFuture = threadPool.submit(compact);

        try {
            // A ConcurrentModificationException in either task is rethrown here,
            // wrapped in an ExecutionException.
            compactFuture.get();
            deleteFuture.get();
        } catch (ExecutionException | InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            threadPool.shutdownNow();
        }

        // The assertions expect a single index file covering 1001 files (f0 .. f999 plus f1001).
        dvMaintainer.notifyNewDeletion("f1001", 1001);
        List<IndexFileMeta> indexFileMetas = dvMaintainer.writeDeletionVectorsIndex();
        Assert.assertEquals(1, indexFileMetas.size());
        Assert.assertEquals(1001, indexFileMetas.get(0).rowCount());
    }
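Because the race depends on timing, the underlying failure mode may be easier to see in isolation. The standalone sketch below is not Paimon code (HashMapFailFastDemo and modifyDuringIteration are made-up names): it inserts a key into a java.util.HashMap while iterating its entrySet() on a single thread, standing in for notifyNewDeletion running during writeDeletionVectorsIndex. The next call to next() throws ConcurrentModificationException from HashMap$HashIterator.nextNode, the same frame at the top of the stack traces in this issue.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

/** Standalone demo of HashMap's fail-fast iterator; not Paimon code. */
public class HashMapFailFastDemo {

    /**
     * Inserts a new key while iterating entrySet(), standing in for notifyNewDeletion
     * running during writeDeletionVectorsIndex. Returns true if a CME was thrown.
     */
    public static boolean modifyDuringIteration() {
        Map<String, Integer> deletionVectors = new HashMap<>();
        deletionVectors.put("f0", 0);
        deletionVectors.put("f1", 1);
        deletionVectors.put("f2", 2);

        boolean inserted = false;
        try {
            for (Map.Entry<String, Integer> entry : deletionVectors.entrySet()) {
                if (!inserted) {
                    // Structural modification: bumps the map's internal modification count.
                    deletionVectors.put("f3", 3);
                    inserted = true;
                }
            }
        } catch (ConcurrentModificationException e) {
            // The following next() saw that the modification count no longer matched.
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("CME thrown: " + modifyDuringIteration()); // CME thrown: true
    }
}
```

On a single thread this is deterministic. Across threads, the HashMap Javadoc states that fail-fast iterators only detect concurrent modification on a best-effort basis, which is consistent with the failure showing up only intermittently.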

What doesn't meet your expectations?

  1. I'm not sure whether I misunderstand the overall sink process, but it seems that data writing (traversing the WriteBuffer and writing records) and the async compaction task run concurrently. If so, the data-writing thread may modify the deletionVectors of DeletionVectorsMaintainer while the compaction thread is iterating over deletionVectors to write the index, which leads to the ConcurrentModificationException (CME).

  2. If the problem described above really exists, should we make DeletionVectorsMaintainer thread-safe, or change the type of deletionVectors (e.g. to a queue)?
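One way to picture the first option (a thread-safe maintainer) is to guard every mutation with a lock and have the index writer iterate over a copy taken under that same lock. The sketch below is a hypothetical, simplified class (SnapshottingMaintainer, snapshot and writeIndex are invented names), not a proposal for the actual DeletionVectorsMaintainer API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical, simplified maintainer; not Paimon's DeletionVectorsMaintainer. */
public class SnapshottingMaintainer {

    private final Map<String, Integer> deletionVectors = new HashMap<>();

    /** Writer side: guarded by the same monitor as snapshot(). */
    public synchronized void notifyNewDeletion(String fileName, int position) {
        deletionVectors.put(fileName, position);
    }

    /** Copies the map while holding the lock, so no writer can modify it mid-copy. */
    public synchronized Map<String, Integer> snapshot() {
        return new HashMap<>(deletionVectors);
    }

    /** Stand-in for index writing: iterates the private copy without holding the lock. */
    public int writeIndex() {
        int written = 0;
        for (Map.Entry<String, Integer> entry : snapshot().entrySet()) {
            written++; // a real implementation would serialize entry here
        }
        return written;
    }

    public static void main(String[] args) throws Exception {
        SnapshottingMaintainer maintainer = new SnapshottingMaintainer();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Future<?> writer = pool.submit(() -> {
                for (int i = 0; i < 10_000; i++) {
                    maintainer.notifyNewDeletion("f" + i, i);
                }
            });
            Future<?> compactor = pool.submit(() -> {
                for (int i = 0; i < 1_000; i++) {
                    maintainer.writeIndex();
                }
            });
            writer.get();    // an exception in a task would be rethrown as ExecutionException
            compactor.get();
        } finally {
            pool.shutdown();
        }
        System.out.println(maintainer.writeIndex()); // 10000
    }
}
```

The copy costs O(n) per index write. A ConcurrentHashMap would also avoid the exception, since its iterators are weakly consistent, but an index written during concurrent updates may or may not include the newest entries. Either way, locking or copying adds cost on the write path, so it is only worthwhile if concurrent access is actually intended.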

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!
@stayrascal added the bug (Something isn't working) label on Nov 25, 2024
@melin
Contributor

melin commented Nov 25, 2024

Also seen with a Flink CDC pipeline writing to Paimon:

java.io.IOException: Could not perform checkpoint 54 for operator Sink Writer: rt_ods_1825759191045992449 (1/1)#1.
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1275) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:118) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) ~[flink-dist-1.18.1.jar:1.18.1]
	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_242]
Caused by: java.io.IOException: java.util.ConcurrentModificationException
	at org.apache.flink.cdc.connectors.paimon.sink.v2.StoreSinkWriteImpl.prepareCommit(StoreSinkWriteImpl.java:194) ~[flink-cdc-pipeline-connector-paimon-3.1-SNAPSHOT.jar:3.1-SNAPSHOT]
	at org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonWriter.prepareCommit(PaimonWriter.java:144) ~[flink-cdc-pipeline-connector-paimon-3.1-SNAPSHOT.jar:3.1-SNAPSHOT]
	at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.emitCommittables(SinkWriterOperator.java:199) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.prepareSnapshotPreBarrier(SinkWriterOperator.java:169) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.cdc.runtime.operators.sink.DataSinkWriterOperator.prepareSnapshotPreBarrier(DataSinkWriterOperator.java:202) ~[flink-cdc-dist-3.1-SNAPSHOT.jar:3.1-SNAPSHOT]
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:322) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$15(StreamTask.java:1318) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1306) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1263) ~[flink-dist-1.18.1.jar:1.18.1]
	... 22 more
Caused by: java.util.ConcurrentModificationException
	at java.util.HashMap$HashIterator.nextNode(HashMap.java:1445) ~[?:1.8.0_242]
	at java.util.HashMap$EntryIterator.next(HashMap.java:1479) ~[?:1.8.0_242]
	at java.util.HashMap$EntryIterator.next(HashMap.java:1477) ~[?:1.8.0_242]
	at org.apache.paimon.deletionvectors.DeletionVectorIndexFileWriter.tryWriter(DeletionVectorIndexFileWriter.java:78) ~[flink-cdc-pipeline-connector-paimon-3.1-SNAPSHOT.jar:3.1-SNAPSHOT]
	at org.apache.paimon.deletionvectors.DeletionVectorIndexFileWriter.write(DeletionVectorIndexFileWriter.java:68) ~[flink-cdc-pipeline-connector-paimon-3.1-SNAPSHOT.jar:3.1-SNAPSHOT]
	at org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.write(DeletionVectorsIndexFile.java:154) ~[flink-cdc-pipeline-connector-paimon-3.1-SNAPSHOT.jar:3.1-SNAPSHOT]
	at org.apache.paimon.index.IndexFileHandler.writeDeletionVectorsIndex(IndexFileHandler.java:293) ~[flink-cdc-pipeline-connector-paimon-3.1-SNAPSHOT.jar:3.1-SNAPSHOT]
	at org.apache.paimon.deletionvectors.DeletionVectorsMaintainer.writeDeletionVectorsIndex(DeletionVectorsMaintainer.java:116) ~[flink-cdc-pipeline-connector-paimon-3.1-SNAPSHOT.jar:3.1-SNAPSHOT]
	at org.apache.paimon.compact.CompactDeletionFile.generateFiles(CompactDeletionFile.java:45) ~[flink-cdc-pipeline-connector-paimon-3.1-SNAPSHOT.jar:3.1-SNAPSHOT]
	at org.apache.paimon.compact.CompactDeletionFile$LazyCompactDeletionFile.getOrCompute(CompactDeletionFile.java:121) ~[flink-cdc-pipeline-connector-paimon-3.1-SNAPSHOT.jar:3.1-SNAPSHOT]
	at org.apache.paimon.operation.AbstractFileStoreWrite.prepareCommit(AbstractFileStoreWrite.java:217) ~[flink-cdc-pipeline-connector-paimon-3.1-SNAPSHOT.jar:3.1-SNAPSHOT]
	at org.apache.paimon.operation.MemoryFileStoreWrite.prepareCommit(MemoryFileStoreWrite.java:149) ~[flink-cdc-pipeline-connector-paimon-3.1-SNAPSHOT.jar:3.1-SNAPSHOT]
	at org.apache.paimon.table.sink.TableWriteImpl.prepareCommit(TableWriteImpl.java:253) ~[flink-cdc-pipeline-connector-paimon-3.1-SNAPSHOT.jar:3.1-SNAPSHOT]
	at org.apache.flink.cdc.connectors.paimon.sink.v2.StoreSinkWriteImpl.prepareCommit(StoreSinkWriteImpl.java:189) ~[flink-cdc-pipeline-connector-paimon-3.1-SNAPSHOT.jar:3.1-SNAPSHOT]
	at org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonWriter.prepareCommit(PaimonWriter.java:144) ~[flink-cdc-pipeline-connector-paimon-3.1-SNAPSHOT.jar:3.1-SNAPSHOT]
	at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.emitCommittables(SinkWriterOperator.java:199) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.prepareSnapshotPreBarrier(SinkWriterOperator.java:169) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.cdc.runtime.operators.sink.DataSinkWriterOperator.prepareSnapshotPreBarrier(DataSinkWriterOperator.java:202) ~[flink-cdc-dist-3.1-SNAPSHOT.jar:3.1-SNAPSHOT]
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:322) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$15(StreamTask.java:1318) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1306) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1263) ~[flink-dist-1.18.1.jar:1.18.1]

@JingsongLi
Contributor

JingsongLi commented Nov 25, 2024

In TableWriteImpl.prepareCommit, waitCompaction should be true; that way, only one thread accesses the deletion vectors maintainer.

I think this is a bug in flink-cdc.
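A minimal sketch of the ordering argument, assuming that waitCompaction = true means prepareCommit blocks on the in-flight compaction before it iterates the deletion vectors. WaitCompactionModel and its methods are hypothetical and much simpler than TableWriteImpl or CompactDeletionFile; the point is only that once the committing thread has waited on the compaction Future, it is the sole thread touching the non-thread-safe map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical toy model of a writer with async compaction; not Paimon's TableWriteImpl. */
public class WaitCompactionModel {

    // Deliberately not thread-safe, like the HashMap in the stack traces.
    private final Map<String, Integer> deletionVectors = new HashMap<>();
    private final ExecutorService compactExecutor = Executors.newSingleThreadExecutor();
    private Future<?> compaction;

    /** Starts an async "compaction" that adds deletion vectors on a background thread. */
    public void triggerCompaction(int files) {
        compaction = compactExecutor.submit(() -> {
            for (int i = 0; i < files; i++) {
                deletionVectors.put("compacted-" + i, i);
            }
        });
    }

    /**
     * With waitCompaction = true, blocks until the compaction finishes, so the committing
     * thread is the only one touching deletionVectors while it iterates them.
     */
    public int prepareCommit(boolean waitCompaction) {
        if (waitCompaction && compaction != null) {
            try {
                compaction.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } catch (ExecutionException e) {
                throw new RuntimeException(e.getCause());
            }
        }
        int written = 0;
        for (Map.Entry<String, Integer> entry : deletionVectors.entrySet()) {
            written++; // stand-in for serializing entry into an index file
        }
        return written;
    }

    public void close() {
        compactExecutor.shutdownNow();
    }

    public static void main(String[] args) {
        WaitCompactionModel writer = new WaitCompactionModel();
        try {
            writer.triggerCompaction(100_000);
            System.out.println(writer.prepareCommit(true)); // 100000
        } finally {
            writer.close();
        }
    }
}
```

Future.get() also provides the needed visibility: under the java.util.concurrent memory-consistency rules, actions taken by the asynchronous computation happen-before actions following the corresponding get() in another thread.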

@stayrascal
Contributor Author

Got it, thanks for the explanation.

One more question: when would waitCompaction be false? It seems only test cases pass false. Or would it be better for TableWriteImpl.prepareCommit to check the value of waitCompaction, in case the caller passes the wrong value?

@JingsongLi
Contributor

@stayrascal If waitCompaction is false, CompactDeletionFile would need a different implementation. I think we can add a check here.
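The check could look roughly like the guard below. This is a hypothetical sketch: PrepareCommitGuard and checkWaitCompaction are invented names, and restricting the check to tables with deletion vectors enabled is an assumption rather than something stated in this thread.

```java
/** Hypothetical guard for prepareCommit; not Paimon's actual code. */
public final class PrepareCommitGuard {

    private PrepareCommitGuard() {}

    /**
     * Fails fast when a caller passes waitCompaction = false for a table that uses
     * deletion vectors (assumed scope), instead of racing with a running compaction.
     */
    public static void checkWaitCompaction(boolean waitCompaction, boolean deletionVectorsEnabled) {
        if (deletionVectorsEnabled && !waitCompaction) {
            throw new IllegalArgumentException(
                    "waitCompaction must be true when deletion vectors are enabled, "
                            + "so that only one thread accesses the deletion vectors maintainer.");
        }
    }

    public static void main(String[] args) {
        checkWaitCompaction(true, true);   // accepted
        checkWaitCompaction(false, false); // accepted: no deletion vectors involved
        try {
            checkWaitCompaction(false, true);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```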

@stayrascal
Contributor Author

Got it, thanks.

@leonardBang
Contributor

Let's fix this issue in flink-cdc; it is tracked in FLINK-36790.
