From ccb31f257f3cca133a6e2dc6625e4f4f705cdcc1 Mon Sep 17 00:00:00 2001 From: vernedeng Date: Tue, 26 Nov 2024 16:44:39 +0800 Subject: [PATCH 1/5] [INLONG-11546][SDK] Support async and sync report dirty data --- .../inlong/sdk/dirtydata/InlongSdkDirtySender.java | 10 ++++++++-- .../sort/base/dirty/sink/sdk/InlongSdkDirtySink.java | 2 +- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java index 80cc596c26..ee96b3b52e 100644 --- a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java +++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java @@ -74,8 +74,14 @@ public void init() throws Exception { log.info("init InlongSdkDirtySink successfully, target group={}, stream={}", inlongGroupId, inlongStreamId); } - public void sendDirtyMessage(DirtyMessageWrapper messageWrapper) throws InterruptedException { - dirtyDataQueue.offer(messageWrapper, 10, TimeUnit.SECONDS); + public void sendDirtyMessageSync(DirtyMessageWrapper messageWrapper) throws InterruptedException { + dirtyDataQueue.put(messageWrapper); + } + + public void sendDirtyMessageAsync(DirtyMessageWrapper messageWrapper) throws InterruptedException { + if(!dirtyDataQueue.offer(messageWrapper)) { + log.warn("the dirty data queue is full, you can increase the size of queue by configure maxCallbackSize"); + } } private void doSendDirtyMessage() { diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java index 8e692a4c10..daec1c0694 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java @@ -74,7 +74,7 @@ public void invoke(DirtyData dirtyData) throws Exception { .data(dirtyMessage) .build(); - dirtySender.sendDirtyMessage(wrapper); + dirtySender.sendDirtyMessageAsync(wrapper); } catch (Throwable t) { log.error("failed to send dirty message to inlong sdk", t); if (!options.isIgnoreSideOutputErrors()) { From 35b6b4c2592db6213b1121828a4beb4952e0e939 Mon Sep 17 00:00:00 2001 From: vernedeng Date: Tue, 26 Nov 2024 16:51:49 +0800 Subject: [PATCH 2/5] fix checkstyle --- .../org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java | 1 - 1 file changed, 1 deletion(-) diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java index ee96b3b52e..14fd9e697e 100644 --- a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java +++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java @@ -31,7 +31,6 @@ import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; @Slf4j @Builder From 8b23d2b79dc0ce80ee3612938ac825fbd2922a1b Mon Sep 17 00:00:00 2001 From: vernedeng Date: Tue, 26 Nov 2024 17:00:51 +0800 Subject: [PATCH 3/5] fix --- .../apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java index 14fd9e697e..4565a2947d 100644 --- a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java +++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java @@ -77,10 +77,12 @@ public void sendDirtyMessageSync(DirtyMessageWrapper messageWrapper) throws Inte dirtyDataQueue.put(messageWrapper); } - public void sendDirtyMessageAsync(DirtyMessageWrapper messageWrapper) throws InterruptedException { - if(!dirtyDataQueue.offer(messageWrapper)) { - log.warn("the dirty data queue is full, you can increase the size of queue by configure maxCallbackSize"); + public boolean sendDirtyMessageAsync(DirtyMessageWrapper messageWrapper) { + boolean result = dirtyDataQueue.offer(messageWrapper); + if (!result) { + log.warn("send dirty message async queue is full, you can increase maxCallbackSize"); } + return result; } private void doSendDirtyMessage() { From ebec0c4c12eedd8c32ccc2d77d97ce17c8b3611a Mon Sep 17 00:00:00 2001 From: vernedeng Date: Tue, 26 Nov 2024 17:02:09 +0800 Subject: [PATCH 4/5] fix --- .../org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java index 4565a2947d..9130da7484 100644 --- a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java +++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java @@ -73,7 +73,7 @@ public void init() throws Exception { log.info("init InlongSdkDirtySink successfully, target group={}, stream={}", inlongGroupId, inlongStreamId); } - public void sendDirtyMessageSync(DirtyMessageWrapper messageWrapper) throws InterruptedException { + public void sendDirtyMessage(DirtyMessageWrapper messageWrapper) throws InterruptedException { dirtyDataQueue.put(messageWrapper); } From a60ce3d9c2a0443b6c2b650829915698290b87e5 Mon Sep 17 00:00:00 2001 From: vernedeng Date: Tue, 26 Nov 2024 17:20:01 +0800 Subject: [PATCH 5/5] remove logs --- .../apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java index 9130da7484..74cfcffa21 100644 --- a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java +++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java @@ -78,11 +78,7 @@ public void sendDirtyMessage(DirtyMessageWrapper messageWrapper) throws Interrup } public boolean sendDirtyMessageAsync(DirtyMessageWrapper messageWrapper) { - boolean result = dirtyDataQueue.offer(messageWrapper); - if (!result) { - log.warn("send dirty message async queue is full, you can increase maxCallbackSize"); - } - return result; + return dirtyDataQueue.offer(messageWrapper); } private void doSendDirtyMessage() {