Skip to content

Commit af686b8

Browse files
SingleThreadedEventLoop: add read-only iterator of channels (netty#12610)
Motivation: For servers there is a need for per eventloop "watchdog" timer that ticks for all eventloop channels. For moderately high number of connections, It is cheaper cpu-wise than per channel scheduling. Now this requires additional data structure to hold active channels - which is wasteful as It duplicates the one from SingleThreadedEventLoop (its practically useful impls - epoll, kqueue, nio). Modification: Add read-only iterator of eventloop channels exposed on SingleThreadedEventLoop as Iterator<Channel> registeredChannelsIterator() Result: SingleThreadedEventLoop channels may be iterated without additional redundant data structures Co-authored-by: Norman Maurer <[email protected]>
1 parent 4ebc4ee commit af686b8

File tree

9 files changed

+332
-0
lines changed

9 files changed

+332
-0
lines changed

testsuite/src/main/java/io/netty/testsuite/transport/AbstractSingleThreadEventLoopTest.java

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,18 +27,27 @@
2727
import io.netty.channel.local.LocalServerChannel;
2828
import io.netty.util.concurrent.EventExecutor;
2929
import io.netty.util.concurrent.Future;
30+
import io.netty.util.concurrent.Promise;
3031
import org.junit.jupiter.api.Test;
3132
import org.junit.jupiter.api.Timeout;
33+
import org.junit.jupiter.api.function.Executable;
3234

35+
import java.util.HashSet;
36+
import java.util.Iterator;
37+
import java.util.NoSuchElementException;
38+
import java.util.Set;
3339
import java.util.concurrent.Callable;
3440
import java.util.concurrent.CountDownLatch;
3541
import java.util.concurrent.RejectedExecutionException;
3642
import java.util.concurrent.TimeUnit;
3743

3844
import static org.junit.jupiter.api.Assertions.assertEquals;
3945
import static org.junit.jupiter.api.Assertions.assertFalse;
46+
import static org.junit.jupiter.api.Assertions.assertNotNull;
47+
import static org.junit.jupiter.api.Assertions.assertThrows;
4048
import static org.junit.jupiter.api.Assertions.assertTrue;
4149
import static org.junit.jupiter.api.Assertions.fail;
50+
import static org.junit.jupiter.api.Assumptions.assumeTrue;
4251

4352
public abstract class AbstractSingleThreadEventLoopTest {
4453

@@ -156,6 +165,134 @@ public void run() {
156165
assertRejection(loop);
157166
}
158167

168+
@Test
169+
@Timeout(value = 3000, unit = TimeUnit.MILLISECONDS)
170+
public void testChannelsIteratorEmpty() throws Exception {
171+
assumeTrue(supportsChannelIteration());
172+
EventLoopGroup group = newEventLoopGroup();
173+
final SingleThreadEventLoop loop = (SingleThreadEventLoop) group.next();
174+
try {
175+
runBlockingOn(loop, new Runnable() {
176+
@Override
177+
public void run() {
178+
final Iterator<Channel> iterator = loop.registeredChannelsIterator();
179+
180+
assertFalse(iterator.hasNext());
181+
assertThrows(NoSuchElementException.class, new Executable() {
182+
@Override
183+
public void execute() {
184+
iterator.next();
185+
}
186+
});
187+
}
188+
});
189+
} finally {
190+
group.shutdownGracefully();
191+
}
192+
}
193+
194+
@Test
195+
@Timeout(value = 3000, unit = TimeUnit.MILLISECONDS)
196+
public void testChannelsIterator() throws Exception {
197+
assumeTrue(supportsChannelIteration());
198+
EventLoopGroup group = newEventLoopGroup();
199+
final SingleThreadEventLoop loop = (SingleThreadEventLoop) group.next();
200+
try {
201+
final Channel ch1 = newChannel();
202+
final Channel ch2 = newChannel();
203+
loop.register(ch1).syncUninterruptibly();
204+
loop.register(ch2).syncUninterruptibly();
205+
assertEquals(2, registeredChannels(loop));
206+
207+
runBlockingOn(loop, new Runnable() {
208+
@Override
209+
public void run() {
210+
final Iterator<Channel> iterator = loop.registeredChannelsIterator();
211+
212+
assertTrue(iterator.hasNext());
213+
Channel actualCh1 = iterator.next();
214+
assertNotNull(actualCh1);
215+
216+
assertTrue(iterator.hasNext());
217+
Channel actualCh2 = iterator.next();
218+
assertNotNull(actualCh2);
219+
220+
Set<Channel> expected = new HashSet<Channel>(4);
221+
expected.add(ch1);
222+
expected.add(ch2);
223+
expected.remove(actualCh1);
224+
expected.remove(actualCh2);
225+
assertTrue(expected.isEmpty());
226+
227+
assertFalse(iterator.hasNext());
228+
assertThrows(NoSuchElementException.class, new Executable() {
229+
@Override
230+
public void execute() {
231+
iterator.next();
232+
}
233+
});
234+
}
235+
});
236+
} finally {
237+
group.shutdownGracefully();
238+
}
239+
}
240+
241+
@Test
242+
@Timeout(value = 3000, unit = TimeUnit.MILLISECONDS)
243+
public void testChannelsIteratorRemoveThrows() throws Exception {
244+
assumeTrue(supportsChannelIteration());
245+
EventLoopGroup group = newEventLoopGroup();
246+
final SingleThreadEventLoop loop = (SingleThreadEventLoop) group.next();
247+
248+
try {
249+
final Channel ch = newChannel();
250+
loop.register(ch).syncUninterruptibly();
251+
assertEquals(1, registeredChannels(loop));
252+
253+
runBlockingOn(loop, new Runnable() {
254+
@Override
255+
public void run() {
256+
assertThrows(UnsupportedOperationException.class, new Executable() {
257+
@Override
258+
public void execute() {
259+
loop.registeredChannelsIterator().remove();
260+
}
261+
});
262+
}
263+
});
264+
} finally {
265+
group.shutdownGracefully();
266+
}
267+
}
268+
269+
private static void runBlockingOn(EventLoop eventLoop, final Runnable action) {
270+
final Promise<Void> promise = eventLoop.newPromise();
271+
eventLoop.execute(new Runnable() {
272+
@Override
273+
public void run() {
274+
try {
275+
action.run();
276+
promise.setSuccess(null);
277+
} catch (Throwable t) {
278+
promise.tryFailure(t);
279+
}
280+
}
281+
});
282+
try {
283+
promise.await();
284+
} catch (InterruptedException e) {
285+
throw new RuntimeException(e);
286+
}
287+
Throwable cause = promise.cause();
288+
if (cause != null) {
289+
if (cause instanceof RuntimeException) {
290+
throw (RuntimeException) cause;
291+
}
292+
throw new RuntimeException(cause);
293+
}
294+
}
295+
159296
private static final Runnable NOOP = new Runnable() {
160297
@Override
161298
public void run() { }
@@ -170,6 +307,9 @@ private static void assertRejection(EventExecutor loop) {
170307
}
171308
}
172309

310+
protected boolean supportsChannelIteration() {
311+
return false;
312+
}
173313
protected abstract EventLoopGroup newEventLoopGroup();
174314
protected abstract Channel newChannel();
175315
protected abstract Class<? extends ServerChannel> serverChannelClass();

testsuite/src/main/java/io/netty/testsuite/transport/DefaultEventLoopTest.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,38 @@
1919
import io.netty.channel.DefaultEventLoopGroup;
2020
import io.netty.channel.EventLoopGroup;
2121
import io.netty.channel.ServerChannel;
22+
import io.netty.channel.SingleThreadEventLoop;
2223
import io.netty.channel.local.LocalChannel;
2324
import io.netty.channel.local.LocalServerChannel;
25+
import org.junit.jupiter.api.Test;
26+
import org.junit.jupiter.api.Timeout;
27+
import org.junit.jupiter.api.function.Executable;
28+
import java.util.concurrent.TimeUnit;
29+
30+
import static org.junit.jupiter.api.Assertions.assertThrows;
2431

2532
public class DefaultEventLoopTest extends AbstractSingleThreadEventLoopTest {
2633

34+
@Test
35+
@Timeout(value = 3000, unit = TimeUnit.MILLISECONDS)
36+
public void testChannelsIteratorNotSupported() throws Exception {
37+
EventLoopGroup group = newEventLoopGroup();
38+
final SingleThreadEventLoop loop = (SingleThreadEventLoop) group.next();
39+
try {
40+
final Channel ch = newChannel();
41+
loop.register(ch).syncUninterruptibly();
42+
43+
assertThrows(UnsupportedOperationException.class, new Executable() {
44+
@Override
45+
public void execute() throws Throwable {
46+
loop.registeredChannelsIterator();
47+
}
48+
});
49+
} finally {
50+
group.shutdownGracefully();
51+
}
52+
}
53+
2754
@Override
2855
protected EventLoopGroup newEventLoopGroup() {
2956
return new DefaultEventLoopGroup();

testsuite/src/main/java/io/netty/testsuite/transport/NioEventLoopTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@
2424

2525
public class NioEventLoopTest extends AbstractSingleThreadEventLoopTest {
2626

27+
@Override
28+
protected boolean supportsChannelIteration() {
29+
return true;
30+
}
31+
2732
@Override
2833
protected EventLoopGroup newEventLoopGroup() {
2934
return new NioEventLoopGroup();

transport-classes-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package io.netty.channel.epoll;
1717

18+
import io.netty.channel.Channel;
1819
import io.netty.channel.EventLoop;
1920
import io.netty.channel.EventLoopGroup;
2021
import io.netty.channel.EventLoopTaskQueueFactory;
@@ -34,6 +35,7 @@
3435
import io.netty.util.internal.logging.InternalLoggerFactory;
3536

3637
import java.io.IOException;
38+
import java.util.Iterator;
3739
import java.util.Queue;
3840
import java.util.concurrent.Executor;
3941
import java.util.concurrent.atomic.AtomicLong;
@@ -279,6 +281,16 @@ public int registeredChannels() {
279281
return channels.size();
280282
}
281283

284+
@Override
285+
public Iterator<Channel> registeredChannelsIterator() {
286+
assert inEventLoop();
287+
IntObjectMap<AbstractEpollChannel> ch = channels;
288+
if (ch.isEmpty()) {
289+
return ChannelsReadOnlyIterator.empty();
290+
}
291+
return new ChannelsReadOnlyIterator<AbstractEpollChannel>(ch.values());
292+
}
293+
282294
private long epollWait(long deadlineNanos) throws IOException {
283295
if (deadlineNanos == NONE) {
284296
return Native.epollWait(epollFd, events, timerFd,

transport-classes-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package io.netty.channel.kqueue;
1717

18+
import io.netty.channel.Channel;
1819
import io.netty.channel.EventLoop;
1920
import io.netty.channel.EventLoopGroup;
2021
import io.netty.channel.EventLoopTaskQueueFactory;
@@ -33,6 +34,7 @@
3334
import io.netty.util.internal.logging.InternalLoggerFactory;
3435

3536
import java.io.IOException;
37+
import java.util.Iterator;
3638
import java.util.Queue;
3739
import java.util.concurrent.Executor;
3840
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -353,6 +355,16 @@ public int registeredChannels() {
353355
return channels.size();
354356
}
355357

358+
@Override
359+
public Iterator<Channel> registeredChannelsIterator() {
360+
assert inEventLoop();
361+
IntObjectMap<AbstractKQueueChannel> ch = channels;
362+
if (ch.isEmpty()) {
363+
return ChannelsReadOnlyIterator.empty();
364+
}
365+
return new ChannelsReadOnlyIterator<AbstractKQueueChannel>(ch.values());
366+
}
367+
356368
@Override
357369
protected void cleanup() {
358370
try {

transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollEventLoopTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,11 @@
4040

4141
public class EpollEventLoopTest extends AbstractSingleThreadEventLoopTest {
4242

43+
@Override
44+
protected boolean supportsChannelIteration() {
45+
return true;
46+
}
47+
4348
@Override
4449
protected EventLoopGroup newEventLoopGroup() {
4550
return new EpollEventLoopGroup();

transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueEventLoopTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@
3030

3131
public class KQueueEventLoopTest extends AbstractSingleThreadEventLoopTest {
3232

33+
@Override
34+
protected boolean supportsChannelIteration() {
35+
return true;
36+
}
37+
3338
@Override
3439
protected EventLoopGroup newEventLoopGroup() {
3540
return new KQueueEventLoopGroup();

transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import io.netty.util.internal.SystemPropertyUtil;
2323
import io.netty.util.internal.UnstableApi;
2424

25+
import java.util.Iterator;
26+
import java.util.NoSuchElementException;
2527
import java.util.Queue;
2628
import java.util.concurrent.Executor;
2729
import java.util.concurrent.ThreadFactory;
@@ -154,4 +156,62 @@ public int pendingTasks() {
154156
public int registeredChannels() {
155157
return -1;
156158
}
159+
160+
/**
161+
* @return read-only iterator of active {@link Channel}s registered with this {@link EventLoop}.
162+
* The returned value is not guaranteed to be exact accurate and
163+
* should be viewed as a best effort. This method is expected to be called from within
164+
* event loop.
165+
* @throws UnsupportedOperationException if operation is not supported by implementation.
166+
*/
167+
@UnstableApi
168+
public Iterator<Channel> registeredChannelsIterator() {
169+
throw new UnsupportedOperationException("registeredChannelsIterator");
170+
}
171+
172+
protected static final class ChannelsReadOnlyIterator<T extends Channel> implements Iterator<Channel> {
173+
private final Iterator<T> channelIterator;
174+
175+
public ChannelsReadOnlyIterator(Iterable<T> channelIterable) {
176+
this.channelIterator =
177+
ObjectUtil.checkNotNull(channelIterable, "channelIterable").iterator();
178+
}
179+
180+
@Override
181+
public boolean hasNext() {
182+
return channelIterator.hasNext();
183+
}
184+
185+
@Override
186+
public Channel next() {
187+
return channelIterator.next();
188+
}
189+
190+
@Override
191+
public void remove() {
192+
throw new UnsupportedOperationException("remove");
193+
}
194+
195+
@SuppressWarnings("unchecked")
196+
public static <T> Iterator<T> empty() {
197+
return (Iterator<T>) EMPTY;
198+
}
199+
200+
private static final Iterator<Object> EMPTY = new Iterator<Object>() {
201+
@Override
202+
public boolean hasNext() {
203+
return false;
204+
}
205+
206+
@Override
207+
public Object next() {
208+
throw new NoSuchElementException();
209+
}
210+
211+
@Override
212+
public void remove() {
213+
throw new UnsupportedOperationException("remove");
214+
}
215+
};
216+
}
157217
}

0 commit comments

Comments
 (0)