Skip to content

Commit

Permalink
Allow the server connection-level configuration be per-port as well a…
Browse files Browse the repository at this point in the history
…s global
  • Loading branch information
kerumai committed Jul 11, 2019
1 parent 7c3d855 commit f582d24
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.netflix.netty.common.channel.config;

import com.netflix.zuul.netty.server.ServerTimeout;
import com.netflix.zuul.netty.ssl.SslContextFactory;
import com.netflix.netty.common.proxyprotocol.StripUntrustedProxyHeadersHandler;
import com.netflix.netty.common.ssl.ServerSslConfig;
Expand All @@ -33,6 +34,7 @@ public class CommonChannelConfigKeys
public static final ChannelConfigKey<Boolean> preferProxyProtocolForClientIp = new ChannelConfigKey<>("preferProxyProtocolForClientIp", true);

public static final ChannelConfigKey<Integer> idleTimeout = new ChannelConfigKey<>("idleTimeout");
public static final ChannelConfigKey<ServerTimeout> serverTimeout = new ChannelConfigKey<>("serverTimeout");
public static final ChannelConfigKey<Integer> httpRequestReadTimeout = new ChannelConfigKey<>("httpRequestReadTimeout");
public static final ChannelConfigKey<Integer> maxConnections = new ChannelConfigKey<>("maxConnections");
public static final ChannelConfigKey<Integer> maxRequestsPerConnection = new ChannelConfigKey<>("maxRequestsPerConnection", 4000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ public class CommonContextKeys {
public static final String OVERRIDE_GZIP_REQUESTED = "overrideGzipRequested";

/* Netty-specific keys */
public static final String IS_NETTY_BUILD = "_is_netty_build";
public static final String NETTY_HTTP_REQUEST = "_netty_http_request";
public static final String NETTY_SERVER_CHANNEL_HANDLER_CONTEXT = "_netty_server_channel_handler_context";
public static final String REQ_BODY_DCS = "_request_body_dcs";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package com.netflix.zuul.netty.server;

import com.netflix.appinfo.ApplicationInfoManager;
import com.netflix.config.ChainedDynamicProperty;
import com.netflix.config.DynamicBooleanProperty;
import com.netflix.config.DynamicIntProperty;
import com.netflix.discovery.EurekaClient;
import com.netflix.netty.common.accesslog.AccessLogPublisher;
Expand Down Expand Up @@ -52,7 +54,6 @@ public abstract class BaseServerStartup
protected static final Logger LOG = LoggerFactory.getLogger(BaseServerStartup.class);

protected final ServerStatusManager serverStatusManager;
protected final ServerTimeout serverTimeout;
protected final Registry registry;
protected final DirectMemoryMonitor directMemoryMonitor;
protected final EventLoopGroupMetrics eventLoopGroupMetrics;
Expand All @@ -70,15 +71,14 @@ public abstract class BaseServerStartup


@Inject
public BaseServerStartup(ServerStatusManager serverStatusManager, ServerTimeout serverTimeout, FilterLoader filterLoader,
public BaseServerStartup(ServerStatusManager serverStatusManager, FilterLoader filterLoader,
SessionContextDecorator sessionCtxDecorator, FilterUsageNotifier usageNotifier,
RequestCompleteHandler reqCompleteHandler, Registry registry,
DirectMemoryMonitor directMemoryMonitor, EventLoopGroupMetrics eventLoopGroupMetrics,
EurekaClient discoveryClient, ApplicationInfoManager applicationInfoManager,
AccessLogPublisher accessLogPublisher)
{
this.serverStatusManager = serverStatusManager;
this.serverTimeout = serverTimeout;
this.registry = registry;
this.directMemoryMonitor = directMemoryMonitor;
this.eventLoopGroupMetrics = eventLoopGroupMetrics;
Expand All @@ -99,23 +99,27 @@ public Server server()
@PostConstruct
public void init() throws Exception
{
ChannelConfig channelDeps = new ChannelConfig();
addChannelDependencies(channelDeps);

ChannelGroup clientChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
clientConnectionsShutdown = new ClientConnectionsShutdown(clientChannels,
GlobalEventExecutor.INSTANCE, discoveryClient);

portsToChannelInitializers = choosePortsAndChannels(clientChannels, channelDeps);
portsToChannelInitializers = choosePortsAndChannels(clientChannels);

directMemoryMonitor.init();

server = new Server(portsToChannelInitializers, serverStatusManager, clientConnectionsShutdown, eventLoopGroupMetrics);
}

protected abstract Map<Integer, ChannelInitializer> choosePortsAndChannels(
ChannelGroup clientChannels,
ChannelConfig channelDependencies);
protected abstract Map<Integer, ChannelInitializer> choosePortsAndChannels(ChannelGroup clientChannels);

protected ChannelConfig defaultChannelDependencies(String portName)
{
ChannelConfig channelDependencies = new ChannelConfig();
addChannelDependencies(channelDependencies, portName);
return channelDependencies;
}

protected void addChannelDependencies(ChannelConfig channelDeps) throws Exception
protected void addChannelDependencies(ChannelConfig channelDeps, String portName)
{
channelDeps.set(ZuulDependencyKeys.registry, registry);

Expand All @@ -136,26 +140,65 @@ protected void addChannelDependencies(ChannelConfig channelDeps) throws Exceptio

channelDeps.set(ZuulDependencyKeys.sslClientCertCheckChannelHandlerProvider, new NullChannelHandlerProvider());
channelDeps.set(ZuulDependencyKeys.rateLimitingChannelHandlerProvider, new NullChannelHandlerProvider());
}

directMemoryMonitor.init();
/**
* First looks for a property specific to the named port of the form - "server.${portName}.${propertySuffix}".
* If none found, then looks for a server-wide property of the form - "server.${propertySuffix}".
* If that is also not found, then returns the specified default value.
*
* @param portName
* @param propertySuffix
* @param defaultValue
* @return
*/
public static int chooseIntChannelProperty(String portName, String propertySuffix, int defaultValue)
{
String globalPropertyName = "server." + propertySuffix;
String portPropertyName = "server." + portName + "." + propertySuffix;
Integer value = new DynamicIntProperty(portPropertyName, -999).get();
if (value == -999) {
value = new DynamicIntProperty(globalPropertyName, -999).get();
if (value == -999) {
value = defaultValue;
}
}
return value;
}

public static boolean chooseBooleanChannelProperty(String portName, String propertySuffix, boolean defaultValue)
{
String globalPropertyName = "server." + propertySuffix;
String portPropertyName = "server." + portName + "." + propertySuffix;

Boolean value = new ChainedDynamicProperty.DynamicBooleanPropertyThatSupportsNull(portPropertyName, null).get();
if (value == null) {
value = new DynamicBooleanProperty(globalPropertyName, defaultValue).getDynamicProperty().getBoolean();
if (value == null) {
value = defaultValue;
}
}
return value;
}

public static ChannelConfig defaultChannelConfig(ServerTimeout serverTimeout)
protected ChannelConfig defaultChannelConfig(String portName)
{
ChannelConfig config = new ChannelConfig();

config.add(new ChannelConfigValue(CommonChannelConfigKeys.maxConnections,
new DynamicIntProperty("server.connection.max", 20000).get()));
chooseIntChannelProperty(portName, "connection.max", 20000)));
config.add(new ChannelConfigValue(CommonChannelConfigKeys.maxRequestsPerConnection,
new DynamicIntProperty("server.connection.max.requests", 20000).get()));
chooseIntChannelProperty(portName, "connection.max.requests", 20000)));
config.add(new ChannelConfigValue(CommonChannelConfigKeys.maxRequestsPerConnectionInBrownout,
new DynamicIntProperty("server.connection.max.requests.brownout", CommonChannelConfigKeys.maxRequestsPerConnectionInBrownout.defaultValue()).get()));
chooseIntChannelProperty(portName, "connection.max.requests.brownout", CommonChannelConfigKeys.maxRequestsPerConnectionInBrownout.defaultValue())));
config.add(new ChannelConfigValue(CommonChannelConfigKeys.connectionExpiry,
new DynamicIntProperty("server.connection.expiry", CommonChannelConfigKeys.connectionExpiry.defaultValue()).get()));
config.add(new ChannelConfigValue(CommonChannelConfigKeys.idleTimeout,
serverTimeout.connectionIdleTimeout()));
chooseIntChannelProperty(portName, "connection.expiry", CommonChannelConfigKeys.connectionExpiry.defaultValue())));
config.add(new ChannelConfigValue(CommonChannelConfigKeys.httpRequestReadTimeout,
new DynamicIntProperty("server.http.request.read.timeout", 5000).get()));
chooseIntChannelProperty(portName, "http.request.read.timeout", 5000)));

