Skip to content

Commit

Permalink
ensures no modifications during iteration
Browse files Browse the repository at this point in the history
since Processor/Subscription termination with `cancel` or `onError` leads to the following self-removal logic, it can happen that collection concurrent modification exception may appear. To avoid so we can copy all the entries and by doing so avoid any subsequent problems

Signed-off-by: Oleh Dokuka <[email protected]>
Signed-off-by: Oleh Dokuka <[email protected]>
  • Loading branch information
OlegDokuka committed Jun 7, 2021
1 parent 040278a commit b8c7c2e
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 7 deletions.
18 changes: 13 additions & 5 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import io.rsocket.keepalive.KeepAliveSupport;
import io.rsocket.lease.RequesterLeaseHandler;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
Expand Down Expand Up @@ -772,20 +774,26 @@ private void terminate(Throwable e) {
leaseHandler.dispose();

// Iterate explicitly to handle collisions with concurrent removals
for (IntObjectMap.PrimitiveEntry<Processor<Payload, Payload>> entry : receivers.entries()) {
final IntObjectMap<Processor<Payload, Payload>> receivers = this.receivers;
// copy to avoid collection modification from the foreach loop
final Collection<Processor<Payload, Payload>> receiversCopy =
new ArrayList<>(receivers.values());
for (Processor<Payload, Payload> handler : receiversCopy) {
try {
entry.value().onError(e);
handler.onError(e);
} catch (Throwable ex) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Dropped exception", ex);
}
}
}

// Iterate explicitly to handle collisions with concurrent removals
for (IntObjectMap.PrimitiveEntry<Subscription> entry : senders.entries()) {
final IntObjectMap<Subscription> senders = this.senders;
// copy to avoid collection modification from the foreach loop
final Collection<Subscription> sendersCopy = new ArrayList<>(senders.values());
for (Subscription subscription : sendersCopy) {
try {
entry.value().cancel();
subscription.cancel();
} catch (Throwable ex) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Dropped exception", ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import io.rsocket.internal.UnboundedProcessor;
import io.rsocket.lease.ResponderLeaseHandler;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
Expand Down Expand Up @@ -264,9 +266,12 @@ private void cleanup(Throwable e) {

private synchronized void cleanUpSendingSubscriptions() {
// Iterate explicitly to handle collisions with concurrent removals
for (IntObjectMap.PrimitiveEntry<Subscription> entry : sendingSubscriptions.entries()) {
final IntObjectMap<Subscription> sendingSubscriptions = this.sendingSubscriptions;
final Collection<Subscription> sendingSubscriptionsCopy =
new ArrayList<>(sendingSubscriptions.values());
for (Subscription subscription : sendingSubscriptionsCopy) {
try {
entry.value().cancel();
subscription.cancel();
} catch (Throwable ex) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Dropped exception", ex);
Expand Down

0 comments on commit b8c7c2e

Please sign in to comment.