(2.11) Internal: Intra-Process Queue fixes/improvements. #5895
base: main
Conversation
@derekcollison @neilalexander I set this as a Draft because I would like to be sure that we want to go in this direction. Also, there will probably be a small conflict with one of @neilalexander's branches/future PRs with the jsapi rate limit.
Thanks Ivan. In general LGTM, but will let @neilalexander weigh in and do the formal review.
@derekcollison In other words, this is ready for review, but I did not want it to be merged until we all agree that this is the way to go and hear feedback from all participants.
Understood.
@neilalexander could you take a look and comment here?
Force-pushed from 2625e32 to 4ae7551
@derekcollison @neilalexander I have actually just updated/changed the code for this PR. Happy to discuss if there are any concerns.
Go says that when using a pool we should use a pointer to a slice, not the slice itself, to save a copy when getting/putting things back. That's what I tried to do, but I believe the gain was defeated by the fact that I was storing it as a []T in the queue object and, more importantly, was returning the address of a local variable when putting it back in the pool. The way it was used worked, but it was dangerous if queues were to be used differently. For instance, with the implementation before this change, this loop would fail:

```
var elts []int
for i := 0; i < 1000; i++ {
	q.push(i + 1)
	expected += i + 1
	elts = q.pop()
	for _, v := range elts {
		sum += v
	}
	q.recycle(&elts)
	q.push(i + 2)
	expected += i + 2
	elts = q.pop()
	q.push(i + 3)
	expected += i + 3
	for _, v := range elts {
		sum += v
	}
	q.recycle(&elts)
	elts = q.pop()
	for _, v := range elts {
		sum += v
	}
	q.recycle(&elts)
}
if sum != expected {
	// ERROR!
}
```

If we use different variables, such as `elts1 := q.pop()`, etc., then it works. And again, the way it was used before did not cause issues.

In this PR, I am using []T and no sync.Pool. Instead, we store up to 5 slices in a queue-specific "pool". In the server, I believe that almost all use cases are 1P-1C or NP-1C, so even a single pooled slice may be enough, but since this is a generic IPQueue, we use this small pool.

The other change is to use the "in progress" count (and now size) when checking the limits, which we were not doing. So after a `pop()`, since the queue is empty, the `push()` side would be able to store up to the limits while the receiver was still processing the popped elements. I have added APIs to indicate progress when processing elements in the for-loop that goes over the `pop()` result, and I have modified code that uses a queue with a limit (only 2 so far) to use the new API.

I have added benchmarks so we can evaluate future changes. Aside from the modifications to queues with limits, running the benchmark from the original code to the new one shows a slight improvement. Of course, updating progress for each popped element is slower than doing it in bulk, but it allows for fine-grained control over the queue limits. And when used for processing JetStream messages, the effect is likely not relevant.

Signed-off-by: Ivan Kozlovic <[email protected]>
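To make the design more concrete, here is a minimal sketch of the queue-specific slice pool described above (up to 5 recycled slices, no sync.Pool). The type name, the constant value, and the method shapes are illustrative assumptions for this write-up, not the actual nats-server code:

```
package main

import (
	"fmt"
	"sync"
)

// maxPooledSlices mirrors the "up to 5 slices" idea described above
// (illustrative value, not necessarily the server's).
const maxPooledSlices = 5

// ipQueueSketch is a hypothetical, simplified stand-in for the real ipQueue.
type ipQueueSketch[T any] struct {
	sync.Mutex
	elts []T
	pool [][]T // queue-specific pool of reusable backing slices
}

func (q *ipQueueSketch[T]) push(e T) {
	q.Lock()
	defer q.Unlock()
	if q.elts == nil && len(q.pool) > 0 {
		// Reuse a previously recycled backing slice instead of allocating.
		last := len(q.pool) - 1
		q.elts, q.pool = q.pool[last], q.pool[:last]
	}
	q.elts = append(q.elts, e)
}

func (q *ipQueueSketch[T]) pop() []T {
	q.Lock()
	defer q.Unlock()
	elts := q.elts
	q.elts = nil
	return elts
}

func (q *ipQueueSketch[T]) recycle(elts []T) {
	q.Lock()
	defer q.Unlock()
	// Keep at most maxPooledSlices backing slices around for reuse.
	if elts != nil && len(q.pool) < maxPooledSlices {
		q.pool = append(q.pool, elts[:0])
	}
}

func main() {
	q := &ipQueueSketch[int]{}
	q.push(1)
	q.push(2)
	elts := q.pop()
	fmt.Println(elts) // [1 2]
	q.recycle(elts)
}
```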
Force-pushed from 4ae7551 to 41e6029
Sorry for the delay, I'll take a look.
@neilalexander I have cancelled the build/PR Travis runs to let the 2.10.21 runs go to completion; I will restart mine later. Also, since this changes quite a bit of code from main and may not be "compatible" with 2.10, it may make future cherry-picks a bit more difficult for the next 2.10.x version. Not sure, but if we think this may be an issue, maybe we keep this as a Draft until we are closer to 2.11?
server/ipqueue.go
Outdated
```
}

// Returns the current length of the queue.
func (q *ipQueue[T]) len() int {
	q.Lock()
	defer q.Unlock()
	if q.elts == nil {
```
This was when I started to have `q.elts` as a `*[]T`, but it is not necessary anymore; `return len(q.elts) - q.pos` would work even if `q.elts` is nil, so I could remove that.
Signed-off-by: Ivan Kozlovic <[email protected]>
```
ch   chan struct{}
elts []T
pos  int
pool *sync.Pool
```
This makes me a bit uncomfortable, as the important secondary property of a `sync.Pool` is that it can be automatically emptied by the GC, which isn't true of our own pool implementation (or at the very least, not until we get weak pointer support in Go ~1.24).

As I see it, there are two possible solutions to the SA6002 problem with `sync.Pool` and slices. The first is to use a pointer to the slice header and reuse it, with the understanding that we will need to keep an eye on the underlying capacity to ensure we don't re-pool massive arrays (the max recycle size). The second is to use a pointer to a fixed-length array and slice it on-demand (like the `client.go` buffers now do). I don't suppose either will really make a huge difference in this case because the value from the pool will escape to the heap anyway by being stored in the `ipQueue` struct.

But all else equal, I think it's a far better idea to stick with `sync.Pool` for the GC properties alone.
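For reference, a minimal sketch of the first option: keep a `*[]T` in a `sync.Pool` (so Put/Get do not copy the slice header through an interface, the staticcheck SA6002 concern) and cap how large a slice gets re-pooled. The names and the cap value are assumptions for illustration, not the server's actual code:

```
package main

import (
	"fmt"
	"sync"
)

// Assumed cap for illustration; not the server's actual max recycle size.
const maxRecycleCap = 4096

// slicePool stores *[]int so putting a slice back does not allocate a new
// interface value holding a copied slice header (SA6002).
var slicePool = sync.Pool{
	New: func() any {
		s := make([]int, 0, 32)
		return &s
	},
}

func getSlice() *[]int {
	return slicePool.Get().(*[]int)
}

func putSlice(s *[]int) {
	// Don't keep very large backing arrays alive in the pool.
	if cap(*s) > maxRecycleCap {
		return
	}
	*s = (*s)[:0]
	slicePool.Put(s)
}

func main() {
	s := getSlice()
	*s = append(*s, 1, 2, 3)
	fmt.Println(*s)
	putSlice(s)
}
```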
OK, then I can get back to the first version of this PR (unfortunately, I force-pushed the changes on it) to use a `*[]T` and go back to a `sync.Pool`. For that to work, we need `pop()` to return the `*[]T` and pass that to `recycle()`; otherwise, we would have the problem described in the PR description.
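A hypothetical sketch of that contract, where `pop()` returns the `*[]T` that must be handed back to `recycle()`. The type, field, and method names here are illustrative and not the actual nats-server implementation:

```
package main

import (
	"fmt"
	"sync"
)

// ipQueuePtrSketch is a hypothetical, simplified queue showing the contract
// described above. Names and fields are illustrative only.
type ipQueuePtrSketch[T any] struct {
	sync.Mutex
	elts       *[]T
	inProgress int
	pool       sync.Pool
}

func (q *ipQueuePtrSketch[T]) push(e T) {
	q.Lock()
	defer q.Unlock()
	if q.elts == nil {
		if v := q.pool.Get(); v != nil {
			q.elts = v.(*[]T)
		} else {
			s := make([]T, 0, 32)
			q.elts = &s
		}
	}
	*q.elts = append(*q.elts, e)
}

// pop hands the backing slice (via its pointer) to the caller and records
// how many elements are now "in progress".
func (q *ipQueuePtrSketch[T]) pop() *[]T {
	q.Lock()
	defer q.Unlock()
	elts := q.elts
	q.elts = nil
	if elts != nil {
		q.inProgress += len(*elts)
	}
	return elts
}

// recycle takes the same pointer back, clears the in-progress accounting for
// those elements, and returns the slice to the pool.
func (q *ipQueuePtrSketch[T]) recycle(elts *[]T) {
	if elts == nil {
		return
	}
	q.Lock()
	q.inProgress -= len(*elts)
	q.Unlock()
	*elts = (*elts)[:0]
	q.pool.Put(elts)
}

func main() {
	q := &ipQueuePtrSketch[int]{}
	q.push(1)
	q.push(2)
	elts := q.pop()
	for _, v := range *elts {
		fmt.Println(v)
	}
	q.recycle(elts) // same pointer goes back; in-progress count drops to 0
}
```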
```
sz := q.calc(e)
if q.msz > 0 && q.sz+sz > q.msz {
if sz > 0 {
	// Take into account the "in progress" size.
```
This is a good change and it may be worth breaking this out into its own PR.
OK, so this PR would be just about using `*[]T` all the way? But then it means that `pop()` would just return the `*[]T`, and `recycle()` would take that and update the in-progress count, and that's it. The next PR would introduce the in-progress size and have `pop()` return the `*[]T`, list length, and list size, and basically be like it is in this PR.
```
InProgress int `json:"in_progress,omitempty"`
Pending int `json:"pending"`
InProgress int `json:"in_progress,omitempty"`
Size uint64 `json:"size,omitempty"`
```
Reporting the `Size` is also a good idea. Can we break this down into smaller PRs so we can handle & review them as smaller units?
Then the 3rd PR would be to add the monitoring aspect of it. Correct?
Go says that when using a pool we should use a pointer to a slice, not the slice itself, to save a copy when getting/putting things back.

That's what I tried to do, but I believe the gain was defeated by the fact that I was storing it as a []T in the queue object and, more importantly, returning the address of a local variable when putting it back in the pool. The way it was used worked, but it is dangerous if queues were to be used differently. For instance, with the implementation before this change, the loop shown in the earlier description would fail.

If we use different variables, such as `elts1 := q.pop()`, etc., then it works. And again, the way it was used before did not cause issues.

The changes here use a pointer to a `[]T` all the way. I have tried dummy queue implementations simply using `[]T` all the way, including when putting it back into the pool, and the perf and number of allocs etc. do not seem to change regardless of what I was using. However, with the real queue implementation, it somehow changes, so sticking with `*[]T` for now.

The other change is to use the "in progress" count (and now size) when checking the limits, which we were not doing. So after a `pop()`, since the queue is empty, the `push()` side would be able to store up to the limits while the receiver was still processing the popped elements.

I have added APIs to indicate progress when processing elements in the for-loop that goes over the `pop()` result, and I have modified code that uses a queue with a limit (only 2 so far) to use the new API.

I have added benchmarks so we can evaluate future changes. Aside from the modifications to queues with limits, running the benchmark from the original code to the new one shows a slight improvement. Of course, updating progress for each popped element is slower than doing it in bulk, but it allows for fine-grained control over the queue limits. And when used for processing JetStream messages, the effect is likely not relevant.
Signed-off-by: Ivan Kozlovic [email protected]
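For illustration, here is a hypothetical sketch of the per-element progress idea from the description above: the `processed()` method name, the limit handling, and the types are assumptions for this sketch, not necessarily the API added by the PR:

```
package main

import (
	"errors"
	"fmt"
	"sync"
)

// limitedQueueSketch is a hypothetical queue with a length limit that also
// counts popped-but-unprocessed elements toward that limit.
type limitedQueueSketch struct {
	sync.Mutex
	elts       []int
	inProgress int
	maxLen     int
}

var errQueueFull = errors.New("queue full")

// push rejects new elements when queued + in-progress elements hit the limit.
func (q *limitedQueueSketch) push(e int) error {
	q.Lock()
	defer q.Unlock()
	if q.maxLen > 0 && len(q.elts)+q.inProgress >= q.maxLen {
		return errQueueFull
	}
	q.elts = append(q.elts, e)
	return nil
}

func (q *limitedQueueSketch) pop() []int {
	q.Lock()
	defer q.Unlock()
	elts := q.elts
	q.elts = nil
	q.inProgress += len(elts)
	return elts
}

// processed reports that one popped element has been fully handled, freeing
// its slot for producers immediately instead of waiting for the whole batch.
func (q *limitedQueueSketch) processed() {
	q.Lock()
	q.inProgress--
	q.Unlock()
}

func main() {
	q := &limitedQueueSketch{maxLen: 4}
	for i := 0; i < 4; i++ {
		_ = q.push(i)
	}
	for _, v := range q.pop() {
		fmt.Println("handling", v)
		q.processed() // fine-grained: the limit frees up per element, not per batch
	}
}
```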