int connectionIdleTimeout = chooseIntChannelProperty(portName, "connection.idle.timeout", 65000);
config.add(new ChannelConfigValue(CommonChannelConfigKeys.idleTimeout, connectionIdleTimeout));
config.add(new ChannelConfigValue(CommonChannelConfigKeys.serverTimeout, new ServerTimeout(connectionIdleTimeout)));

// For security, default to NEVER allowing XFF/Proxy headers from client.
config.add(new ChannelConfigValue(CommonChannelConfigKeys.allowProxyHeadersWhen, StripUntrustedProxyHeadersHandler.AllowWhen.NEVER));
Expand All @@ -164,25 +207,26 @@ public static ChannelConfig defaultChannelConfig(ServerTimeout serverTimeout)
config.set(CommonChannelConfigKeys.preferProxyProtocolForClientIp, true);

config.add(new ChannelConfigValue(CommonChannelConfigKeys.connCloseDelay,
new DynamicIntProperty("server.connection.close.delay", 10).get()));
chooseIntChannelProperty(portName, "connection.close.delay", 10)));

return config;
}

public static void addHttp2DefaultConfig(ChannelConfig config) {
public static void addHttp2DefaultConfig(ChannelConfig config, String portName)
{
config.add(new ChannelConfigValue(CommonChannelConfigKeys.maxConcurrentStreams,
new DynamicIntProperty("server.http2.max.concurrent.streams", CommonChannelConfigKeys.maxConcurrentStreams.defaultValue()).get()));
chooseIntChannelProperty(portName, "http2.max.concurrent.streams", CommonChannelConfigKeys.maxConcurrentStreams.defaultValue())));
config.add(new ChannelConfigValue(CommonChannelConfigKeys.initialWindowSize,
new DynamicIntProperty("server.http2.initialwindowsize", CommonChannelConfigKeys.initialWindowSize.defaultValue()).get()));
chooseIntChannelProperty(portName, "http2.initialwindowsize", CommonChannelConfigKeys.initialWindowSize.defaultValue())));
config.add(new ChannelConfigValue(CommonChannelConfigKeys.maxHttp2HeaderTableSize,
new DynamicIntProperty("server.http2.maxheadertablesize", 65536).get()));
chooseIntChannelProperty(portName, "http2.maxheadertablesize", 65536)));
config.add(new ChannelConfigValue(CommonChannelConfigKeys.maxHttp2HeaderListSize,
new DynamicIntProperty("server.http2.maxheaderlistsize", 32768).get()));
chooseIntChannelProperty(portName, "http2.maxheaderlistsize", 32768)));

