to #103, Support PB format serializer in Kafka Source #185

Open
qidian99 wants to merge 1 commit into master from issue/#103_support_pb_format_in_kafka


qidian99

Signed-off-by:

to #103, Support PB format serializer in Kafka Source

@garyli1019
Collaborator

Hi @qidian99, thanks for your contribution. It looks like there is a conflict. Would you rebase onto the latest master?

@garyli1019 garyli1019 self-assigned this Nov 18, 2022
@qidian99 qidian99 force-pushed the issue/#103_support_pb_format_in_kafka branch 2 times, most recently from 2cb8936 to 40983a3 Compare November 18, 2022 04:12
@qidian99
Author

qidian99 commented Nov 18, 2022

> Hi @qidian99, thanks for your contribution. It looks like there is a conflict. Would you rebase onto the latest master?

Done rebasing. @garyli1019

@qidian99 qidian99 force-pushed the issue/#103_support_pb_format_in_kafka branch from 40983a3 to bc60582 Compare November 18, 2022 06:45
Collaborator

@garyli1019 garyli1019 left a comment

Thanks for your contribution! I left some comments.

@@ -0,0 +1,970 @@

Collaborator

Can we add the license header?

Author

This class is generated by protoc, which isn't dynamic. I removed it and made the test case read the proto descriptor directly for the job configuration.

Collaborator

I think it's OK to put this class in our codebase. If we generate this class from the test case, git will pick it up and it will end up in the commits of anyone who runs the test case. It's tricky to add it to .gitignore as well. Adding the license header to an auto-generated file is fine, I believe.

Collaborator

nvm, my concern didn't happen at all.

@@ -64,6 +65,14 @@ public static KafkaDeserializationSchema<Row> getDeserializationSchema(BitSailCo
.build());
}

if (StringUtils.equalsIgnoreCase(PB_DESERIALIZATION_SCHEMA_KEY, formatType)) {
try {
return new PbKafkaDeserializationSchema(configuration);
Collaborator

Can we use the CountKafkaDeserializationSchemaWrapper, as the JSON format does? This wrapper is useful for running our regression tests in streaming mode.

Author

Done. PbKafkaDeserializationSchema is renamed to PbDeserializationSchema and now extends Flink's deserialization schema.
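
For readers unfamiliar with the wrapper pattern discussed in this thread, here is a minimal sketch of the idea. It does not reproduce BitSail's CountKafkaDeserializationSchemaWrapper or Flink's DeserializationSchema. The `RecordDeserializer` interface, the `CountingDeserializer` class, and the "report end of stream after N records" behavior are all hypothetical names and assumptions, chosen only to show why a format-agnostic wrapper lets a bounded regression test run against a streaming source.

```java
// Hypothetical stand-in for a format-specific deserialization schema
// (JSON, PB, ...). Not a Flink or BitSail interface.
interface RecordDeserializer<T> {
  T deserialize(byte[] message);
}

// Hypothetical counting wrapper: delegates to any format and reports
// end-of-stream once a fixed number of records has been deserialized.
class CountingDeserializer<T> implements RecordDeserializer<T> {
  private final RecordDeserializer<T> delegate;
  private final long maxRecords;
  private long seen = 0;

  CountingDeserializer(RecordDeserializer<T> delegate, long maxRecords) {
    this.delegate = delegate;
    this.maxRecords = maxRecords;
  }

  @Override
  public T deserialize(byte[] message) {
    T value = delegate.deserialize(message);
    seen++; // count only records that were deserialized successfully
    return value;
  }

  // A test harness can poll this to stop an otherwise unbounded job.
  boolean isEndOfStream() {
    return seen >= maxRecords;
  }
}
```

Because the wrapper only depends on the delegate interface, the same counting logic would apply to a PB schema and a JSON schema alike, which is presumably what makes the wrapper attractive for regression tests.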

import java.util.List;

@Internal
public class PbKafkaDeserializationSchema implements KafkaDeserializationSchema<Row> {
Collaborator

With the wrapper, this class could be more general and work for more than just Kafka. Can we move it to bitsail-components/bitsail-component-formats/bitsail-component-format-pb?

Author

@qidian99 qidian99 Nov 19, 2022

I tried generalizing PbDeserializationSchema and have already created an empty module named bitsail-component-format-pb. However, there are some problems:

  1. The wrapper takes an org.apache.flink.api.common.serialization.DeserializationSchema, while the deserialization schemas in bitsail-component-formats implement com.bytedance.bitsail.base.format.DeserializationSchema<I, O>.
  2. v1 connectors do not use SPI to get deserialization schemas, e.g., RocketMQDeserializationSchema.

IMHO, it is better to create another issue for the bitsail-component-formats implementations.

Collaborator

Got it, I agree with you here! Please feel free to create an issue if you're interested in the follow-up.

}

@Override
public Row deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
Collaborator

Can we add a UT for this? I believe all the supported types should be covered.

}, 0, 1, TimeUnit.SECONDS);
}

@SneakyThrows
Collaborator

Not sure why we need @SneakyThrows here. Would you elaborate?

Author

Removed. Thanks for pointing that out!

@@ -0,0 +1,7 @@
syntax = "proto3";

message ProtoTest {
Collaborator

I'd suggest we include all supported types here, including some complex types.

"update-mode": "append"
},
"proto": {
"descriptor": "",
Collaborator

This JSON sample should be runnable. It's OK to add the PB info from the integration test you wrote.

Author

Added the base64-encoded descriptor and class name.
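
As an illustration of how a binary descriptor could end up as a string in the "descriptor" field of the JSON sample, here is a small sketch using only `java.util.Base64`. The `DescriptorCodec` class and its method names are hypothetical and are not taken from this PR. How the PR itself loads or parses the descriptor is not shown in this thread, so the sketch stops at the byte-to-text round trip.

```java
import java.util.Base64;

// Hypothetical helper: converts raw descriptor bytes (for example, the file
// written by `protoc --descriptor_set_out=...`) to and from the Base64 text
// that a JSON job configuration can carry.
class DescriptorCodec {
  // Encode binary descriptor bytes so they can be embedded in a JSON string.
  static String encode(byte[] descriptorBytes) {
    return Base64.getEncoder().encodeToString(descriptorBytes);
  }

  // Decode the configuration value back into the original descriptor bytes.
  static byte[] decode(String configValue) {
    return Base64.getDecoder().decode(configValue);
  }
}
```

Base64 is a natural fit here because a descriptor is arbitrary binary data, and JSON strings cannot hold raw bytes safely.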

}
}, 0, 1, TimeUnit.SECONDS);
}

@Test
- public void testKafkaSource() throws Exception {
+ public void testKafkaSourceJsonFormat() throws Exception {
Collaborator

I ran this ITCase and one test failed. It looks like the Kafka producer didn't stop when we switched to the second test.

Author

Added cleanup logic to the producer service so it stops creating records. All test cases pass now.
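
The kind of cleanup described here can be sketched as follows. This is not the code from the PR: `TestProducerService` and its methods are hypothetical, and a counter stands in for the real Kafka send call. The sketch only assumes what the diff snippets suggest, namely a task scheduled at a fixed rate that must be stopped before the next test starts.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical test producer: emits one "record" per tick until stopped.
class TestProducerService {
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();
  private final AtomicLong produced = new AtomicLong();

  void start(long periodMillis) {
    // A real test would send a record to Kafka here; the counter stands in for it.
    scheduler.scheduleAtFixedRate(
        produced::incrementAndGet, 0, periodMillis, TimeUnit.MILLISECONDS);
  }

  // Cancels the periodic task and waits for any in-flight tick to finish,
  // so no record is produced after this method returns.
  void stop() {
    scheduler.shutdownNow();
    try {
      scheduler.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt status
    }
  }

  boolean isStopped() {
    return scheduler.isShutdown();
  }

  long producedCount() {
    return produced.get();
  }
}
```

Calling `stop()` from a JUnit teardown method (for example one annotated with `@After`) would keep a producer started by one test from leaking records into the next.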


@qidian99 qidian99 force-pushed the issue/#103_support_pb_format_in_kafka branch from 87e5e04 to a46ffa4 Compare November 22, 2022 04:35
@qidian99 qidian99 force-pushed the issue/#103_support_pb_format_in_kafka branch from a46ffa4 to 00fca9e Compare November 22, 2022 04:40
@BlockLiu
Collaborator

👏 Great work!
I see there are dependency conflicts causing a compilation failure. Would you update the PR and resolve them?
