[FLINK-36929][table] Add SQL connector for keyed savepoint data #26035

Open
wants to merge 1 commit into
base: master

gaborgsomogyi
Contributor

What is the purpose of the change

FLIP-496 describes the SQL connector for keyed savepoint data in depth. This PR adds that functionality together with automated tests. Documentation is intended to be added after the merge, in order to keep the PR pieces consumable.

Brief change log

Add SQL connector for keyed savepoint data.

Verifying this change

Automated tests + manually on state files.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): yes
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? not documented

    }

    public void setPrivateLong(Long privateLong) {
        this.privateLong = privateLong;

Contributor

I am not sure what this example is showing. publicLong is defined but not referenced at all.

Contributor Author

The state contains this POJO serialized. It shows that it can read private members too.

@davidradl
Contributor

It would really help me to review this if the documentation were included in this PR.

        .stringType()
        .noDefaultValue()
        .withDescription(
                "Defines the state backend which must be used for state reading.");

Contributor

@davidradl Jan 21, 2025

I think the valid values should be included in the description, as per the FLIP (I assume with ForsDB as well).
The FLIP documents this as an Enum. Shouldn't this be an EnumType?

Contributor Author

This is similar to state.backend.type, so I would keep it that way.

Contributor Author

Adding more description makes sense though.
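
To illustrate the trade-off discussed in this thread, here is a minimal plain-Java sketch (not the PR's code and not Flink's ConfigOption API) of keeping the option as a string while still listing and enforcing the valid values. The class name BackendOptionSketch and the enum constants are placeholders chosen for illustration; the connector's actual list of supported backends is not stated in this conversation.

```java
import java.util.Arrays;
import java.util.Locale;
import java.util.stream.Collectors;

/** Hypothetical helper: a string-typed option validated against an enum. */
final class BackendOptionSketch {

    /** Placeholder values for illustration only, not the connector's real list. */
    enum BackendType { HASHMAP, ROCKSDB }

    /** Lower-cased, comma-separated list of accepted values. */
    static String validValues() {
        return Arrays.stream(BackendType.values())
                .map(v -> v.name().toLowerCase(Locale.ROOT))
                .collect(Collectors.joining(", "));
    }

    /** Option description that spells out the accepted values. */
    static String description() {
        return "Defines the state backend which must be used for state reading. "
                + "Valid values: " + validValues() + ".";
    }

    /** Parses the raw string and reports the accepted values on failure. */
    static BackendType parse(String raw) {
        try {
            return BackendType.valueOf(raw.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(
                    "Unknown state backend '" + raw + "'. Valid values: " + validValues(), e);
        }
    }

    public static void main(String[] args) {
        System.out.println(description());
        System.out.println(parse("rocksdb"));
    }
}
```

With this shape the user-facing type stays a plain string, as with state.backend.type, while the description and the error message are both derived from a single list of values.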


if (operatorUid.isPresent() == operatorUidHash.isPresent()) {
    throw new IllegalArgumentException(
            "Either operator uid or operator hash must be specified.");

Contributor

nit: operator hash -> operator UID hash

Contributor Author

Fixed.
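
For readers following this thread, a minimal standalone sketch of the "exactly one of" check in the quoted hunk. The class name OperatorIdentifierCheck and the exact message wording are assumptions for illustration; the final text in the PR may differ. The point is that comparing the two isPresent() results rejects both the "neither set" and the "both set" cases.

```java
import java.util.Optional;

/** Hypothetical helper, not the PR's actual class. */
final class OperatorIdentifierCheck {

    /** Throws unless exactly one of the two identifiers is present. */
    static void validate(Optional<String> operatorUid, Optional<String> operatorUidHash) {
        // The two flags are equal when both are present or both are absent,
        // so equality means the configuration is invalid.
        if (operatorUid.isPresent() == operatorUidHash.isPresent()) {
            throw new IllegalArgumentException(
                    "Exactly one of operator uid or operator uid hash must be specified.");
        }
    }

    public static void main(String[] args) {
        validate(Optional.of("my-operator-uid"), Optional.empty()); // accepted
        try {
            validate(Optional.empty(), Optional.empty()); // neither set
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```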

@gaborgsomogyi
Contributor Author

cc @Zakelly

@flinkbot
Collaborator

flinkbot commented Jan 21, 2025

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build


@Override
public String factoryIdentifier() {
    return "state";

Contributor

I think the identifier should be 'savepoint' as the FLIP describes.

Contributor

It would also be great to change the class name to 'SavepointDynamicTableSourceFactory'.

Contributor Author

Oh yeah, that's a leftover I forgot about. Fixing it...

Contributor

@Zakelly left a comment

So this connector will be included in the state-processor-api.jar under the opt/ folder of the flink distribution, right?

@gaborgsomogyi
Contributor Author

So this connector will be included in the state-processor-api.jar under the opt/ folder of the flink distribution, right?

Yeah, it's intended to be optional.

@gaborgsomogyi
Contributor Author

Not sure why, but some of the tests are flaky, so I rebased onto the latest master. Hope this helps.

@gaborgsomogyi
Contributor Author

The tests finally passed.

Contributor

@Zakelly left a comment

I have briefly read this and overall LGTM. I'm not an expert on SQL types, so it would be great if @fsk119 or @lvyanquan could take a look.

Comment on lines 246 to 276
private Method getMethod(String rowFieldName, Class objectClass) {
    String upperRowFieldName =
            rowFieldName.substring(0, 1).toUpperCase() + rowFieldName.substring(1);
    Method method;
    try {
        String methodName = "get" + upperRowFieldName;
        method = objectClass.getMethod(methodName);
    } catch (NoSuchMethodException e2) {
        try {
            String methodName = "is" + upperRowFieldName;
            method = objectClass.getMethod(methodName);
        } catch (NoSuchMethodException e3) {
            throw new RuntimeException(e3);
        }
    }
    return method;
}

Contributor

IIUC, this uses Java reflection to access the subfields of a POJO, right? It is invoked each time an incoming instance needs to be converted, which is slow. Is it possible to maintain a class/method cache, or a converter, to avoid calling reflection on the fly?

Contributor Author

Just added a Guava cache so that the number of cached entries stays bounded.
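
As an illustration of the caching idea in this thread, here is a minimal sketch that memoizes the getter lookup per (class, field name) pair. It deliberately uses a plain ConcurrentHashMap from the JDK instead of the Guava cache mentioned above, so unlike the PR's solution it has no size limit. The class name GetterCache is an assumption, not a name from the PR.

```java
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper: resolves "getX"/"isX" once per class and field name. */
final class GetterCache {

    // Outer key: POJO class; inner key: row field name. Unbounded, unlike a Guava cache.
    private static final Map<Class<?>, Map<String, Method>> CACHE = new ConcurrentHashMap<>();

    /** Returns the cached getter, resolving it via reflection on first use only. */
    static Method getter(Class<?> objectClass, String rowFieldName) {
        return CACHE.computeIfAbsent(objectClass, c -> new ConcurrentHashMap<>())
                .computeIfAbsent(rowFieldName, f -> lookup(objectClass, f));
    }

    /** Same lookup order as the quoted hunk: "get" prefix first, then "is". */
    private static Method lookup(Class<?> objectClass, String rowFieldName) {
        String suffix = rowFieldName.substring(0, 1).toUpperCase() + rowFieldName.substring(1);
        try {
            return objectClass.getMethod("get" + suffix);
        } catch (NoSuchMethodException e) {
            try {
                return objectClass.getMethod("is" + suffix);
            } catch (NoSuchMethodException e2) {
                throw new RuntimeException(e2);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // String has a public isEmpty() method, so "empty" resolves through the "is" fallback.
        Method isEmpty = getter(String.class, "empty");
        System.out.println(isEmpty.invoke(""));
        // The second call is served from the cache and returns the same Method object.
        System.out.println(isEmpty == getter(String.class, "empty"));
    }
}
```

A bounded cache, as chosen in the PR, avoids unbounded growth when many distinct classes pass through the converter; the unbounded map here only keeps the sketch dependency-free.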

@gaborgsomogyi
Contributor Author

Rebased onto the latest master.

4 participants