// Override this to a lower value, as we'll be using ELB TCP listeners for h2, and therefore the connection
// is direct from each device rather than shared in an ELB pool.
config.add(new ChannelConfigValue(CommonChannelConfigKeys.maxRequestsPerConnection,
new DynamicIntProperty("server.connection.max.requests", 4000).get()));
chooseIntChannelProperty(portName, "connection.max.requests", 4000)));
}

protected void logPortConfigured(int port, ServerSslConfig serverSslConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,7 @@
import com.netflix.netty.common.accesslog.AccessLogPublisher;
import com.netflix.netty.common.channel.config.ChannelConfig;
import com.netflix.netty.common.channel.config.CommonChannelConfigKeys;
import com.netflix.netty.common.metrics.EventLoopGroupMetrics;
import com.netflix.netty.common.metrics.HttpBodySizeRecordingChannelHandler;
import com.netflix.netty.common.metrics.HttpMetricsChannelHandler;
import com.netflix.netty.common.metrics.PerEventLoopMetricsChannelHandler;
import com.netflix.netty.common.metrics.ServerChannelMetrics;
import com.netflix.netty.common.metrics.*;
import com.netflix.netty.common.proxyprotocol.ElbProxyProtocolChannelHandler;
import com.netflix.netty.common.proxyprotocol.StripUntrustedProxyHeadersHandler;
import com.netflix.netty.common.status.ServerStatusHeaderHandler;
Expand All @@ -39,18 +35,18 @@
import com.netflix.zuul.RequestCompleteHandler;
import com.netflix.zuul.context.SessionContextDecorator;
import com.netflix.zuul.filters.ZuulFilter;
import com.netflix.zuul.filters.passport.InboundPassportStampingFilter;
import com.netflix.zuul.filters.passport.OutboundPassportStampingFilter;
import com.netflix.zuul.message.ZuulMessage;
import com.netflix.zuul.message.http.HttpRequestMessage;
import com.netflix.zuul.message.http.HttpResponseMessage;
import com.netflix.zuul.netty.insights.PassportLoggingHandler;
import com.netflix.zuul.netty.insights.PassportStateHttpServerHandler;
import com.netflix.zuul.netty.insights.PassportStateServerHandler;
import com.netflix.zuul.filters.passport.InboundPassportStampingFilter;
import com.netflix.zuul.filters.passport.OutboundPassportStampingFilter;
import com.netflix.zuul.netty.filter.FilterRunner;
import com.netflix.zuul.netty.filter.ZuulEndPointRunner;
import com.netflix.zuul.netty.filter.ZuulFilterChainHandler;
import com.netflix.zuul.netty.filter.ZuulFilterChainRunner;
import com.netflix.zuul.netty.insights.PassportLoggingHandler;
import com.netflix.zuul.netty.insights.PassportStateHttpServerHandler;
import com.netflix.zuul.netty.insights.PassportStateServerHandler;
import com.netflix.zuul.netty.server.ssl.SslHandshakeInfoHandler;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
Expand All @@ -61,16 +57,12 @@
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;

import java.util.List;
import java.util.concurrent.TimeUnit;

import static com.netflix.zuul.passport.PassportState.FILTERS_INBOUND_END;
import static com.netflix.zuul.passport.PassportState.FILTERS_INBOUND_START;
import static com.netflix.zuul.passport.PassportState.FILTERS_OUTBOUND_END;
import static com.netflix.zuul.passport.PassportState.FILTERS_OUTBOUND_START;
import static com.netflix.zuul.passport.PassportState.*;

/**
* User: Mike Smith
Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,24 @@
package com.netflix.zuul.netty.server;

import com.netflix.config.CachedDynamicIntProperty;

import javax.inject.Singleton;

@Singleton
public class ServerTimeout
{
private static CachedDynamicIntProperty SERVER_CONN_IDLE_TIMEOUT =
new CachedDynamicIntProperty("server.connection.idle.timeout", 65000);
private final int connectionIdleTimeout;

public ServerTimeout(int connectionIdleTimeout)
{
this.connectionIdleTimeout = connectionIdleTimeout;
}

public int connectionIdleTimeout()
{
return SERVER_CONN_IDLE_TIMEOUT.get();
return connectionIdleTimeout;
}

public int defaultRequestExpiryTimeout()
{
// Note this is the timeout for the inbound request to zuul, not for each outbound attempt.
// It needs to align with the inbound connection idle timeout and/or the ELB idle timeout. So we
// set it here to 1 sec less than that.

int idleTimeout = connectionIdleTimeout();
return idleTimeout > 1000 ? idleTimeout - 1000 : 1000;
return connectionIdleTimeout > 1000 ? connectionIdleTimeout - 1000 : 1000;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,31 +71,32 @@ enum ServerType {
private final SamplePushMessageSenderInitializer pushSenderInitializer;

@Inject
public SampleServerStartup(ServerStatusManager serverStatusManager, ServerTimeout serverTimeout, FilterLoader filterLoader,
public SampleServerStartup(ServerStatusManager serverStatusManager, FilterLoader filterLoader,
SessionContextDecorator sessionCtxDecorator, FilterUsageNotifier usageNotifier,
RequestCompleteHandler reqCompleteHandler, Registry registry,
DirectMemoryMonitor directMemoryMonitor, EventLoopGroupMetrics eventLoopGroupMetrics,
EurekaClient discoveryClient, ApplicationInfoManager applicationInfoManager,
AccessLogPublisher accessLogPublisher, PushConnectionRegistry pushConnectionRegistry,
SamplePushMessageSenderInitializer pushSenderInitializer) {
super(serverStatusManager, serverTimeout, filterLoader, sessionCtxDecorator, usageNotifier, reqCompleteHandler, registry,
super(serverStatusManager, filterLoader, sessionCtxDecorator, usageNotifier, reqCompleteHandler, registry,
directMemoryMonitor, eventLoopGroupMetrics, discoveryClient, applicationInfoManager,
accessLogPublisher);
this.pushConnectionRegistry = pushConnectionRegistry;
this.pushSenderInitializer = pushSenderInitializer;
}

@Override
protected Map<Integer, ChannelInitializer> choosePortsAndChannels(
ChannelGroup clientChannels,
ChannelConfig channelDependencies) {
protected Map<Integer, ChannelInitializer> choosePortsAndChannels(ChannelGroup clientChannels) {
Map<Integer, ChannelInitializer> portsToChannels = new HashMap<>();

String mainPortName = "main";
int port = new DynamicIntProperty("zuul.server.port.main", 7001).get();

ChannelConfig channelConfig = BaseServerStartup.defaultChannelConfig(serverTimeout);
ChannelConfig channelConfig = defaultChannelConfig(mainPortName);
int pushPort = new DynamicIntProperty("zuul.server.port.http.push", 7008).get();
ServerSslConfig sslConfig;
ChannelConfig channelDependencies = defaultChannelDependencies(mainPortName);

/* These settings may need to be tweaked depending if you're running behind an ELB HTTP listener, TCP listener,
* or directly on the internet.
*/
Expand Down Expand Up @@ -128,7 +129,7 @@ protected Map<Integer, ChannelInitializer> choosePortsAndChannels(
channelConfig.set(CommonChannelConfigKeys.serverSslConfig, sslConfig);
channelConfig.set(CommonChannelConfigKeys.sslContextFactory, new BaseSslContextFactory(registry, sslConfig));

addHttp2DefaultConfig(channelConfig);
addHttp2DefaultConfig(channelConfig, mainPortName);

portsToChannels.put(port, new Http2SslChannelInitializer(port, channelConfig, channelDependencies, clientChannels));
logPortConfigured(port, sslConfig);
Expand Down

0 comments on commit f582d24

Please sign in to comment.