Potential pattern of ignoring stranded RPC workers #5127
Comments
Looks like I say "at a minimum" because I think we should consider moving the cleanup to the dedicated
Good finds @chewbranca! Clearly there is something broken here and we should fix it. Thanks for the detailed analysis!
For streams we already have a cleanup process spawned for every streaming request https://github.com/apache/couchdb/blob/main/src/fabric/src/fabric_streams.erl#L47. We should see why that doesn't clean up the workers and lets them time out instead. Perhaps it's too cautious to avoid sending unnecessary kill messages? It tries to use the

Recently we also added a kill_all command to aggregate kill commands per node, so instead of sending one per shard, it's one per node with a list of refs; maybe that's enough to keep the overhead of the extra kills fairly low. Another thing to keep in mind is that we don't always want to kill the workers: at least in the update docs path we specifically allow them to finish updating to reduce the pressure on the internal replicator.
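For illustration, roughly how the batching looks from the caller's side (a sketch only; it assumes the `#shard{}` record from `mem3.hrl` and a `rexi:kill_all/1` that accepts `{Node, Ref}` pairs):

```erlang
%% Sketch only: gather every {Node, Ref} pair and hand them to kill_all,
%% which batches them so each node gets a single kill message carrying all
%% of its refs, instead of one kill per shard copy.
%% Assumes -include_lib("mem3/include/mem3.hrl") for #shard{}.
cleanup(Workers) ->
    NodeRefs = [{Node, Ref} || #shard{node = Node, ref = Ref} <- Workers],
    rexi:kill_all(NodeRefs).
```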
Dreyfus doesn't use the streams facility, so it likely has a slightly different way of doing cleanup. There is also the complication of replacements: if they are spawned, those have to be cleaned up as well. However, if we do a blanket

Do you have an easily reproducible scenario to test it out? Start a 3 node cluster and issue a bunch of _all_docs calls?
Having failed to reproduce this locally, I moved on to investigating on a cluster where this error happens regularly. Found a cluster where

However, after a more thorough investigation, the reason for that is that design docs are updated often enough that the ddoc cache is quickly firing up and immediately killing the

In general, we already have a fabric_streams mechanism to handle the coordinator being killed unexpectedly. However, tracing the lifetime of the

We submit the jobs: couchdb/src/fabric/src/fabric_view_all_docs.erl Lines 28 to 30 in d0cf54e
Then we spawn the cleanup process: couchdb/src/fabric/src/fabric_streams.erl Lines 49 to 51 in d0cf54e
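For readers following along, the cleaner's job is roughly the following (a simplified sketch, not the actual fabric_streams code; it assumes the `#shard{}` record from `mem3.hrl` and `rexi:kill_all/1`):

```erlang
%% Simplified sketch of a per-request worker cleaner: monitor the
%% coordinator and, if it exits, tell every node to drop the workers it
%% started. The real fabric_streams implementation differs in details.
spawn_worker_cleaner(Coordinator, Workers) ->
    NodeRefs = [{N, Ref} || #shard{node = N, ref = Ref} <- Workers],
    spawn(fun() ->
        MRef = erlang:monitor(process, Coordinator),
        receive
            {'DOWN', MRef, process, Coordinator, _Reason} ->
                %% coordinator died: kill the remote workers it started
                rexi:kill_all(NodeRefs);
            stop ->
                ok
        end
    end).
```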
Those may seem like they would happen almost immediately, however tracing the

On the positive side, these workers don't actually do any work; they just wait in a receive clause, albeit with an open Db handle, which is not too great. To fix this particular case we have to ensure the cleaner process starts even earlier. By the time the coordinator submits the jobs, the cleanup process should be up and waiting with the node-ref tuples, ready to clean them up.
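To make the window concrete, the coordinator-side ordering is roughly this (names and arities are illustrative, not a quote of the module):

```erlang
%% Approximate coordinator-side ordering for an _all_docs request.
go(DbName, Options, QueryArgs) ->
    Shards = mem3:shards(DbName),
    %% 1. remote workers are spawned here and start waiting in receive
    Workers0 = fabric_util:submit_jobs(Shards, fabric_rpc, all_docs, [Options, QueryArgs]),
    %% <-- window: if the coordinator is killed right here (e.g. by a ddoc
    %%     cache refresh), no cleaner exists yet, so the workers sit in
    %%     receive, holding a Db handle, until the 5 minute rexi timeout
    %% 2. the worker cleaner is only spawned inside fabric_streams:start
    fabric_streams:start(Workers0, #shard.ref).
```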
Previously, if the coordinator process is killed too quickly, before the stream worker cleanup process is spawned, remote workers may be left around waiting until the default 5 minute timeout expires. In order to reliably clean up processes in that state, we need to start the cleaner process, with all the job references, before we start submitting them for execution.

At first, it may seem impossible to monitor a process until after it's already spawned. That's true for regular processes, however rexi operates on plain references. For each process we spawn remotely we create a reference on the coordinator side, which we can then use to track that job. Those are just plain, manually created references. Nothing stops us from creating them first, adding them to a cleaner process, and only then submitting them. That's exactly what this commit accomplishes:

* Create a streams-specific `fabric_streams:submit_jobs/4` function, which spawns the cleanup process early, generates worker references, and then submits the jobs. This way, all the existing streaming submit_jobs calls can be replaced easily in one line: `fabric_util` -> `fabric_streams`.
* The cleanup process operates as previously: it monitors the coordinator for exits, and fires off a `kill_all` message to each node when needed.
* Create `rexi:cast_ref(...)` variants of `rexi:cast(...)` calls, where the caller specifies the references as arguments. This is what allows us to start the cleanup process before the jobs are even submitted. Older calls can just be transformed to call into the `cast_ref` versions with their own created references.

Noticed that we don't need to keep the whole list of shards in memory in the cleaner process. For Q=64, N=3 that can add up to a decent blob of binary paths. We only need node names (atoms) and refs, so updated to use just a set of [{Node, Ref}, ...]. A set, since in theory someone could add the same worker twice to it.

Since we added the new `rexi:cast_ref(...)` variants, ensure to add more test coverage, including for the streaming logic as well. It's not 100% yet, but getting there.

Also, the comments in `rexi.erl` were full of erldoc stanzas and we don't actually build erldocs anywhere, so replace them with something more helpful. The streaming protocol itself was never quite described anywhere, and it can take some time to figure it out (at least it took me), so took the chance to also add a very basic, high-level description of the message flow.

Related: #5127 (comment)
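The shape of the change, as described in the commit message, is roughly this (a minimal sketch; the exact `rexi:cast_ref` signature and the helper names are assumptions, not the actual diff, and `spawn_worker_cleaner` here is a hypothetical cleaner variant that takes node-ref pairs):

```erlang
%% Sketch of the early-cleaner idea: create the refs up front, hand them to
%% the cleaner, and only then submit the jobs. Signatures are assumed.
submit_jobs(Workers, Module, EndPoint, ExtraArgs) ->
    %% 1. pre-create one reference per worker, before anything runs remotely
    WorkerRefs = [{W, make_ref()} || W <- Workers],
    NodeRefs = [{W#shard.node, Ref} || {W, Ref} <- WorkerRefs],
    %% 2. start the cleaner first, so even an immediate coordinator death
    %%    still results in kill_all being sent for every pre-made ref
    spawn_worker_cleaner(self(), NodeRefs),
    %% 3. only now submit the jobs, reusing the pre-made refs
    [
        begin
            rexi:cast_ref(Ref, W#shard.node, {Module, EndPoint, [W#shard.name | ExtraArgs]}),
            W#shard{ref = Ref}
        end
     || {W, Ref} <- WorkerRefs
    ].
```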
Previously, we performed cleanup only for specific errors such as `ddoc_updated` and `insufficient_storage`. In case of other errors, or timeouts, there was a chance we would leak workers waiting to be either started or canceled. Those workers would then wait around until the 5 minute rexi timeout fires, and then they emit an error in the logs. It's not a big deal, as that happens on errors only and the processes are all waiting in receive; however, they do hold a Db handle open, so they can waste resources from that point of view.

To fix that, this commit extends cleanup to other errors and timeouts. Moreover, in case of timeouts, we log fabric worker timeout errors. In order to do that we'd have to export the `fabric_streams` internal `#stream_acc` record to every `fabric_streams` user. That's a bit untidy, so make the timeout error return the defunct workers only; that way we can avoid leaking the `#stream_acc` record outside the fabric_streams module.

Related to #5127
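In rough terms the change means every non-success outcome of the stream start now triggers cleanup, along these lines (a simplified sketch of the idea only; the actual return shapes and helper names in fabric_streams differ):

```erlang
%% Simplified sketch: clean up the spawned workers on every non-success
%% outcome, including timeouts, and log which workers timed out.
handle_stream_start_result(Result, AllWorkers, Callback, Acc) ->
    case Result of
        {ok, Workers} ->
            {ok, Workers};
        {timeout, DefunctWorkers} ->
            fabric_util:log_timeout(DefunctWorkers, "all_docs"),
            fabric_streams:cleanup(AllWorkers),
            Callback({error, timeout}, Acc);
        {error, Reason} ->
            %% previously only ddoc_updated / insufficient_storage were
            %% cleaned up; now any error cleans up too
            fabric_streams:cleanup(AllWorkers),
            Callback({error, Reason}, Acc)
    end.
```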
So far in production we noticed most of the cases of couchdb/src/fabric/src/fabric_streams.erl Lines 168 to 171 in a2241d3
In this PR we improve cleanup: we now perform it for all stream start errors, including timeouts.
While trying to understand why we'd encounter `rexi:init_stream` errors in #5122 I believe I've identified a pattern present in at least four of the fabric RPC related modules. I think `fabric_view_all_docs.erl` is a relatively straightforward representation of the issue, so I'm going to dissect the flow from there.

Step 1) Instantiate RPC workers

We first create a set of RPC workers on the remote nodes as specified in `Shards`. This creates the handle `Workers0` with a set of references to all instantiated RPC workers.

Step 2) Create a set of monitors for all remote nodes

This creates a set of monitors on the relevant remote rexi processes for each of the nodes in question, not the workers themselves:
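Paraphrasing Steps 1 and 2 together (not a verbatim quote of `fabric_view_all_docs.erl`; names are approximate):

```erlang
%% Paraphrase of the flow: one rexi worker per shard copy,
%% then one monitor per remote node.
start_workers_and_monitors(DbName, Options, QueryArgs) ->
    Shards = mem3:shards(DbName),
    Workers0 = fabric_util:submit_jobs(Shards, fabric_rpc, all_docs, [Options, QueryArgs]),
    %% RexiMon watches the rexi server on each involved node,
    %% not the individual worker processes
    RexiMon = fabric_util:create_monitors(Workers0),
    {Workers0, RexiMon}.
```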
Step 3) Handle `fabric_streams:start` in a `try ... after ... end` block

This invokes `fabric_streams:start` in a `try` block so that in the `after` clause we invoke `rexi_monitor:stop(RexiMon)` to clear out the monitors.
Step 4) Handle the inner case clauses of Step 3)

First off we have the successful case when the stream has been initialized:
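Roughly (a paraphrase, not a verbatim quote; the helper name `handle_ok` is just for illustration):

```erlang
%% Simplified shape of the success path: the inner try/after guarantees
%% cleanup of the *selected* workers once streaming finishes.
handle_ok(DbName, Options, Workers, QueryArgs, Callback, Acc) ->
    try
        go(DbName, Options, Workers, QueryArgs, Callback, Acc)
    after
        %% cleans up Workers (the subset chosen to serve the request),
        %% not Workers0 (the full set of spawned workers)
        fabric_streams:cleanup(Workers)
    end.
```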
The key thing of note here is that this clause performs a `fabric_streams:cleanup(Workers)` in the `after` clause of a `try` block to ensure the remote workers are cleaned up after the job is done. However, the cleanup is performed against the subset of workers selected to perform the job in `Workers`, not the original full set of RPC workers instantiated and stored in `Workers0`.

Next we have the two failure cases for this fabric operation. I'll lump them together as their behavior is identical:
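Both clauses look approximately like this (paraphrased; the surrounding case expression is omitted and the helper name is illustrative):

```erlang
%% Simplified shape of the two failure clauses: each reports the error via
%% the caller-provided Callback, and neither calls fabric_streams:cleanup/1,
%% so the spawned workers are left behind.
handle_failure(Result, Callback, Acc) ->
    case Result of
        {timeout, _State} ->
            Callback({error, timeout}, Acc);
        {error, Error} ->
            Callback({error, Error}, Acc)
    end.
```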
Both of these failure clauses bubble up the error through the caller-provided `Callback`; however, neither performs any cleanup of the workers. In the outer `after` clause we do a `rexi_monitor:stop(RexiMon)`, but that's basically a no-op to kill the dedicated monitoring process.

Core Issue
I think there are two things going on here we need to address:

1) `fabric_streams:start` error modes

I think this is fairly straightforward here: we should always ensure workers are cleaned up, especially when failures happen. Basically I think we should do a `fabric_streams:cleanup` on the workers in the outer `after` clause so they're always cleaned up.

2) `fabric_streams:cleanup(Workers)`: it's on `Workers` instead of `Workers0`
This might be a bit more controversial, but I suspect one of the ways in which #5122 manifests is because we're not diligent about canceling RPC workers. It's possible that `fabric_streams:cleanup(Workers)` is sufficient, but I think `fabric_streams:cleanup(Workers0)` against the full original set of workers is appropriate.

The core rationale here is that `after` clauses do not trigger when a process is killed, leaving the possibility of remote zombied RPC workers. In theory the remote nodes' `rexi_server` processes should get a process down notification? Again, perhaps that's sufficient, but I'm personally inclined to do double bookkeeping in these types of scenarios, where we monitor from the RPC side and also send out a kill signal from the coordinator side. What do folks think?
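To make the suggestion concrete, the shape of the fix could be something like this (a sketch of the proposal with approximate names and arities, not an actual patch; `handle_stream_start` is a hypothetical helper standing in for the existing case clauses):

```erlang
%% Sketch of the proposal: always clean up the full original worker set in
%% the outer after clause, next to stopping the node monitors.
go(DbName, Options, QueryArgs, Callback, Acc) ->
    Shards = mem3:shards(DbName),
    Workers0 = fabric_util:submit_jobs(Shards, fabric_rpc, all_docs, [Options, QueryArgs]),
    RexiMon = fabric_util:create_monitors(Workers0),
    try
        handle_stream_start(fabric_streams:start(Workers0, #shard.ref), Callback, Acc)
    after
        rexi_monitor:stop(RexiMon),
        %% cleanup against Workers0 (everything spawned), not just the
        %% selected Workers, and regardless of success or failure
        fabric_streams:cleanup(Workers0)
    end.
```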
Presence in the codebase

Right now I think I've identified this pattern in the following four fabric modules, although I've not done a full audit of the other modules so there may be more instances of this: