Skip to content

Commit

Permalink
[INLONG-11614][Agent] Fix the signal leakage issue of AbstractSource …
Browse files Browse the repository at this point in the history
…class (#11615)
  • Loading branch information
justinwwhuang authored Dec 20, 2024
1 parent 535059e commit 6816f9f
Showing 1 changed file with 12 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ protected class SourceData {
new SynchronousQueue<>(),
new AgentThreadFactory("source-pool"));
protected OffsetProfile offsetProfile;
protected boolean sourceError = false;

@Override
public void init(InstanceProfile profile) {
Expand Down Expand Up @@ -200,10 +201,16 @@ private void doRun() {
* @return true if prepared ok
*/
private boolean prepareToRead() {
if (!doPrepareToRead()) {
try {
if (!doPrepareToRead()) {
return false;
}
return waitForPermit(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_READ_LINE_TOTAL_LEN);
} catch (Throwable e) {
LOGGER.error("prepare to read {} error:", instanceId, e);
sourceError = true;
return false;
}
return waitForPermit(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_READ_LINE_TOTAL_LEN);
}

/**
Expand Down Expand Up @@ -416,6 +423,9 @@ private void clearQueue(BlockingQueue<SourceData> queue) {

@Override
public boolean sourceFinish() {
if (sourceError) {
return true;
}
if (isRealTime) {
return false;
}
Expand Down

0 comments on commit 6816f9f

Please sign in to comment.