Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-25920] Ignore duplicate EOI in SinkWriter [1.19] #25627

Open
wants to merge 8 commits into
base: release-1.19
Choose a base branch
from

Conversation

AHeise
Copy link
Contributor

@AHeise AHeise commented Nov 8, 2024

What is the purpose of the change

[FLINK-25920]

In case of a failure after final checkpoint, EOI is called twice. SinkWriter should ignore the second call to avoid emitting duplicate committables. This commit uses a union state to remember that EOI happened and suppress additional handling.

Brief change log

  • Improve sink test assertions
  • Straighten EOI handling in CommittableCollector
  • Ignore duplicate EOI in SinkWriter <- main fix
  • Improve logging in committable handling of the sink

Verifying this change

Please make sure both new and modified tests in this PR follow the conventions for tests defined in our code quality guide.

  • Added integration tests that covers the fix

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

UnifiedSinkMigrationITCase assumed that we also commit partial batches of committables. However, that was never the intend and fixed in FLINK-25920. This commit adjusts the test.

(cherry picked from commit 300e1ea)
In some parts of the sink, EOI is treated as checkpointId=null and in some checkpointId=MAX. The code of CheckpointCommittableManagerImpl implies that a null is valid however the serializer actually breaks then. In practice, checkpointId=MAX is used all the time by accident.

This commit replaces the nullable checkpointIds with a primitive long EOI=MAX, so that we always use the special value instead of null. The serializer already used that value, so it actually simplifies many places and doesn't break any existing state.

(cherry picked from commit c56def0)
Use the proper ObjectAssert as the base for CommittableSummaryAssert and CommittableWithLinageAssert.

(cherry picked from commit ad01d71)
The committer is supposed to commit all committables at once for a given subtask (so that it can potentially optimize committables on the fly). With UCs, we could potentially see notifyCheckpointCompleted before receiving all committables. The CommittableSummary was built and is used to detect that.

So far, we enforced completeness only for the most current committables belonging the respective checkpoint being completed. However, we should also enforce it to all subsumed committables. In fact, we probably implicitly do it but we have the extra code path which allows subsumed committables to be incomplete. This commit simplifies the code a bit by always enforcing completeness.

(cherry picked from commit 1d32f1b)
The stateful SinkWriterOperatorTestBase test cases used EOI to manipulate the state which was never clean. In particular, it also stored the input elements in state until EOI arrived and emitted them all at once. For state restoration tests, we emitted records after EOI arrived.

This commit changed the writer state completely to just capture the record count, which is much more realistic than storing actual payload. The tests now directly assert on the state instead of output.

This commit also introduces an adaptor for serializing basic types in the writer state and replaces the hard-to-maintain SinkAndSuppliers with an InspectableSink in the sink writer tests that require an abstraction on top of the different Sink flavors.

(cherry picked from commit 4217408)
In case of a failure after final checkpoint, EOI is called twice.

SinkWriter should ignore the second call to avoid emitting more dummy committables = transactional objects containing no data since no data can arrive when recovering from final checkpoint. The commit uses a boolean list state to remember if EOI has been emitted. The cases are discussed in code.

Since rescaling may still result in these dummy committables, the committer needs merge them into the CommittableCollector as these committables still need to be committed as systems like Kafka don't provide transactional isolation.

(cherry picked from commit 37e6724)
AbstractStreamingWriter send partition info twice on EOI. This commit ensures that we are not resending partition information even after restarting from a final checkpoint.

(cherry picked from commit 6d60f41)
@flinkbot
Copy link
Collaborator

flinkbot commented Nov 8, 2024

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants