feat(indexer/server): implement a task buffer to improve efficiency in managing backpressure issues #519
)

// TaskBuffer represents a fixed-size buffer, it operates as a FIFO buffer
type TaskBuffer struct {
This is a new data structure; please add sufficient test cases.
Comprehensive test cases have been added in task_buffer_test.go under directory/engine.
}

task := sw.tasks[0]
sw.tasks = sw.tasks[1:]
The slice operation sw.tasks = sw.tasks[1:] changes the starting position of the slice, but the underlying array remains unchanged, so the slot for the removed task still holds a reference to it. If this operation is performed frequently (e.g., frequent task retrieval), those stale references can keep already-processed tasks from being garbage collected until the slice is reallocated, potentially causing a memory leak.
So I think adding sw.tasks[0] = nil before reslicing would be safer.
I agree with your point, and I have made the change to address it.
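For readers following the thread, the agreed change can be sketched roughly as below. This is a minimal illustration, not the code from the PR: the Task and queue types and the push and pop methods are hypothetical stand-ins, and it assumes the buffer stores pointers (clearing a slot only helps the garbage collector when the element holds references).

```go
package main

// Task is a hypothetical stand-in for the task type stored in the buffer.
type Task struct {
	ID string
}

// queue is a hypothetical slice-backed FIFO, mirroring the sw.tasks field
// that appears in the diff.
type queue struct {
	tasks []*Task
}

// push appends a task to the back of the queue.
func (q *queue) push(t *Task) {
	q.tasks = append(q.tasks, t)
}

// pop removes and returns the task at the front of the queue,
// or nil if the queue is empty.
func (q *queue) pop() *Task {
	if len(q.tasks) == 0 {
		return nil
	}

	task := q.tasks[0]
	// Clear the slot before reslicing so the backing array no longer
	// references the popped task and the garbage collector can reclaim it.
	q.tasks[0] = nil
	q.tasks = q.tasks[1:]

	return task
}
```

The order matters: once the slice has been advanced with [1:], index 0 refers to the next task, so the slot has to be cleared first.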
internal/node/indexer/server.go
Outdated
return fmt.Errorf("an error occurred in the source: %w", err)
}

return nil
The return nil statement immediately after the if err != nil check is unnecessary because if err != nil is true, the function will return early.
Resolved.
internal/node/indexer/server.go
Outdated
if err := s.handleTasks(ctx, tasks); err != nil {
return fmt.Errorf("handle tasks: %w", err)
if err := s.handleTasks(ctx, task); err != nil {
errorChan <- fmt.Errorf("handle tasks error: %w", err)
I think the function retryableFunc should return the error instead of sending it to the errorChan. This approach can make the retry mechanism clearer and more straightforward.
Resolved.
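The shape the reviewer suggests might look roughly like the sketch below. It is an illustration only: the retry helper, handleWithRetry, and errTransient are hypothetical names introduced here, and the retry library actually used by the PR is not shown in this thread. The point is that retryableFunc returns its error, so the retry loop alone decides whether to try again and no error channel is involved.

```go
package main

import (
	"errors"
	"fmt"
)

// errTransient is a hypothetical sentinel error, used only in the usage example.
var errTransient = errors.New("transient failure")

// retry is a hypothetical helper: it calls fn up to attempts times and
// returns nil on the first success, or the last error (wrapped) otherwise.
func retry(attempts int, fn func() error) error {
	if attempts < 1 {
		attempts = 1
	}

	var lastErr error
	for i := 0; i < attempts; i++ {
		if lastErr = fn(); lastErr == nil {
			return nil
		}
	}

	return fmt.Errorf("after %d attempts: %w", attempts, lastErr)
}

// handleWithRetry wraps a task handler in a retryable function that
// returns its error instead of sending it to a channel.
func handleWithRetry(attempts int, handle func() error) error {
	retryableFunc := func() error {
		if err := handle(); err != nil {
			// Returning the error lets the retry helper see the failure.
			return fmt.Errorf("handle tasks: %w", err)
		}
		return nil
	}

	return retry(attempts, retryableFunc)
}
```

A caller would pass the real handler as a closure, for example handleWithRetry(3, func() error { return errTransient }), and treat a non-nil result as the final failure after all attempts.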
…nncessary operations on error channel in Run()
Summary
Implement an adaptive "Task Buffer" that efficiently manages task flow between the producer (data-source) and the consumer (worker).
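As a rough illustration of the producer/consumer flow described above, the sketch below uses a buffered Go channel as a fixed-size FIFO buffer. This is not the PR's TaskBuffer, whose implementation and adaptive behavior are not shown here; runPipeline and its parameters are hypothetical, and capacity is assumed to be non-negative.

```go
package main

import "sync"

// runPipeline pushes items through a bounded FIFO buffer of the given
// capacity and returns them in the order the consumer received them.
func runPipeline(items []string, capacity int) []string {
	buffer := make(chan string, capacity)

	var wg sync.WaitGroup
	wg.Add(1)

	// Producer: stands in for the data source.
	go func() {
		defer wg.Done()
		defer close(buffer)
		for _, item := range items {
			// This send blocks while the buffer is full, which is how
			// backpressure reaches the producer.
			buffer <- item
		}
	}()

	// Consumer: stands in for the worker.
	consumed := make([]string, 0, len(items))
	for item := range buffer {
		consumed = append(consumed, item)
	}

	wg.Wait()

	return consumed
}
```

With one producer and one consumer, items come out in the order they went in, and a slow consumer slows the producer down instead of letting pending tasks pile up without bound.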
Task Buffer
Performance
Checklist
Does this PR introduce a breaking change?
Other information