From ccb31f257f3cca133a6e2dc6625e4f4f705cdcc1 Mon Sep 17 00:00:00 2001 From: vernedeng Date: Tue, 26 Nov 2024 16:44:39 +0800 Subject: [PATCH] [INLONG-11546][SDK] Support async and sync report dirty data --- .../inlong/sdk/dirtydata/InlongSdkDirtySender.java | 10 ++++++++-- .../sort/base/dirty/sink/sdk/InlongSdkDirtySink.java | 2 +- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java index 80cc596c26..ee96b3b52e 100644 --- a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java +++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java @@ -74,8 +74,14 @@ public void init() throws Exception { log.info("init InlongSdkDirtySink successfully, target group={}, stream={}", inlongGroupId, inlongStreamId); } - public void sendDirtyMessage(DirtyMessageWrapper messageWrapper) throws InterruptedException { - dirtyDataQueue.offer(messageWrapper, 10, TimeUnit.SECONDS); + public void sendDirtyMessageSync(DirtyMessageWrapper messageWrapper) throws InterruptedException { + dirtyDataQueue.put(messageWrapper); + } + + public void sendDirtyMessageAsync(DirtyMessageWrapper messageWrapper) throws InterruptedException { + if(!dirtyDataQueue.offer(messageWrapper)) { + log.warn("the dirty data queue is full, you can increase the size of queue by configure maxCallbackSize"); + } } private void doSendDirtyMessage() { diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java index 8e692a4c10..daec1c0694 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java @@ -74,7 +74,7 @@ public void invoke(DirtyData dirtyData) throws Exception { .data(dirtyMessage) .build(); - dirtySender.sendDirtyMessage(wrapper); + dirtySender.sendDirtyMessageAsync(wrapper); } catch (Throwable t) { log.error("failed to send dirty message to inlong sdk", t); if (!options.isIgnoreSideOutputErrors()) {