From 77845e8b89abb3ca4112037f89ae6afdc713e441 Mon Sep 17 00:00:00 2001
From: Mrproliu <741550557@qq.com>
Date: Tue, 31 Oct 2023 14:09:37 +0800
Subject: [PATCH 1/3] Support collect OTLP traces
---
zipkin-server/pom.xml | 7 +
zipkin-server/receiver-otlp-trace/pom.xml | 37 +++++
.../server/receiver/otlp/OTLPTraceConfig.java | 122 +++++++++++++++++
.../receiver/otlp/OTLPTraceProvider.java | 126 ++++++++++++++++++
.../otlp/handler/OTLPTraceHandler.java | 33 +++++
...g.oap.server.library.module.ModuleProvider | 19 +++
.../receiver-zipkin-activemq/pom.xml | 8 +-
.../zipkin/activemq/ActiveMQHandler.java | 8 +-
.../activemq/ZipkinActiveMQProvider.java | 10 +-
zipkin-server/receiver-zipkin-core/pom.xml | 29 ++++
.../receiver/zipkin/core/SpanForwardCore.java | 36 +++++
.../zipkin/core/ZipkinReceiverCoreConfig.java | 53 ++++++++
.../core/ZipkinReceiverCoreProvider.java | 76 +++++++++++
...g.oap.server.library.module.ModuleProvider | 19 +++
zipkin-server/receiver-zipkin-http/pom.xml | 8 +-
.../http/ZipkinHTTPReceiverProvider.java | 8 +-
.../receiver/zipkin/http/ITHTTPReceiver.java | 19 +--
zipkin-server/receiver-zipkin-kafka/pom.xml | 8 +-
.../kafka/ZipkinKafkaReceiverProvider.java | 11 +-
.../zipkin/kafka/ITKafkaReceiver.java | 24 ++--
.../receiver-zipkin-rabbitmq/pom.xml | 8 +-
.../rabbitmq/ZipkinRabbitMQHandler.java | 6 +-
.../rabbitmq/ZipkinRabbitMQProvider.java | 10 +-
.../zipkin/rabbitmq/ITRabbitMQReceiver.java | 24 ++--
zipkin-server/receiver-zipkin-scribe/pom.xml | 8 +-
.../zipkin/scribe/ScribeSpanConsumer.java | 6 +-
.../zipkin/scribe/ZipkinScribeProvider.java | 11 +-
.../core/services/ZipkinConfigService.java | 6 -
zipkin-server/server-starter/pom.xml | 10 ++
.../src/main/resources/application.yml | 28 +++-
zipkin-server/storage-cassandra/pom.xml | 2 +-
.../dao/CassandraTagAutocompleteDAO.java | 13 +-
32 files changed, 662 insertions(+), 131 deletions(-)
create mode 100644 zipkin-server/receiver-otlp-trace/pom.xml
create mode 100644 zipkin-server/receiver-otlp-trace/src/main/java/zipkin/server/receiver/otlp/OTLPTraceConfig.java
create mode 100644 zipkin-server/receiver-otlp-trace/src/main/java/zipkin/server/receiver/otlp/OTLPTraceProvider.java
create mode 100644 zipkin-server/receiver-otlp-trace/src/main/java/zipkin/server/receiver/otlp/handler/OTLPTraceHandler.java
create mode 100755 zipkin-server/receiver-otlp-trace/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
create mode 100644 zipkin-server/receiver-zipkin-core/pom.xml
create mode 100644 zipkin-server/receiver-zipkin-core/src/main/java/zipkin/server/receiver/zipkin/core/SpanForwardCore.java
create mode 100644 zipkin-server/receiver-zipkin-core/src/main/java/zipkin/server/receiver/zipkin/core/ZipkinReceiverCoreConfig.java
create mode 100644 zipkin-server/receiver-zipkin-core/src/main/java/zipkin/server/receiver/zipkin/core/ZipkinReceiverCoreProvider.java
create mode 100755 zipkin-server/receiver-zipkin-core/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
diff --git a/zipkin-server/pom.xml b/zipkin-server/pom.xml
index f2bd95d2cf2..37d1099a76f 100644
--- a/zipkin-server/pom.xml
+++ b/zipkin-server/pom.xml
@@ -46,6 +46,8 @@
../skywalking/oap-server/server-core
../skywalking/oap-server/server-receiver-plugin/receiver-proto
../skywalking/oap-server/server-receiver-plugin/zipkin-receiver-plugin
+ ../skywalking/oap-server/server-receiver-plugin/otel-receiver-plugin
+ ../skywalking/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin
../skywalking/oap-server/server-cluster-plugin/cluster-standalone-plugin
../skywalking/oap-server/server-cluster-plugin/cluster-consul-plugin
../skywalking/oap-server/server-cluster-plugin/cluster-etcd-plugin
@@ -59,6 +61,9 @@
../skywalking/oap-server/server-testing
../skywalking/oap-server/server-configuration/configuration-api
../skywalking/oap-server/ai-pipeline
+ ../skywalking/oap-server/analyzer/meter-analyzer
+ ../skywalking/oap-server/analyzer/log-analyzer
+ ../skywalking/oap-server/analyzer/agent-analyzer
server-core
server-starter
@@ -73,6 +78,8 @@
telemetry-zipkin
zipkin-dependency
zipkin-storage-ext
+ receiver-zipkin-core
+ receiver-otlp-trace
diff --git a/zipkin-server/receiver-otlp-trace/pom.xml b/zipkin-server/receiver-otlp-trace/pom.xml
new file mode 100644
index 00000000000..02219d319d9
--- /dev/null
+++ b/zipkin-server/receiver-otlp-trace/pom.xml
@@ -0,0 +1,37 @@
+
+
+ 4.0.0
+
+ zipkin-server-parent
+ io.zipkin
+ 2.24.4-SNAPSHOT
+
+
+ receiver-otlp-trace
+ OTLP Trace Receiver
+
+
+
+ org.apache.skywalking
+ otel-receiver-plugin
+ ${skywalking.version}
+
+
+ org.apache.skywalking
+ skywalking-sharing-server-plugin
+
+
+ org.apache.skywalking
+ meter-analyzer
+
+
+ org.apache.skywalking
+ log-analyzer
+
+
+
+
+
+
\ No newline at end of file
diff --git a/zipkin-server/receiver-otlp-trace/src/main/java/zipkin/server/receiver/otlp/OTLPTraceConfig.java b/zipkin-server/receiver-otlp-trace/src/main/java/zipkin/server/receiver/otlp/OTLPTraceConfig.java
new file mode 100644
index 00000000000..d61d659a035
--- /dev/null
+++ b/zipkin-server/receiver-otlp-trace/src/main/java/zipkin/server/receiver/otlp/OTLPTraceConfig.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2015-2023 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package zipkin.server.receiver.otlp;
+
+import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+
+public class OTLPTraceConfig extends ModuleConfig {
+ private String gRPCHost;
+ /**
+ * Only setting the real port(not 0) makes the gRPC server online.
+ */
+ private int gRPCPort;
+ private int maxConcurrentCallsPerConnection;
+ private int maxMessageSize;
+ private int gRPCThreadPoolSize;
+ private int gRPCThreadPoolQueueSize;
+ private String authentication;
+ private boolean gRPCSslEnabled = false;
+ private String gRPCSslKeyPath;
+ private String gRPCSslCertChainPath;
+ private String gRPCSslTrustedCAsPath;
+
+ public String getGRPCHost() {
+ return gRPCHost;
+ }
+
+ public void setGRPCHost(String gRPCHost) {
+ this.gRPCHost = gRPCHost;
+ }
+
+ public int getGRPCPort() {
+ return gRPCPort;
+ }
+
+ public void setGRPCPort(int gRPCPort) {
+ this.gRPCPort = gRPCPort;
+ }
+
+ public int getMaxConcurrentCallsPerConnection() {
+ return maxConcurrentCallsPerConnection;
+ }
+
+ public void setMaxConcurrentCallsPerConnection(int maxConcurrentCallsPerConnection) {
+ this.maxConcurrentCallsPerConnection = maxConcurrentCallsPerConnection;
+ }
+
+ public int getMaxMessageSize() {
+ return maxMessageSize;
+ }
+
+ public void setMaxMessageSize(int maxMessageSize) {
+ this.maxMessageSize = maxMessageSize;
+ }
+
+ public int getGRPCThreadPoolSize() {
+ return gRPCThreadPoolSize;
+ }
+
+ public void setGRPCThreadPoolSize(int gRPCThreadPoolSize) {
+ this.gRPCThreadPoolSize = gRPCThreadPoolSize;
+ }
+
+ public int getGRPCThreadPoolQueueSize() {
+ return gRPCThreadPoolQueueSize;
+ }
+
+ public void setGRPCThreadPoolQueueSize(int gRPCThreadPoolQueueSize) {
+ this.gRPCThreadPoolQueueSize = gRPCThreadPoolQueueSize;
+ }
+
+ public String getAuthentication() {
+ return authentication;
+ }
+
+ public void setAuthentication(String authentication) {
+ this.authentication = authentication;
+ }
+
+ public boolean getGRPCSslEnabled() {
+ return gRPCSslEnabled;
+ }
+
+ public void setGRPCSslEnabled(boolean gRPCSslEnabled) {
+ this.gRPCSslEnabled = gRPCSslEnabled;
+ }
+
+ public String getGRPCSslKeyPath() {
+ return gRPCSslKeyPath;
+ }
+
+ public void setGRPCSslKeyPath(String gRPCSslKeyPath) {
+ this.gRPCSslKeyPath = gRPCSslKeyPath;
+ }
+
+ public String getGRPCSslCertChainPath() {
+ return gRPCSslCertChainPath;
+ }
+
+ public void setGRPCSslCertChainPath(String gRPCSslCertChainPath) {
+ this.gRPCSslCertChainPath = gRPCSslCertChainPath;
+ }
+
+ public String getGRPCSslTrustedCAsPath() {
+ return gRPCSslTrustedCAsPath;
+ }
+
+ public void setGRPCSslTrustedCAsPath(String gRPCSslTrustedCAsPath) {
+ this.gRPCSslTrustedCAsPath = gRPCSslTrustedCAsPath;
+ }
+}
diff --git a/zipkin-server/receiver-otlp-trace/src/main/java/zipkin/server/receiver/otlp/OTLPTraceProvider.java b/zipkin-server/receiver-otlp-trace/src/main/java/zipkin/server/receiver/otlp/OTLPTraceProvider.java
new file mode 100644
index 00000000000..c83500f7ed3
--- /dev/null
+++ b/zipkin-server/receiver-otlp-trace/src/main/java/zipkin/server/receiver/otlp/OTLPTraceProvider.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2015-2023 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package zipkin.server.receiver.otlp;
+
+import org.apache.logging.log4j.util.Strings;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
+import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegisterImpl;
+import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+import org.apache.skywalking.oap.server.library.module.ModuleDefine;
+import org.apache.skywalking.oap.server.library.module.ModuleProvider;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
+import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
+import org.apache.skywalking.oap.server.library.server.ServerException;
+import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
+import org.apache.skywalking.oap.server.receiver.otel.OtelMetricReceiverConfig;
+import org.apache.skywalking.oap.server.receiver.otel.OtelMetricReceiverModule;
+import org.apache.skywalking.oap.server.receiver.otel.otlp.OpenTelemetryMetricRequestProcessor;
+import org.apache.skywalking.oap.server.receiver.otel.otlp.OpenTelemetryTraceHandler;
+import zipkin.server.receiver.otlp.handler.OTLPTraceHandler;
+
+public class OTLPTraceProvider extends ModuleProvider {
+ private OTLPTraceConfig moduleConfig;
+ private OpenTelemetryTraceHandler traceHandler;
+ private GRPCServer grpcServer;
+
+ @Override
+ public String name() {
+ return "zipkin";
+ }
+
+ @Override
+ public Class extends ModuleDefine> module() {
+ return OtelMetricReceiverModule.class;
+ }
+
+ @Override
+ public ConfigCreator extends ModuleConfig> newConfigCreator() {
+ return new ConfigCreator() {
+
+ @Override
+ public Class type() {
+ return OTLPTraceConfig.class;
+ }
+
+ @Override
+ public void onInitialized(OTLPTraceConfig initialized) {
+ moduleConfig = initialized;
+ }
+ };
+ }
+
+ @Override
+ public void prepare() throws ServiceNotProvidedException, ModuleStartException {
+ this.registerServiceImplementation(OpenTelemetryMetricRequestProcessor.class,
+ new OpenTelemetryMetricRequestProcessor(getManager(), new OtelMetricReceiverConfig()));
+ }
+
+ @Override
+ public void start() throws ServiceNotProvidedException, ModuleStartException {
+ GRPCHandlerRegister handlerRegister;
+ if (moduleConfig.getGRPCPort() > 0) {
+ if (moduleConfig.getGRPCSslEnabled()) {
+ grpcServer = new GRPCServer(
+ Strings.isBlank(moduleConfig.getGRPCHost()) ? "0.0.0.0" : moduleConfig.getGRPCHost(),
+ moduleConfig.getGRPCPort(),
+ moduleConfig.getGRPCSslCertChainPath(),
+ moduleConfig.getGRPCSslKeyPath(),
+ moduleConfig.getGRPCSslTrustedCAsPath()
+ );
+ } else {
+ grpcServer = new GRPCServer(
+ Strings.isBlank(moduleConfig.getGRPCHost()) ? "0.0.0.0" : moduleConfig.getGRPCHost(),
+ moduleConfig.getGRPCPort()
+ );
+ }
+ if (moduleConfig.getMaxMessageSize() > 0) {
+ grpcServer.setMaxMessageSize(moduleConfig.getMaxMessageSize());
+ }
+ if (moduleConfig.getMaxConcurrentCallsPerConnection() > 0) {
+ grpcServer.setMaxConcurrentCallsPerConnection(moduleConfig.getMaxConcurrentCallsPerConnection());
+ }
+ if (moduleConfig.getGRPCThreadPoolQueueSize() > 0) {
+ grpcServer.setThreadPoolQueueSize(moduleConfig.getGRPCThreadPoolQueueSize());
+ }
+ if (moduleConfig.getGRPCThreadPoolSize() > 0) {
+ grpcServer.setThreadPoolSize(moduleConfig.getGRPCThreadPoolSize());
+ }
+ grpcServer.initialize();
+
+ handlerRegister = new GRPCHandlerRegisterImpl(grpcServer);
+ } else {
+ handlerRegister = getManager().find(CoreModule.NAME).provider().getService(GRPCHandlerRegister.class);
+ }
+ traceHandler = new OTLPTraceHandler(handlerRegister, getManager());
+ traceHandler.active();
+ }
+
+ @Override
+ public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
+ if (grpcServer != null) {
+ try {
+ grpcServer.start();
+ } catch (ServerException e) {
+ throw new ModuleStartException(e.getMessage(), e);
+ }
+ }
+ }
+
+ @Override
+ public String[] requiredModules() {
+ return new String[0];
+ }
+}
diff --git a/zipkin-server/receiver-otlp-trace/src/main/java/zipkin/server/receiver/otlp/handler/OTLPTraceHandler.java b/zipkin-server/receiver-otlp-trace/src/main/java/zipkin/server/receiver/otlp/handler/OTLPTraceHandler.java
new file mode 100644
index 00000000000..ff5e3e8de9f
--- /dev/null
+++ b/zipkin-server/receiver-otlp-trace/src/main/java/zipkin/server/receiver/otlp/handler/OTLPTraceHandler.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2015-2023 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package zipkin.server.receiver.otlp.handler;
+
+import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
+import org.apache.skywalking.oap.server.receiver.otel.otlp.OpenTelemetryTraceHandler;
+
+public class OTLPTraceHandler extends OpenTelemetryTraceHandler {
+ private final GRPCHandlerRegister register;
+ public OTLPTraceHandler(GRPCHandlerRegister register, ModuleManager manager) {
+ super(manager);
+ this.register = register;
+ }
+
+ @Override
+ public void active() throws ModuleStartException {
+ register.addHandler(this);
+ }
+}
diff --git a/zipkin-server/receiver-otlp-trace/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider b/zipkin-server/receiver-otlp-trace/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
new file mode 100755
index 00000000000..cff78c2087a
--- /dev/null
+++ b/zipkin-server/receiver-otlp-trace/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+zipkin.server.receiver.otlp.OTLPTraceProvider
\ No newline at end of file
diff --git a/zipkin-server/receiver-zipkin-activemq/pom.xml b/zipkin-server/receiver-zipkin-activemq/pom.xml
index b400b6ae2a5..2dda22ee0ea 100644
--- a/zipkin-server/receiver-zipkin-activemq/pom.xml
+++ b/zipkin-server/receiver-zipkin-activemq/pom.xml
@@ -19,16 +19,10 @@
io.zipkin
- zipkin-server-core
+ receiver-zipkin-core
${project.version}
-
- org.apache.skywalking
- zipkin-receiver-plugin
- ${skywalking.version}
-
-
org.apache.activemq
activemq-client
diff --git a/zipkin-server/receiver-zipkin-activemq/src/main/java/zipkin/server/receiver/zipkin/activemq/ActiveMQHandler.java b/zipkin-server/receiver-zipkin-activemq/src/main/java/zipkin/server/receiver/zipkin/activemq/ActiveMQHandler.java
index d72de0d9c9a..dea971ecaea 100644
--- a/zipkin-server/receiver-zipkin-activemq/src/main/java/zipkin/server/receiver/zipkin/activemq/ActiveMQHandler.java
+++ b/zipkin-server/receiver-zipkin-activemq/src/main/java/zipkin/server/receiver/zipkin/activemq/ActiveMQHandler.java
@@ -20,7 +20,7 @@
import org.apache.activemq.transport.TransportListener;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.StringUtil;
-import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward;
+import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
@@ -54,7 +54,7 @@ public class ActiveMQHandler implements TransportListener, MessageListener, Clos
private static final Logger log = LoggerFactory.getLogger(ActiveMQHandler.class.getName());
private final ZipkinActiveMQConfig config;
- private final SpanForward spanForward;
+ private final SpanForwardService spanForward;
private final ActiveMQConnectionFactory connectionFactory;
static final CheckResult
@@ -70,11 +70,11 @@ public class ActiveMQHandler implements TransportListener, MessageListener, Clos
volatile CheckResult checkResult = CheckResult.OK;
- public ActiveMQHandler(ZipkinActiveMQConfig config, SpanForward spanForward, ModuleManager moduleManager) {
+ public ActiveMQHandler(ZipkinActiveMQConfig config, SpanForwardService spanForward, ModuleManager moduleManager) {
this(config, createConnectionFactory(config), spanForward, moduleManager);
}
- public ActiveMQHandler(ZipkinActiveMQConfig config, ActiveMQConnectionFactory connectionFactory, SpanForward spanForward, ModuleManager moduleManager) {
+ public ActiveMQHandler(ZipkinActiveMQConfig config, ActiveMQConnectionFactory connectionFactory, SpanForwardService spanForward, ModuleManager moduleManager) {
this.config = config;
this.spanForward = spanForward;
this.connectionFactory = connectionFactory;
diff --git a/zipkin-server/receiver-zipkin-activemq/src/main/java/zipkin/server/receiver/zipkin/activemq/ZipkinActiveMQProvider.java b/zipkin-server/receiver-zipkin-activemq/src/main/java/zipkin/server/receiver/zipkin/activemq/ZipkinActiveMQProvider.java
index d2430e0695d..2338872f6b4 100644
--- a/zipkin-server/receiver-zipkin-activemq/src/main/java/zipkin/server/receiver/zipkin/activemq/ZipkinActiveMQProvider.java
+++ b/zipkin-server/receiver-zipkin-activemq/src/main/java/zipkin/server/receiver/zipkin/activemq/ZipkinActiveMQProvider.java
@@ -15,18 +15,17 @@
package zipkin.server.receiver.zipkin.activemq;
import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
-import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward;
-import zipkin.server.core.services.ZipkinConfigService;
+import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService;
+import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverModule;
public class ZipkinActiveMQProvider extends ModuleProvider {
private ZipkinActiveMQConfig moduleConfig;
- private SpanForward spanForward;
+ private SpanForwardService spanForward;
@Override
public String name() {
@@ -59,8 +58,7 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException {
@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
- final ConfigService service = getManager().find(CoreModule.NAME).provider().getService(ConfigService.class);
- this.spanForward = new SpanForward(((ZipkinConfigService)service).toZipkinReceiverConfig(), getManager());
+ this.spanForward = getManager().find(ZipkinReceiverModule.NAME).provider().getService(SpanForwardService.class);
}
@Override
diff --git a/zipkin-server/receiver-zipkin-core/pom.xml b/zipkin-server/receiver-zipkin-core/pom.xml
new file mode 100644
index 00000000000..00fed3985fd
--- /dev/null
+++ b/zipkin-server/receiver-zipkin-core/pom.xml
@@ -0,0 +1,29 @@
+
+
+ 4.0.0
+
+ zipkin-server-parent
+ io.zipkin
+ 2.24.4-SNAPSHOT
+
+
+ receiver-zipkin-core
+ Zipkin Receiver Core
+
+
+
+ io.zipkin
+ zipkin-server-core
+ ${project.version}
+
+
+
+ org.apache.skywalking
+ zipkin-receiver-plugin
+ ${skywalking.version}
+
+
+
+
\ No newline at end of file
diff --git a/zipkin-server/receiver-zipkin-core/src/main/java/zipkin/server/receiver/zipkin/core/SpanForwardCore.java b/zipkin-server/receiver-zipkin-core/src/main/java/zipkin/server/receiver/zipkin/core/SpanForwardCore.java
new file mode 100644
index 00000000000..b688046840d
--- /dev/null
+++ b/zipkin-server/receiver-zipkin-core/src/main/java/zipkin/server/receiver/zipkin/core/SpanForwardCore.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2015-2023 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package zipkin.server.receiver.zipkin.core;
+
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
+import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+public class SpanForwardCore extends SpanForward {
+ private final ZipkinReceiverConfig config;
+ public SpanForwardCore(ZipkinReceiverConfig config, ModuleManager manager) {
+ super(config, manager);
+ this.config = config;
+ }
+
+ public Set getTagAutocompleteKeys() {
+ return new HashSet<>(Arrays.asList(config.getSearchableTracesTags().split(Const.COMMA)));
+ }
+}
diff --git a/zipkin-server/receiver-zipkin-core/src/main/java/zipkin/server/receiver/zipkin/core/ZipkinReceiverCoreConfig.java b/zipkin-server/receiver-zipkin-core/src/main/java/zipkin/server/receiver/zipkin/core/ZipkinReceiverCoreConfig.java
new file mode 100644
index 00000000000..acb35146e01
--- /dev/null
+++ b/zipkin-server/receiver-zipkin-core/src/main/java/zipkin/server/receiver/zipkin/core/ZipkinReceiverCoreConfig.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2015-2023 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package zipkin.server.receiver.zipkin.core;
+
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+
+public class ZipkinReceiverCoreConfig extends ModuleConfig {
+
+ /**
+ * The trace sample rate precision is 0.0001, should be between 0 and 1.
+ */
+ private double traceSampleRate = 1.0f;
+
+ /**
+ * Defines a set of span tag keys which are searchable.
+ * The max length of key=value should be less than 256 or will be dropped.
+ */
+ private String searchableTracesTags = DEFAULT_SEARCHABLE_TAG_KEYS;
+
+ private static final String DEFAULT_SEARCHABLE_TAG_KEYS = String.join(
+ Const.COMMA,
+ "http.method"
+ );
+
+ public double getTraceSampleRate() {
+ return traceSampleRate;
+ }
+
+ public void setTraceSampleRate(double traceSampleRate) {
+ this.traceSampleRate = traceSampleRate;
+ }
+
+ public String getSearchableTracesTags() {
+ return searchableTracesTags;
+ }
+
+ public void setSearchableTracesTags(String searchableTracesTags) {
+ this.searchableTracesTags = searchableTracesTags;
+ }
+}
diff --git a/zipkin-server/receiver-zipkin-core/src/main/java/zipkin/server/receiver/zipkin/core/ZipkinReceiverCoreProvider.java b/zipkin-server/receiver-zipkin-core/src/main/java/zipkin/server/receiver/zipkin/core/ZipkinReceiverCoreProvider.java
new file mode 100644
index 00000000000..95082236b23
--- /dev/null
+++ b/zipkin-server/receiver-zipkin-core/src/main/java/zipkin/server/receiver/zipkin/core/ZipkinReceiverCoreProvider.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2015-2023 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package zipkin.server.receiver.zipkin.core;
+
+import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+import org.apache.skywalking.oap.server.library.module.ModuleDefine;
+import org.apache.skywalking.oap.server.library.module.ModuleProvider;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
+import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
+import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService;
+import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
+import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverModule;
+
+public class ZipkinReceiverCoreProvider extends ModuleProvider {
+ private ZipkinReceiverCoreConfig moduleConfig;
+
+ @Override
+ public String name() {
+ return "zipkin";
+ }
+
+ @Override
+ public Class extends ModuleDefine> module() {
+ return ZipkinReceiverModule.class;
+ }
+
+ @Override
+ public ConfigCreator extends ModuleConfig> newConfigCreator() {
+ return new ConfigCreator() {
+
+ @Override
+ public Class type() {
+ return ZipkinReceiverCoreConfig.class;
+ }
+
+ @Override
+ public void onInitialized(ZipkinReceiverCoreConfig initialized) {
+ moduleConfig = initialized;
+ }
+ };
+ }
+
+ @Override
+ public void prepare() throws ServiceNotProvidedException, ModuleStartException {
+ final ZipkinReceiverConfig config = new ZipkinReceiverConfig();
+ config.setSearchableTracesTags(moduleConfig.getSearchableTracesTags());
+ config.setSampleRate((int) (moduleConfig.getTraceSampleRate() * 10000));
+
+ this.registerServiceImplementation(SpanForwardService.class, new SpanForwardCore(config, getManager()));
+ }
+
+ @Override
+ public void start() throws ServiceNotProvidedException, ModuleStartException {
+ }
+
+ @Override
+ public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
+ }
+
+ @Override
+ public String[] requiredModules() {
+ return new String[0];
+ }
+}
diff --git a/zipkin-server/receiver-zipkin-core/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider b/zipkin-server/receiver-zipkin-core/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
new file mode 100755
index 00000000000..1e28be08cf5
--- /dev/null
+++ b/zipkin-server/receiver-zipkin-core/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+zipkin.server.receiver.zipkin.core.ZipkinReceiverCoreProvider
\ No newline at end of file
diff --git a/zipkin-server/receiver-zipkin-http/pom.xml b/zipkin-server/receiver-zipkin-http/pom.xml
index 612fab6aeb0..bf32ec7294d 100644
--- a/zipkin-server/receiver-zipkin-http/pom.xml
+++ b/zipkin-server/receiver-zipkin-http/pom.xml
@@ -15,15 +15,9 @@
io.zipkin
- zipkin-server-core
+ receiver-zipkin-core
${project.version}
-
-
- org.apache.skywalking
- zipkin-receiver-plugin
- ${skywalking.version}
-
\ No newline at end of file
diff --git a/zipkin-server/receiver-zipkin-http/src/main/java/zipkin/server/receiver/zipkin/http/ZipkinHTTPReceiverProvider.java b/zipkin-server/receiver-zipkin-http/src/main/java/zipkin/server/receiver/zipkin/http/ZipkinHTTPReceiverProvider.java
index c9a3e4cabaa..0550fbc089e 100644
--- a/zipkin-server/receiver-zipkin-http/src/main/java/zipkin/server/receiver/zipkin/http/ZipkinHTTPReceiverProvider.java
+++ b/zipkin-server/receiver-zipkin-http/src/main/java/zipkin/server/receiver/zipkin/http/ZipkinHTTPReceiverProvider.java
@@ -16,7 +16,6 @@
import com.linecorp.armeria.common.HttpMethod;
import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.server.HTTPHandlerRegister;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
@@ -25,10 +24,10 @@
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.library.server.http.HTTPServer;
import org.apache.skywalking.oap.server.library.server.http.HTTPServerConfig;
+import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService;
+import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverModule;
import org.apache.skywalking.oap.server.receiver.zipkin.handler.ZipkinSpanHTTPHandler;
-import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward;
import zipkin.server.core.services.HTTPConfigurableServer;
-import zipkin.server.core.services.ZipkinConfigService;
import java.util.Arrays;
@@ -81,8 +80,7 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException {
@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
- final ConfigService service = getManager().find(CoreModule.NAME).provider().getService(ConfigService.class);
- final SpanForward spanForward = new SpanForward(((ZipkinConfigService)service).toZipkinReceiverConfig(), getManager());
+ final SpanForwardService spanForward = getManager().find(ZipkinReceiverModule.NAME).provider().getService(SpanForwardService.class);
httpHandler = new ZipkinSpanHTTPHandler(spanForward, getManager());
if (httpServer != null) {
diff --git a/zipkin-server/receiver-zipkin-http/src/test/java/zipkin/server/receiver/zipkin/http/ITHTTPReceiver.java b/zipkin-server/receiver-zipkin-http/src/test/java/zipkin/server/receiver/zipkin/http/ITHTTPReceiver.java
index 0d72caaee4e..d51bfa772e8 100644
--- a/zipkin-server/receiver-zipkin-http/src/test/java/zipkin/server/receiver/zipkin/http/ITHTTPReceiver.java
+++ b/zipkin-server/receiver-zipkin-http/src/test/java/zipkin/server/receiver/zipkin/http/ITHTTPReceiver.java
@@ -16,13 +16,14 @@
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.CoreModuleProvider;
-import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.server.HTTPHandlerRegister;
import org.apache.skywalking.oap.server.core.server.HTTPHandlerRegisterImpl;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.server.http.HTTPServer;
import org.apache.skywalking.oap.server.library.server.http.HTTPServerConfig;
+import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService;
+import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverModule;
import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
@@ -37,8 +38,7 @@
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.powermock.reflect.Whitebox;
-import zipkin.server.core.CoreModuleConfig;
-import zipkin.server.core.services.ZipkinConfigService;
+import zipkin.server.receiver.zipkin.core.ZipkinReceiverCoreProvider;
import zipkin2.Span;
import zipkin2.TestObjects;
import zipkin2.codec.SpanBytesEncoder;
@@ -72,7 +72,7 @@ public class ITHTTPReceiver {
public void setup() throws ModuleStartException {
final HTTPServer httpServer = new HTTPServer(HTTPServerConfig.builder().host("0.0.0.0").port(port).contextPath("/").build());
httpServer.initialize();
- moduleManager = setupModuleManager(httpServer);
+ moduleManager = setupModuleManager(httpServer, forward);
final ZipkinHTTPReceiverProvider provider = new ZipkinHTTPReceiverProvider();
provider.setManager(moduleManager);
@@ -86,7 +86,6 @@ public void setup() throws ModuleStartException {
spans.add(invocationOnMock.getArgument(0, ArrayList.class));
return null;
}).when(forward).send(any());
- Whitebox.setInternalState(provider.getHttpHandler(), SpanForward.class, forward);
provider.notifyAfterCompleted();
}
@@ -116,7 +115,7 @@ public void test() throws Exception {
assertThat(spans.take()).containsExactly(CLIENT_SPAN);
}
- private ModuleManager setupModuleManager(HTTPServer httpServer) {
+ private ModuleManager setupModuleManager(HTTPServer httpServer, SpanForward forward) {
ModuleManager moduleManager = Mockito.mock(ModuleManager.class);
CoreModule coreModule = Mockito.spy(CoreModule.class);
@@ -125,13 +124,17 @@ private ModuleManager setupModuleManager(HTTPServer httpServer) {
Mockito.when(moduleManager.find(CoreModule.NAME)).thenReturn(coreModule);
Mockito.when(coreModule.provider().getService(HTTPHandlerRegister.class)).thenReturn(new HTTPHandlerRegisterImpl(httpServer));
+ final ZipkinReceiverModule zipkinReceiverModule = Mockito.spy(ZipkinReceiverModule.class);
+ final ZipkinReceiverCoreProvider receiverProvider = Mockito.mock(ZipkinReceiverCoreProvider.class);
+ Whitebox.setInternalState(zipkinReceiverModule, "loadedProvider", receiverProvider);
+ Mockito.when(moduleManager.find(ZipkinReceiverModule.NAME)).thenReturn(zipkinReceiverModule);
+ Mockito.when(zipkinReceiverModule.provider().getService(SpanForwardService.class)).thenReturn(forward);
+
TelemetryModule telemetryModule = Mockito.spy(TelemetryModule.class);
NoneTelemetryProvider noneTelemetryProvider = Mockito.mock(NoneTelemetryProvider.class);
Whitebox.setInternalState(telemetryModule, "loadedProvider", noneTelemetryProvider);
Mockito.when(moduleManager.find(TelemetryModule.NAME)).thenReturn(telemetryModule);
- Mockito.when(moduleProvider.getService(ConfigService.class))
- .thenReturn(new ZipkinConfigService(new CoreModuleConfig(), moduleProvider));
Mockito.when(noneTelemetryProvider.getService(MetricsCreator.class))
.thenReturn(new MetricsCreatorNoop());
diff --git a/zipkin-server/receiver-zipkin-kafka/pom.xml b/zipkin-server/receiver-zipkin-kafka/pom.xml
index f54ecc6684c..a8968153150 100644
--- a/zipkin-server/receiver-zipkin-kafka/pom.xml
+++ b/zipkin-server/receiver-zipkin-kafka/pom.xml
@@ -15,15 +15,9 @@
io.zipkin
- zipkin-server-core
+ receiver-zipkin-core
${project.version}
-
-
- org.apache.skywalking
- zipkin-receiver-plugin
- ${skywalking.version}
-
\ No newline at end of file
diff --git a/zipkin-server/receiver-zipkin-kafka/src/main/java/zipkin/server/receiver/zipkin/kafka/ZipkinKafkaReceiverProvider.java b/zipkin-server/receiver-zipkin-kafka/src/main/java/zipkin/server/receiver/zipkin/kafka/ZipkinKafkaReceiverProvider.java
index d20fa1422a9..7b3625ed6ef 100644
--- a/zipkin-server/receiver-zipkin-kafka/src/main/java/zipkin/server/receiver/zipkin/kafka/ZipkinKafkaReceiverProvider.java
+++ b/zipkin-server/receiver-zipkin-kafka/src/main/java/zipkin/server/receiver/zipkin/kafka/ZipkinKafkaReceiverProvider.java
@@ -15,19 +15,19 @@
package zipkin.server.receiver.zipkin.kafka;
import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
+import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService;
+import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverModule;
import org.apache.skywalking.oap.server.receiver.zipkin.kafka.KafkaHandler;
import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward;
-import zipkin.server.core.services.ZipkinConfigService;
public class ZipkinKafkaReceiverProvider extends ModuleProvider {
private ZipkinKafkaReceiverConfig moduleConfig;
- private SpanForward spanForward;
+ private SpanForwardService spanForward;
private KafkaHandler kafkaHandler;
@Override
@@ -61,13 +61,12 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException {
@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
- final ConfigService service = getManager().find(CoreModule.NAME).provider().getService(ConfigService.class);
- this.spanForward = new SpanForward(((ZipkinConfigService)service).toZipkinReceiverConfig(), getManager());
+ this.spanForward = getManager().find(ZipkinReceiverModule.NAME).provider().getService(SpanForwardService.class);
}
@Override
public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
- kafkaHandler = new KafkaHandler(moduleConfig.toSkyWalkingConfig(), this.spanForward, getManager());
+ kafkaHandler = new KafkaHandler(moduleConfig.toSkyWalkingConfig(), ((SpanForward) this.spanForward), getManager());
kafkaHandler.start();
}
diff --git a/zipkin-server/receiver-zipkin-kafka/src/test/java/zipkin/server/receiver/zipkin/kafka/ITKafkaReceiver.java b/zipkin-server/receiver-zipkin-kafka/src/test/java/zipkin/server/receiver/zipkin/kafka/ITKafkaReceiver.java
index f6d725163b2..93ae3679228 100644
--- a/zipkin-server/receiver-zipkin-kafka/src/test/java/zipkin/server/receiver/zipkin/kafka/ITKafkaReceiver.java
+++ b/zipkin-server/receiver-zipkin-kafka/src/test/java/zipkin/server/receiver/zipkin/kafka/ITKafkaReceiver.java
@@ -18,11 +18,10 @@
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.CoreModuleProvider;
-import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
+import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService;
+import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverModule;
import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
@@ -39,8 +38,7 @@
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.powermock.reflect.Whitebox;
-import zipkin.server.core.CoreModuleConfig;
-import zipkin.server.core.services.ZipkinConfigService;
+import zipkin.server.receiver.zipkin.core.ZipkinReceiverCoreProvider;
import zipkin2.Span;
import zipkin2.TestObjects;
import zipkin2.codec.SpanBytesEncoder;
@@ -80,7 +78,7 @@ public void setup() throws ModuleStartException {
config.setKafkaHandlerThreadPoolSize(2);
config.setKafkaHandlerThreadPoolQueueSize(100);
- moduleManager = setupModuleManager();
+ moduleManager = setupModuleManager(forward);
final ZipkinKafkaReceiverProvider provider = new ZipkinKafkaReceiverProvider();
provider.setManager(moduleManager);
@@ -91,7 +89,6 @@ public void setup() throws ModuleStartException {
spans.add(invocationOnMock.getArgument(0, ArrayList.class));
return null;
}).when(forward).send(any());
- Whitebox.setInternalState(provider, SpanForward.class, forward);
provider.notifyAfterCompleted();
kafka.prepareTopics(config.getKafkaTopic(), 1);
@@ -121,21 +118,20 @@ public void tearDown() {
}
}
- private ModuleManager setupModuleManager() {
+ private ModuleManager setupModuleManager(SpanForward forward) {
ModuleManager moduleManager = Mockito.mock(ModuleManager.class);
- CoreModule coreModule = Mockito.spy(CoreModule.class);
- CoreModuleProvider moduleProvider = Mockito.mock(CoreModuleProvider.class);
- Whitebox.setInternalState(coreModule, "loadedProvider", moduleProvider);
- Mockito.when(moduleManager.find(CoreModule.NAME)).thenReturn(coreModule);
+ final ZipkinReceiverModule zipkinReceiverModule = Mockito.spy(ZipkinReceiverModule.class);
+ final ZipkinReceiverCoreProvider receiverProvider = Mockito.mock(ZipkinReceiverCoreProvider.class);
+ Whitebox.setInternalState(zipkinReceiverModule, "loadedProvider", receiverProvider);
+ Mockito.when(moduleManager.find(ZipkinReceiverModule.NAME)).thenReturn(zipkinReceiverModule);
+ Mockito.when(zipkinReceiverModule.provider().getService(SpanForwardService.class)).thenReturn(forward);
TelemetryModule telemetryModule = Mockito.spy(TelemetryModule.class);
NoneTelemetryProvider noneTelemetryProvider = Mockito.mock(NoneTelemetryProvider.class);
Whitebox.setInternalState(telemetryModule, "loadedProvider", noneTelemetryProvider);
Mockito.when(moduleManager.find(TelemetryModule.NAME)).thenReturn(telemetryModule);
- Mockito.when(moduleProvider.getService(ConfigService.class))
- .thenReturn(new ZipkinConfigService(new CoreModuleConfig(), moduleProvider));
Mockito.when(noneTelemetryProvider.getService(MetricsCreator.class))
.thenReturn(new MetricsCreatorNoop());
diff --git a/zipkin-server/receiver-zipkin-rabbitmq/pom.xml b/zipkin-server/receiver-zipkin-rabbitmq/pom.xml
index 904a4c0c0da..ec2933ee564 100644
--- a/zipkin-server/receiver-zipkin-rabbitmq/pom.xml
+++ b/zipkin-server/receiver-zipkin-rabbitmq/pom.xml
@@ -19,16 +19,10 @@
io.zipkin
- zipkin-server-core
+ receiver-zipkin-core
${project.version}
-
- org.apache.skywalking
- zipkin-receiver-plugin
- ${skywalking.version}
-
-
com.rabbitmq
amqp-client
diff --git a/zipkin-server/receiver-zipkin-rabbitmq/src/main/java/zipkin/server/receriver/zipkin/rabbitmq/ZipkinRabbitMQHandler.java b/zipkin-server/receiver-zipkin-rabbitmq/src/main/java/zipkin/server/receriver/zipkin/rabbitmq/ZipkinRabbitMQHandler.java
index d6448ef54a0..a453c69d2a7 100644
--- a/zipkin-server/receiver-zipkin-rabbitmq/src/main/java/zipkin/server/receriver/zipkin/rabbitmq/ZipkinRabbitMQHandler.java
+++ b/zipkin-server/receiver-zipkin-rabbitmq/src/main/java/zipkin/server/receriver/zipkin/rabbitmq/ZipkinRabbitMQHandler.java
@@ -24,7 +24,7 @@
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.library.util.StringUtil;
-import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward;
+import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
@@ -45,13 +45,13 @@
public class ZipkinRabbitMQHandler {
private final ZipkinRabbitMQConfig config;
- private final SpanForward spanForward;
+ private final SpanForwardService spanForward;
private final ConnectionFactory connectionFactory = new ConnectionFactory();
private final HistogramMetrics histogram;
- public ZipkinRabbitMQHandler(ZipkinRabbitMQConfig config, SpanForward spanForward, ModuleManager moduleManager) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException {
+ public ZipkinRabbitMQHandler(ZipkinRabbitMQConfig config, SpanForwardService spanForward, ModuleManager moduleManager) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException {
this.config = config;
this.spanForward = spanForward;
diff --git a/zipkin-server/receiver-zipkin-rabbitmq/src/main/java/zipkin/server/receriver/zipkin/rabbitmq/ZipkinRabbitMQProvider.java b/zipkin-server/receiver-zipkin-rabbitmq/src/main/java/zipkin/server/receriver/zipkin/rabbitmq/ZipkinRabbitMQProvider.java
index 1852f1413a0..e089e7d0620 100644
--- a/zipkin-server/receiver-zipkin-rabbitmq/src/main/java/zipkin/server/receriver/zipkin/rabbitmq/ZipkinRabbitMQProvider.java
+++ b/zipkin-server/receiver-zipkin-rabbitmq/src/main/java/zipkin/server/receriver/zipkin/rabbitmq/ZipkinRabbitMQProvider.java
@@ -15,18 +15,17 @@
package zipkin.server.receriver.zipkin.rabbitmq;
import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
-import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward;
-import zipkin.server.core.services.ZipkinConfigService;
+import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService;
+import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverModule;
public class ZipkinRabbitMQProvider extends ModuleProvider {
private ZipkinRabbitMQConfig moduleConfig;
- private SpanForward spanForward;
+ private SpanForwardService spanForward;
@Override
public String name() {
@@ -59,8 +58,7 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException {
@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
- final ConfigService service = getManager().find(CoreModule.NAME).provider().getService(ConfigService.class);
- this.spanForward = new SpanForward(((ZipkinConfigService)service).toZipkinReceiverConfig(), getManager());
+ this.spanForward = getManager().find(ZipkinReceiverModule.NAME).provider().getService(SpanForwardService.class);
}
@Override
diff --git a/zipkin-server/receiver-zipkin-rabbitmq/src/test/java/zipkin/server/receiver/zipkin/rabbitmq/ITRabbitMQReceiver.java b/zipkin-server/receiver-zipkin-rabbitmq/src/test/java/zipkin/server/receiver/zipkin/rabbitmq/ITRabbitMQReceiver.java
index bc9ee4ac6e6..9006f7db5df 100644
--- a/zipkin-server/receiver-zipkin-rabbitmq/src/test/java/zipkin/server/receiver/zipkin/rabbitmq/ITRabbitMQReceiver.java
+++ b/zipkin-server/receiver-zipkin-rabbitmq/src/test/java/zipkin/server/receiver/zipkin/rabbitmq/ITRabbitMQReceiver.java
@@ -17,11 +17,10 @@
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
-import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.CoreModuleProvider;
-import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
+import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService;
+import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverModule;
import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
@@ -37,8 +36,7 @@
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.powermock.reflect.Whitebox;
-import zipkin.server.core.CoreModuleConfig;
-import zipkin.server.core.services.ZipkinConfigService;
+import zipkin.server.receiver.zipkin.core.ZipkinReceiverCoreProvider;
import zipkin.server.receriver.zipkin.rabbitmq.ZipkinRabbitMQConfig;
import zipkin.server.receriver.zipkin.rabbitmq.ZipkinRabbitMQProvider;
import zipkin2.Span;
@@ -77,7 +75,7 @@ public void setup() throws ModuleStartException, IOException, TimeoutException {
config.setAddresses(Collections.singletonList(rabbitMQ.host() + ":" + rabbitMQ.port()));
config.setQueue("test");
- moduleManager = setupModuleManager();
+ moduleManager = setupModuleManager(forward);
final ZipkinRabbitMQProvider provider = new ZipkinRabbitMQProvider();
provider.setManager(moduleManager);
@@ -88,7 +86,6 @@ public void setup() throws ModuleStartException, IOException, TimeoutException {
exceptedSpans.add(invocationOnMock.getArgument(0, ArrayList.class));
return null;
}).when(forward).send(any());
- Whitebox.setInternalState(provider, SpanForward.class, forward);
provider.notifyAfterCompleted();
ConnectionFactory factory = new ConnectionFactory();
@@ -121,21 +118,20 @@ void produceSpans(byte[] spans, String queue) throws Exception {
}
}
- private ModuleManager setupModuleManager() {
+ private ModuleManager setupModuleManager(SpanForward forward) {
ModuleManager moduleManager = Mockito.mock(ModuleManager.class);
- CoreModule coreModule = Mockito.spy(CoreModule.class);
- CoreModuleProvider moduleProvider = Mockito.mock(CoreModuleProvider.class);
- Whitebox.setInternalState(coreModule, "loadedProvider", moduleProvider);
- Mockito.when(moduleManager.find(CoreModule.NAME)).thenReturn(coreModule);
+ final ZipkinReceiverModule zipkinReceiverModule = Mockito.spy(ZipkinReceiverModule.class);
+ final ZipkinReceiverCoreProvider receiverProvider = Mockito.mock(ZipkinReceiverCoreProvider.class);
+ Whitebox.setInternalState(zipkinReceiverModule, "loadedProvider", receiverProvider);
+ Mockito.when(moduleManager.find(ZipkinReceiverModule.NAME)).thenReturn(zipkinReceiverModule);
+ Mockito.when(zipkinReceiverModule.provider().getService(SpanForwardService.class)).thenReturn(forward);
TelemetryModule telemetryModule = Mockito.spy(TelemetryModule.class);
NoneTelemetryProvider noneTelemetryProvider = Mockito.mock(NoneTelemetryProvider.class);
Whitebox.setInternalState(telemetryModule, "loadedProvider", noneTelemetryProvider);
Mockito.when(moduleManager.find(TelemetryModule.NAME)).thenReturn(telemetryModule);
- Mockito.when(moduleProvider.getService(ConfigService.class))
- .thenReturn(new ZipkinConfigService(new CoreModuleConfig(), moduleProvider));
Mockito.when(noneTelemetryProvider.getService(MetricsCreator.class))
.thenReturn(new MetricsCreatorNoop());
diff --git a/zipkin-server/receiver-zipkin-scribe/pom.xml b/zipkin-server/receiver-zipkin-scribe/pom.xml
index fca7e3786e1..8c6422c9db8 100644
--- a/zipkin-server/receiver-zipkin-scribe/pom.xml
+++ b/zipkin-server/receiver-zipkin-scribe/pom.xml
@@ -15,16 +15,10 @@
io.zipkin
- zipkin-server-core
+ receiver-zipkin-core
${project.version}
-
- org.apache.skywalking
- zipkin-receiver-plugin
- ${skywalking.version}
-
-
${armeria.groupId}
armeria-thrift0.15
diff --git a/zipkin-server/receiver-zipkin-scribe/src/main/java/zipkin/server/receiver/zipkin/scribe/ScribeSpanConsumer.java b/zipkin-server/receiver-zipkin-scribe/src/main/java/zipkin/server/receiver/zipkin/scribe/ScribeSpanConsumer.java
index 74fdf6d3096..ae7c9c77b5c 100644
--- a/zipkin-server/receiver-zipkin-scribe/src/main/java/zipkin/server/receiver/zipkin/scribe/ScribeSpanConsumer.java
+++ b/zipkin-server/receiver-zipkin-scribe/src/main/java/zipkin/server/receiver/zipkin/scribe/ScribeSpanConsumer.java
@@ -14,7 +14,7 @@
package zipkin.server.receiver.zipkin.scribe;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
-import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward;
+import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
@@ -32,12 +32,12 @@
import java.util.List;
final class ScribeSpanConsumer implements Scribe.AsyncIface {
- private final SpanForward spanForward;
+ private final SpanForwardService spanForward;
private final String category;
private final HistogramMetrics histogram;
- ScribeSpanConsumer(SpanForward spanForward, String category, ModuleManager moduleManager) {
+ ScribeSpanConsumer(SpanForwardService spanForward, String category, ModuleManager moduleManager) {
this.spanForward = spanForward;
this.category = category;
diff --git a/zipkin-server/receiver-zipkin-scribe/src/main/java/zipkin/server/receiver/zipkin/scribe/ZipkinScribeProvider.java b/zipkin-server/receiver-zipkin-scribe/src/main/java/zipkin/server/receiver/zipkin/scribe/ZipkinScribeProvider.java
index 89e462c7216..55fc216bd05 100644
--- a/zipkin-server/receiver-zipkin-scribe/src/main/java/zipkin/server/receiver/zipkin/scribe/ZipkinScribeProvider.java
+++ b/zipkin-server/receiver-zipkin-scribe/src/main/java/zipkin/server/receiver/zipkin/scribe/ZipkinScribeProvider.java
@@ -14,19 +14,17 @@
package zipkin.server.receiver.zipkin.scribe;
-import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
-import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward;
-import zipkin.server.core.services.ZipkinConfigService;
+import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService;
+import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverModule;
public class ZipkinScribeProvider extends ModuleProvider {
private ZipkinScribeConfig moduleConfig;
- private SpanForward spanForward;
+ private SpanForwardService spanForward;
@Override
public String name() {
@@ -59,8 +57,7 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException {
@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
- final ConfigService service = getManager().find(CoreModule.NAME).provider().getService(ConfigService.class);
- this.spanForward = new SpanForward(((ZipkinConfigService)service).toZipkinReceiverConfig(), getManager());
+ this.spanForward = getManager().find(ZipkinReceiverModule.NAME).provider().getService(SpanForwardService.class);
}
@Override
diff --git a/zipkin-server/server-core/src/main/java/zipkin/server/core/services/ZipkinConfigService.java b/zipkin-server/server-core/src/main/java/zipkin/server/core/services/ZipkinConfigService.java
index 2c1403b5da4..7cfdc8aa3de 100644
--- a/zipkin-server/server-core/src/main/java/zipkin/server/core/services/ZipkinConfigService.java
+++ b/zipkin-server/server-core/src/main/java/zipkin/server/core/services/ZipkinConfigService.java
@@ -30,10 +30,4 @@ public boolean getSearchEnable() {
return moduleConfig.getSearchEnable();
}
- public ZipkinReceiverConfig toZipkinReceiverConfig() {
- final ZipkinReceiverConfig config = new ZipkinReceiverConfig();
- config.setSearchableTracesTags(moduleConfig.getSearchableTracesTags());
- config.setSampleRate((int) (moduleConfig.getTraceSampleRate() * 10000));
- return config;
- }
}
diff --git a/zipkin-server/server-starter/pom.xml b/zipkin-server/server-starter/pom.xml
index 23abdceb191..0f4baa32c62 100644
--- a/zipkin-server/server-starter/pom.xml
+++ b/zipkin-server/server-starter/pom.xml
@@ -84,6 +84,11 @@
+
+ io.zipkin
+ receiver-zipkin-core
+ ${project.version}
+
io.zipkin
receiver-zipkin-http
@@ -109,6 +114,11 @@
receiver-zipkin-scribe
${project.version}
+
+ io.zipkin
+ receiver-otlp-trace
+ ${project.version}
+
diff --git a/zipkin-server/server-starter/src/main/resources/application.yml b/zipkin-server/server-starter/src/main/resources/application.yml
index d855028ec28..8a92ff64a05 100644
--- a/zipkin-server/server-starter/src/main/resources/application.yml
+++ b/zipkin-server/server-starter/src/main/resources/application.yml
@@ -29,11 +29,6 @@ core:
l1FlushPeriod: ${ZIPKIN_CORE_L1_AGGREGATION_FLUSH_PERIOD:500}
# The threshold of session time. Unit is ms. Default value is 70s.
storageSessionTimeout: ${ZIPKIN_CORE_STORAGE_SESSION_TIMEOUT:70000}
- # Defines a set of span tag keys which are searchable.
- # The max length of key=value should be less than 256 or will be dropped.
- searchableTracesTags: ${ZIPKIN_SEARCHABLE_TAG_KEYS:http.method}
- # The trace sample rate precision is 0.0001, should be between 0 and 1
- traceSampleRate: ${ZIPKIN_SAMPLE_RATE:1}
# The number of threads used to prepare metrics data to the storage.
prepareThreads: ${ZIPKIN_PREPARE_THREADS:2}
# The period of doing data persistence. Unit is second.Default value is 25s
@@ -168,6 +163,15 @@ storage: &storage
zipkin-dependency-storage-ext: *storage
+receiver-zipkin:
+ selector: zipkin
+ zipkin:
+ # Defines a set of span tag keys which are searchable.
+ # The max length of key=value should be less than 256 or will be dropped.
+ searchableTracesTags: ${ZIPKIN_SEARCHABLE_TAG_KEYS:http.method}
+ # The trace sample rate precision is 0.0001, should be between 0 and 1
+ traceSampleRate: ${ZIPKIN_SAMPLE_RATE:1}
+
receiver-zipkin-http:
selector: ${ZIPKIN_RECEIVER_ZIPKIN_HTTP:default}
default:
@@ -228,6 +232,20 @@ receiver-zipkin-scribe:
category: ${ZIPKIN_SCRIBE_CATEGORY:zipkin}
port: ${ZIPKIN_COLLECTOR_PORT:9410}
+receiver-otel:
+ selector: ${SW_OTEL_RECEIVER:zipkin}
+ zipkin:
+ gRPCHost: ${ZIPKIN_OTEL_GRPC_HOST:0.0.0.0}
+ gRPCPort: ${ZIPKIN_OTEL_GRPC_PORT:0}
+ gRPCThreadPoolQueueSize: ${ZIPKIN_OTEL_GRPC_POOL_QUEUE_SIZE:-1}
+ gRPCThreadPoolSize: ${ZIPKIN_OTEL_GRPC_THREAD_POOL_SIZE:-1}
+ gRPCSslEnabled: ${ZIPKIN_OTEL_GRPC_SSL_ENABLED:false}
+ gRPCSslKeyPath: ${ZIPKIN_OTEL_GRPC_SSL_KEY_PATH:""}
+ gRPCSslCertChainPath: ${ZIPKIN_OTEL_GRPC_SSL_CERT_CHAIN_PATH:""}
+ gRPCSslTrustedCAPath: ${ZIPKIN_OTEL_GRPC_SSL_TRUSTED_CA_PATH:""}
+ gRPCMaxConcurrentCallsPerConnection: ${ZIPKIN_OTEL_GRPC_MAX_CONCURRENT_CALL:0}
+ gRPCMaxMessageSize: ${ZIPKIN_OTEL_GRPC_MAX_MESSAGE_SIZE:0}
+
## This module is for Zipkin query API and support zipkin-lens UI
query-zipkin:
selector: ${ZIPKIN_QUERY_ZIPKIN:zipkin}
diff --git a/zipkin-server/storage-cassandra/pom.xml b/zipkin-server/storage-cassandra/pom.xml
index 3c96072f802..e7c53188b5f 100644
--- a/zipkin-server/storage-cassandra/pom.xml
+++ b/zipkin-server/storage-cassandra/pom.xml
@@ -16,7 +16,7 @@
io.zipkin
- zipkin-server-core
+ receiver-zipkin-core
${project.version}
diff --git a/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/CassandraTagAutocompleteDAO.java b/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/CassandraTagAutocompleteDAO.java
index f4d761e6dd7..acf9e414edb 100644
--- a/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/CassandraTagAutocompleteDAO.java
+++ b/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/CassandraTagAutocompleteDAO.java
@@ -14,22 +14,20 @@
package zipkin.server.storage.cassandra.dao;
-import org.apache.skywalking.oap.server.core.Const;
-import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagAutocompleteData;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagType;
-import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.query.input.Duration;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService;
+import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverModule;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.common.dao.JDBCTagAutoCompleteQueryDAO;
-import zipkin.server.core.services.ZipkinConfigService;
+import zipkin.server.receiver.zipkin.core.SpanForwardCore;
import zipkin.server.storage.cassandra.CassandraClient;
import zipkin.server.storage.cassandra.CassandraTableHelper;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
import static java.util.Objects.nonNull;
@@ -51,9 +49,8 @@ private Set getTagAutocompleteKeys() {
if (tagAutocompleteKeys != null) {
return tagAutocompleteKeys;
}
- final ConfigService service = moduleManager.find(CoreModule.NAME).provider().getService(ConfigService.class);
- tagAutocompleteKeys = Stream.of((((ZipkinConfigService) service).toZipkinReceiverConfig().getSearchableTracesTags())
- .split(Const.COMMA)).collect(Collectors.toSet());
+ final SpanForwardService service = moduleManager.find(ZipkinReceiverModule.NAME).provider().getService(SpanForwardService.class);
+ tagAutocompleteKeys = ((SpanForwardCore) service).getTagAutocompleteKeys();
return tagAutocompleteKeys;
}
From b90970d021a87a9793a2f4cbc2e80820f2aadad9 Mon Sep 17 00:00:00 2001
From: Mrproliu <741550557@qq.com>
Date: Tue, 31 Oct 2023 17:13:04 +0800
Subject: [PATCH 2/3] Reduce port configuration and set all data collection
handler bind to server port
---
.../server/receiver/otlp/OTLPTraceConfig.java | 102 ----------
.../receiver/otlp/OTLPTraceProvider.java | 47 +----
.../zipkin/http/ZipkinHTTPReceiverConfig.java | 67 -------
.../http/ZipkinHTTPReceiverProvider.java | 32 +---
.../receiver/zipkin/http/ITHTTPReceiver.java | 1 -
zipkin-server/server-core/pom.xml | 5 +
.../zipkin/server/core/CoreModuleConfig.java | 174 ++++++------------
.../server/core/CoreModuleProvider.java | 81 +++-----
.../core/GRPCHandlerRegisterAdapter.java | 47 +++++
.../core/services/HTTPConfigurableServer.java | 13 ++
.../src/main/resources/application.yml | 46 ++---
.../storage/cassandra/ITCassandraStorage.java | 10 +-
.../src/test/resources/application.yml | 43 +++--
13 files changed, 183 insertions(+), 485 deletions(-)
create mode 100644 zipkin-server/server-core/src/main/java/zipkin/server/core/GRPCHandlerRegisterAdapter.java
diff --git a/zipkin-server/receiver-otlp-trace/src/main/java/zipkin/server/receiver/otlp/OTLPTraceConfig.java b/zipkin-server/receiver-otlp-trace/src/main/java/zipkin/server/receiver/otlp/OTLPTraceConfig.java
index d61d659a035..158c5591c1b 100644
--- a/zipkin-server/receiver-otlp-trace/src/main/java/zipkin/server/receiver/otlp/OTLPTraceConfig.java
+++ b/zipkin-server/receiver-otlp-trace/src/main/java/zipkin/server/receiver/otlp/OTLPTraceConfig.java
@@ -17,106 +17,4 @@
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
public class OTLPTraceConfig extends ModuleConfig {
- private String gRPCHost;
- /**
- * Only setting the real port(not 0) makes the gRPC server online.
- */
- private int gRPCPort;
- private int maxConcurrentCallsPerConnection;
- private int maxMessageSize;
- private int gRPCThreadPoolSize;
- private int gRPCThreadPoolQueueSize;
- private String authentication;
- private boolean gRPCSslEnabled = false;
- private String gRPCSslKeyPath;
- private String gRPCSslCertChainPath;
- private String gRPCSslTrustedCAsPath;
-
- public String getGRPCHost() {
- return gRPCHost;
- }
-
- public void setGRPCHost(String gRPCHost) {
- this.gRPCHost = gRPCHost;
- }
-
- public int getGRPCPort() {
- return gRPCPort;
- }
-
- public void setGRPCPort(int gRPCPort) {
- this.gRPCPort = gRPCPort;
- }
-
- public int getMaxConcurrentCallsPerConnection() {
- return maxConcurrentCallsPerConnection;
- }
-
- public void setMaxConcurrentCallsPerConnection(int maxConcurrentCallsPerConnection) {
- this.maxConcurrentCallsPerConnection = maxConcurrentCallsPerConnection;
- }
-
- public int getMaxMessageSize() {
- return maxMessageSize;
- }
-
- public void setMaxMessageSize(int maxMessageSize) {
- this.maxMessageSize = maxMessageSize;
- }
-
- public int getGRPCThreadPoolSize() {
- return gRPCThreadPoolSize;
- }
-
- public void setGRPCThreadPoolSize(int gRPCThreadPoolSize) {
- this.gRPCThreadPoolSize = gRPCThreadPoolSize;
- }
-
- public int getGRPCThreadPoolQueueSize() {
- return gRPCThreadPoolQueueSize;
- }
-
- public void setGRPCThreadPoolQueueSize(int gRPCThreadPoolQueueSize) {
- this.gRPCThreadPoolQueueSize = gRPCThreadPoolQueueSize;
- }
-
- public String getAuthentication() {
- return authentication;
- }
-
- public void setAuthentication(String authentication) {
- this.authentication = authentication;
- }
-
- public boolean getGRPCSslEnabled() {
- return gRPCSslEnabled;
- }
-
- public void setGRPCSslEnabled(boolean gRPCSslEnabled) {
- this.gRPCSslEnabled = gRPCSslEnabled;
- }
-
- public String getGRPCSslKeyPath() {
- return gRPCSslKeyPath;
- }
-
- public void setGRPCSslKeyPath(String gRPCSslKeyPath) {
- this.gRPCSslKeyPath = gRPCSslKeyPath;
- }
-
- public String getGRPCSslCertChainPath() {
- return gRPCSslCertChainPath;
- }
-
- public void setGRPCSslCertChainPath(String gRPCSslCertChainPath) {
- this.gRPCSslCertChainPath = gRPCSslCertChainPath;
- }
-
- public String getGRPCSslTrustedCAsPath() {
- return gRPCSslTrustedCAsPath;
- }
-
- public void setGRPCSslTrustedCAsPath(String gRPCSslTrustedCAsPath) {
- this.gRPCSslTrustedCAsPath = gRPCSslTrustedCAsPath;
- }
}
diff --git a/zipkin-server/receiver-otlp-trace/src/main/java/zipkin/server/receiver/otlp/OTLPTraceProvider.java b/zipkin-server/receiver-otlp-trace/src/main/java/zipkin/server/receiver/otlp/OTLPTraceProvider.java
index c83500f7ed3..5028828a338 100644
--- a/zipkin-server/receiver-otlp-trace/src/main/java/zipkin/server/receiver/otlp/OTLPTraceProvider.java
+++ b/zipkin-server/receiver-otlp-trace/src/main/java/zipkin/server/receiver/otlp/OTLPTraceProvider.java
@@ -14,17 +14,13 @@
package zipkin.server.receiver.otlp;
-import org.apache.logging.log4j.util.Strings;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
-import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegisterImpl;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
-import org.apache.skywalking.oap.server.library.server.ServerException;
-import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
import org.apache.skywalking.oap.server.receiver.otel.OtelMetricReceiverConfig;
import org.apache.skywalking.oap.server.receiver.otel.OtelMetricReceiverModule;
import org.apache.skywalking.oap.server.receiver.otel.otlp.OpenTelemetryMetricRequestProcessor;
@@ -34,7 +30,6 @@
public class OTLPTraceProvider extends ModuleProvider {
private OTLPTraceConfig moduleConfig;
private OpenTelemetryTraceHandler traceHandler;
- private GRPCServer grpcServer;
@Override
public String name() {
@@ -70,53 +65,13 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException {
@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
- GRPCHandlerRegister handlerRegister;
- if (moduleConfig.getGRPCPort() > 0) {
- if (moduleConfig.getGRPCSslEnabled()) {
- grpcServer = new GRPCServer(
- Strings.isBlank(moduleConfig.getGRPCHost()) ? "0.0.0.0" : moduleConfig.getGRPCHost(),
- moduleConfig.getGRPCPort(),
- moduleConfig.getGRPCSslCertChainPath(),
- moduleConfig.getGRPCSslKeyPath(),
- moduleConfig.getGRPCSslTrustedCAsPath()
- );
- } else {
- grpcServer = new GRPCServer(
- Strings.isBlank(moduleConfig.getGRPCHost()) ? "0.0.0.0" : moduleConfig.getGRPCHost(),
- moduleConfig.getGRPCPort()
- );
- }
- if (moduleConfig.getMaxMessageSize() > 0) {
- grpcServer.setMaxMessageSize(moduleConfig.getMaxMessageSize());
- }
- if (moduleConfig.getMaxConcurrentCallsPerConnection() > 0) {
- grpcServer.setMaxConcurrentCallsPerConnection(moduleConfig.getMaxConcurrentCallsPerConnection());
- }
- if (moduleConfig.getGRPCThreadPoolQueueSize() > 0) {
- grpcServer.setThreadPoolQueueSize(moduleConfig.getGRPCThreadPoolQueueSize());
- }
- if (moduleConfig.getGRPCThreadPoolSize() > 0) {
- grpcServer.setThreadPoolSize(moduleConfig.getGRPCThreadPoolSize());
- }
- grpcServer.initialize();
-
- handlerRegister = new GRPCHandlerRegisterImpl(grpcServer);
- } else {
- handlerRegister = getManager().find(CoreModule.NAME).provider().getService(GRPCHandlerRegister.class);
- }
+ GRPCHandlerRegister handlerRegister = getManager().find(CoreModule.NAME).provider().getService(GRPCHandlerRegister.class);
traceHandler = new OTLPTraceHandler(handlerRegister, getManager());
traceHandler.active();
}
@Override
public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
- if (grpcServer != null) {
- try {
- grpcServer.start();
- } catch (ServerException e) {
- throw new ModuleStartException(e.getMessage(), e);
- }
- }
}
@Override
diff --git a/zipkin-server/receiver-zipkin-http/src/main/java/zipkin/server/receiver/zipkin/http/ZipkinHTTPReceiverConfig.java b/zipkin-server/receiver-zipkin-http/src/main/java/zipkin/server/receiver/zipkin/http/ZipkinHTTPReceiverConfig.java
index 4d9ca221525..675f129047f 100644
--- a/zipkin-server/receiver-zipkin-http/src/main/java/zipkin/server/receiver/zipkin/http/ZipkinHTTPReceiverConfig.java
+++ b/zipkin-server/receiver-zipkin-http/src/main/java/zipkin/server/receiver/zipkin/http/ZipkinHTTPReceiverConfig.java
@@ -17,71 +17,4 @@
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
public class ZipkinHTTPReceiverConfig extends ModuleConfig {
- private String restHost;
- private int restPort;
- private String restContextPath;
- private int restMaxThreads = 200;
- private long restIdleTimeOut = 30000;
- private int restAcceptQueueSize = 0;
- /**
- * The maximum size in bytes allowed for request headers.
- * Use -1 to disable it.
- */
- private int restMaxRequestHeaderSize = 8192;
-
- public String getRestHost() {
- return restHost;
- }
-
- public void setRestHost(String restHost) {
- this.restHost = restHost;
- }
-
- public int getRestPort() {
- return restPort;
- }
-
- public void setRestPort(int restPort) {
- this.restPort = restPort;
- }
-
- public String getRestContextPath() {
- return restContextPath;
- }
-
- public void setRestContextPath(String restContextPath) {
- this.restContextPath = restContextPath;
- }
-
- public int getRestMaxThreads() {
- return restMaxThreads;
- }
-
- public void setRestMaxThreads(int restMaxThreads) {
- this.restMaxThreads = restMaxThreads;
- }
-
- public long getRestIdleTimeOut() {
- return restIdleTimeOut;
- }
-
- public void setRestIdleTimeOut(long restIdleTimeOut) {
- this.restIdleTimeOut = restIdleTimeOut;
- }
-
- public int getRestAcceptQueueSize() {
- return restAcceptQueueSize;
- }
-
- public void setRestAcceptQueueSize(int restAcceptQueueSize) {
- this.restAcceptQueueSize = restAcceptQueueSize;
- }
-
- public int getRestMaxRequestHeaderSize() {
- return restMaxRequestHeaderSize;
- }
-
- public void setRestMaxRequestHeaderSize(int restMaxRequestHeaderSize) {
- this.restMaxRequestHeaderSize = restMaxRequestHeaderSize;
- }
}
diff --git a/zipkin-server/receiver-zipkin-http/src/main/java/zipkin/server/receiver/zipkin/http/ZipkinHTTPReceiverProvider.java b/zipkin-server/receiver-zipkin-http/src/main/java/zipkin/server/receiver/zipkin/http/ZipkinHTTPReceiverProvider.java
index 0550fbc089e..0efe9436e52 100644
--- a/zipkin-server/receiver-zipkin-http/src/main/java/zipkin/server/receiver/zipkin/http/ZipkinHTTPReceiverProvider.java
+++ b/zipkin-server/receiver-zipkin-http/src/main/java/zipkin/server/receiver/zipkin/http/ZipkinHTTPReceiverProvider.java
@@ -22,19 +22,15 @@
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
-import org.apache.skywalking.oap.server.library.server.http.HTTPServer;
-import org.apache.skywalking.oap.server.library.server.http.HTTPServerConfig;
import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverModule;
import org.apache.skywalking.oap.server.receiver.zipkin.handler.ZipkinSpanHTTPHandler;
-import zipkin.server.core.services.HTTPConfigurableServer;
import java.util.Arrays;
public class ZipkinHTTPReceiverProvider extends ModuleProvider {
private ZipkinHTTPReceiverConfig moduleConfig;
private ZipkinSpanHTTPHandler httpHandler;
- private HTTPServer httpServer;
@Override
public String name() {
@@ -63,19 +59,6 @@ public void onInitialized(ZipkinHTTPReceiverConfig initialized) {
@Override
public void prepare() throws ServiceNotProvidedException, ModuleStartException {
- if (moduleConfig.getRestPort() > 0) {
- HTTPServerConfig httpServerConfig = HTTPServerConfig.builder()
- .host(moduleConfig.getRestHost())
- .port(moduleConfig.getRestPort())
- .contextPath(moduleConfig.getRestContextPath())
- .idleTimeOut(moduleConfig.getRestIdleTimeOut())
- .maxThreads(moduleConfig.getRestMaxThreads())
- .acceptQueueSize(moduleConfig.getRestAcceptQueueSize())
- .maxRequestHeaderSize(moduleConfig.getRestMaxRequestHeaderSize())
- .build();
- httpServer = new HTTPConfigurableServer(httpServerConfig);
- httpServer.initialize();
- }
}
@Override
@@ -83,19 +66,12 @@ public void start() throws ServiceNotProvidedException, ModuleStartException {
final SpanForwardService spanForward = getManager().find(ZipkinReceiverModule.NAME).provider().getService(SpanForwardService.class);
httpHandler = new ZipkinSpanHTTPHandler(spanForward, getManager());
- if (httpServer != null) {
- httpServer.addHandler(httpHandler, Arrays.asList(HttpMethod.POST, HttpMethod.GET));
- } else {
- final HTTPHandlerRegister httpRegister = getManager().find(CoreModule.NAME).provider().getService(HTTPHandlerRegister.class);
- httpRegister.addHandler(httpHandler, Arrays.asList(HttpMethod.POST, HttpMethod.GET));
- }
+ final HTTPHandlerRegister httpRegister = getManager().find(CoreModule.NAME).provider().getService(HTTPHandlerRegister.class);
+ httpRegister.addHandler(httpHandler, Arrays.asList(HttpMethod.POST, HttpMethod.GET));
}
@Override
public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
- if (httpServer != null) {
- httpServer.start();
- }
}
@Override
@@ -104,8 +80,4 @@ public String[] requiredModules() {
CoreModule.NAME,
};
}
-
- public ZipkinSpanHTTPHandler getHttpHandler() {
- return httpHandler;
- }
}
diff --git a/zipkin-server/receiver-zipkin-http/src/test/java/zipkin/server/receiver/zipkin/http/ITHTTPReceiver.java b/zipkin-server/receiver-zipkin-http/src/test/java/zipkin/server/receiver/zipkin/http/ITHTTPReceiver.java
index d51bfa772e8..41298e9c411 100644
--- a/zipkin-server/receiver-zipkin-http/src/test/java/zipkin/server/receiver/zipkin/http/ITHTTPReceiver.java
+++ b/zipkin-server/receiver-zipkin-http/src/test/java/zipkin/server/receiver/zipkin/http/ITHTTPReceiver.java
@@ -77,7 +77,6 @@ public void setup() throws ModuleStartException {
final ZipkinHTTPReceiverProvider provider = new ZipkinHTTPReceiverProvider();
provider.setManager(moduleManager);
final ZipkinHTTPReceiverConfig config = new ZipkinHTTPReceiverConfig();
- config.setRestPort(-1);
Whitebox.setInternalState(provider, ZipkinHTTPReceiverConfig.class, config);
provider.prepare();
provider.start();
diff --git a/zipkin-server/server-core/pom.xml b/zipkin-server/server-core/pom.xml
index 02b3525b5a0..32e2b5ca570 100644
--- a/zipkin-server/server-core/pom.xml
+++ b/zipkin-server/server-core/pom.xml
@@ -29,6 +29,11 @@
classgraph
4.8.162
+
+ ${armeria.groupId}
+ armeria-grpc
+ ${armeria.version}
+
\ No newline at end of file
diff --git a/zipkin-server/server-core/src/main/java/zipkin/server/core/CoreModuleConfig.java b/zipkin-server/server-core/src/main/java/zipkin/server/core/CoreModuleConfig.java
index dbbb0dd71be..80bfb05e7cb 100644
--- a/zipkin-server/server-core/src/main/java/zipkin/server/core/CoreModuleConfig.java
+++ b/zipkin-server/server-core/src/main/java/zipkin/server/core/CoreModuleConfig.java
@@ -17,29 +17,21 @@
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
public class CoreModuleConfig extends ModuleConfig {
- private String gRPCHost;
- private int gRPCPort;
- private boolean gRPCSslEnabled = false;
- private String gRPCSslKeyPath;
- private String gRPCSslCertChainPath;
- private String gRPCSslTrustedCAPath;
- private int gRPCThreadPoolSize;
- private int gRPCThreadPoolQueueSize;
- private int gRPCMaxConcurrentCallsPerConnection;
- private int gRPCMaxMessageSize;
-
- private String restHost;
- private int restPort;
- private String restContextPath;
- private int restMaxThreads = 200;
- private long restIdleTimeOut = 30000;
- private int restAcceptQueueSize = 0;
+ private String serverHost;
+ private int serverPort;
+ private int serverMaxThreads = 200;
+ private long serverIdleTimeOut = 30000;
+ private int serverAcceptQueueSize = 0;
/**
* The maximum size in bytes allowed for request headers.
* Use -1 to disable it.
*/
- private int restMaxRequestHeaderSize = 8192;
-
+ private int serverMaxRequestHeaderSize = 8192;
+ private boolean serverEnableTLS = false;
+ private String serverTLSKeyPath;
+ private String serverTLSCertChainPath;
+ private String clusterSslTrustedCAPath;
+
/**
* The max length of the service name.
*/
@@ -221,147 +213,91 @@ public void setRemoteTimeout(int remoteTimeout) {
this.remoteTimeout = remoteTimeout;
}
- public String getGRPCHost() {
- return gRPCHost;
- }
-
- public void setGRPCHost(String gRPCHost) {
- this.gRPCHost = gRPCHost;
- }
-
- public int getGRPCPort() {
- return gRPCPort;
- }
-
- public void setGRPCPort(int gRPCPort) {
- this.gRPCPort = gRPCPort;
- }
-
- public boolean getGRPCSslEnabled() {
- return gRPCSslEnabled;
- }
-
- public void setGRPCSslEnabled(boolean gRPCSslEnabled) {
- this.gRPCSslEnabled = gRPCSslEnabled;
- }
-
- public String getGRPCSslKeyPath() {
- return gRPCSslKeyPath;
- }
-
- public void setGRPCSslKeyPath(String gRPCSslKeyPath) {
- this.gRPCSslKeyPath = gRPCSslKeyPath;
- }
-
- public String getGRPCSslCertChainPath() {
- return gRPCSslCertChainPath;
- }
-
- public void setGRPCSslCertChainPath(String gRPCSslCertChainPath) {
- this.gRPCSslCertChainPath = gRPCSslCertChainPath;
- }
-
- public String getGRPCSslTrustedCAPath() {
- return gRPCSslTrustedCAPath;
- }
-
- public void setGRPCSslTrustedCAPath(String gRPCSslTrustedCAPath) {
- this.gRPCSslTrustedCAPath = gRPCSslTrustedCAPath;
- }
-
- public int getGRPCThreadPoolSize() {
- return gRPCThreadPoolSize;
- }
-
- public void setGRPCThreadPoolSize(int gRPCThreadPoolSize) {
- this.gRPCThreadPoolSize = gRPCThreadPoolSize;
- }
-
- public int getGRPCThreadPoolQueueSize() {
- return gRPCThreadPoolQueueSize;
+ public boolean getSearchEnable() {
+ return searchEnable;
}
- public void setGRPCThreadPoolQueueSize(int gRPCThreadPoolQueueSize) {
- this.gRPCThreadPoolQueueSize = gRPCThreadPoolQueueSize;
+ public void setSearchEnable(boolean searchEnable) {
+ this.searchEnable = searchEnable;
}
- public int getGRPCMaxConcurrentCallsPerConnection() {
- return gRPCMaxConcurrentCallsPerConnection;
+ public String getServerHost() {
+ return serverHost;
}
- public void setGRPCMaxConcurrentCallsPerConnection(int gRPCMaxConcurrentCallsPerConnection) {
- this.gRPCMaxConcurrentCallsPerConnection = gRPCMaxConcurrentCallsPerConnection;
+ public void setServerHost(String serverHost) {
+ this.serverHost = serverHost;
}
- public int getGRPCMaxMessageSize() {
- return gRPCMaxMessageSize;
+ public int getServerPort() {
+ return serverPort;
}
- public void setGRPCMaxMessageSize(int gRPCMaxMessageSize) {
- this.gRPCMaxMessageSize = gRPCMaxMessageSize;
+ public void setServerPort(int serverPort) {
+ this.serverPort = serverPort;
}
- public String getRestHost() {
- return restHost;
+ public int getServerMaxThreads() {
+ return serverMaxThreads;
}
- public void setRestHost(String restHost) {
- this.restHost = restHost;
+ public void setServerMaxThreads(int serverMaxThreads) {
+ this.serverMaxThreads = serverMaxThreads;
}
- public int getRestPort() {
- return restPort;
+ public long getServerIdleTimeOut() {
+ return serverIdleTimeOut;
}
- public void setRestPort(int restPort) {
- this.restPort = restPort;
+ public void setServerIdleTimeOut(long serverIdleTimeOut) {
+ this.serverIdleTimeOut = serverIdleTimeOut;
}
- public String getRestContextPath() {
- return restContextPath;
+ public int getServerAcceptQueueSize() {
+ return serverAcceptQueueSize;
}
- public void setRestContextPath(String restContextPath) {
- this.restContextPath = restContextPath;
+ public void setServerAcceptQueueSize(int serverAcceptQueueSize) {
+ this.serverAcceptQueueSize = serverAcceptQueueSize;
}
- public int getRestMaxThreads() {
- return restMaxThreads;
+ public int getServerMaxRequestHeaderSize() {
+ return serverMaxRequestHeaderSize;
}
- public void setRestMaxThreads(int restMaxThreads) {
- this.restMaxThreads = restMaxThreads;
+ public void setServerMaxRequestHeaderSize(int serverMaxRequestHeaderSize) {
+ this.serverMaxRequestHeaderSize = serverMaxRequestHeaderSize;
}
- public long getRestIdleTimeOut() {
- return restIdleTimeOut;
+ public boolean getServerEnableTLS() {
+ return serverEnableTLS;
}
- public void setRestIdleTimeOut(long restIdleTimeOut) {
- this.restIdleTimeOut = restIdleTimeOut;
+ public void setServerEnableTLS(boolean serverEnableTLS) {
+ this.serverEnableTLS = serverEnableTLS;
}
- public int getRestAcceptQueueSize() {
- return restAcceptQueueSize;
+ public String getServerTLSKeyPath() {
+ return serverTLSKeyPath;
}
- public void setRestAcceptQueueSize(int restAcceptQueueSize) {
- this.restAcceptQueueSize = restAcceptQueueSize;
+ public void setServerTLSKeyPath(String serverTLSKeyPath) {
+ this.serverTLSKeyPath = serverTLSKeyPath;
}
- public int getRestMaxRequestHeaderSize() {
- return restMaxRequestHeaderSize;
+ public String getServerTLSCertChainPath() {
+ return serverTLSCertChainPath;
}
- public void setRestMaxRequestHeaderSize(int restMaxRequestHeaderSize) {
- this.restMaxRequestHeaderSize = restMaxRequestHeaderSize;
+ public void setServerTLSCertChainPath(String serverTLSCertChainPath) {
+ this.serverTLSCertChainPath = serverTLSCertChainPath;
}
- public boolean getSearchEnable() {
- return searchEnable;
+ public String getClusterSslTrustedCAPath() {
+ return clusterSslTrustedCAPath;
}
- public void setSearchEnable(boolean searchEnable) {
- this.searchEnable = searchEnable;
+ public void setClusterSslTrustedCAPath(String clusterSslTrustedCAPath) {
+ this.clusterSslTrustedCAPath = clusterSslTrustedCAPath;
}
}
diff --git a/zipkin-server/server-core/src/main/java/zipkin/server/core/CoreModuleProvider.java b/zipkin-server/server-core/src/main/java/zipkin/server/core/CoreModuleProvider.java
index 53b91e86cf4..d6c6c79f59f 100644
--- a/zipkin-server/server-core/src/main/java/zipkin/server/core/CoreModuleProvider.java
+++ b/zipkin-server/server-core/src/main/java/zipkin/server/core/CoreModuleProvider.java
@@ -18,7 +18,6 @@
import org.apache.skywalking.oap.server.core.RunningMode;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
-import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
import org.apache.skywalking.oap.server.core.cache.NetworkAddressAliasCache;
import org.apache.skywalking.oap.server.core.cache.ProfileTaskCache;
import org.apache.skywalking.oap.server.core.cluster.ClusterCoordinator;
@@ -58,7 +57,6 @@
import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
import org.apache.skywalking.oap.server.core.remote.health.HealthCheckServiceHandler;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
-import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegisterImpl;
import org.apache.skywalking.oap.server.core.server.HTTPHandlerRegister;
import org.apache.skywalking.oap.server.core.server.HTTPHandlerRegisterImpl;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
@@ -79,8 +77,6 @@
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
-import org.apache.skywalking.oap.server.library.server.ServerException;
-import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
import org.apache.skywalking.oap.server.library.server.http.HTTPServer;
import org.apache.skywalking.oap.server.library.server.http.HTTPServerConfig;
import org.apache.skywalking.oap.server.telemetry.api.TelemetryRelatedContext;
@@ -102,8 +98,7 @@ public class CoreModuleProvider extends ModuleProvider {
private EndpointNameGrouping endpointNameGrouping;
private ZipkinSourceReceiverImpl receiver;
private RemoteClientManager remoteClientManager;
- private GRPCServer grpcServer;
- private HTTPServer httpServer;
+ private HTTPServer server;
public CoreModuleProvider() {
this.annotationScan = new ZipkinAnnotationScan();
@@ -151,48 +146,25 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException {
annotationScan.registerListener(new ZipkinStreamAnnotationListener(getManager(), moduleConfig.getSearchEnable()));
HTTPServerConfig httpServerConfig = HTTPServerConfig.builder()
- .host(moduleConfig.getRestHost())
- .port(moduleConfig.getRestPort())
- .contextPath(moduleConfig.getRestContextPath())
- .idleTimeOut(moduleConfig.getRestIdleTimeOut())
- .maxThreads(moduleConfig.getRestMaxThreads())
- .acceptQueueSize(
- moduleConfig.getRestAcceptQueueSize())
- .maxRequestHeaderSize(
- moduleConfig.getRestMaxRequestHeaderSize())
+ .host(moduleConfig.getServerHost())
+ .port(moduleConfig.getServerPort())
+ .contextPath("/")
+ .idleTimeOut(moduleConfig.getServerIdleTimeOut())
+ .maxThreads(moduleConfig.getServerMaxThreads())
+ .acceptQueueSize(moduleConfig.getServerAcceptQueueSize())
+ .maxRequestHeaderSize(moduleConfig.getServerMaxRequestHeaderSize())
+ .enableTLS(moduleConfig.getServerEnableTLS())
+ .tlsKeyPath(moduleConfig.getServerTLSKeyPath())
+ .tlsCertChainPath(moduleConfig.getServerTLSCertChainPath())
.build();
- httpServer = new HTTPConfigurableServer(httpServerConfig);
- httpServer.initialize();
+ server = new HTTPConfigurableServer(httpServerConfig);
+ server.initialize();
// "/info" handler
- httpServer.addHandler(new HTTPInfoHandler(), Arrays.asList(HttpMethod.GET, HttpMethod.POST));
+ server.addHandler(new HTTPInfoHandler(), Arrays.asList(HttpMethod.GET, HttpMethod.POST));
- // grpc
- if (moduleConfig.getGRPCSslEnabled()) {
- grpcServer = new GRPCServer(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(),
- moduleConfig.getGRPCSslCertChainPath(),
- moduleConfig.getGRPCSslKeyPath(),
- null
- );
- } else {
- grpcServer = new GRPCServer(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort());
- }
- if (moduleConfig.getGRPCMaxConcurrentCallsPerConnection() > 0) {
- grpcServer.setMaxConcurrentCallsPerConnection(moduleConfig.getGRPCMaxConcurrentCallsPerConnection());
- }
- if (moduleConfig.getGRPCMaxMessageSize() > 0) {
- grpcServer.setMaxMessageSize(moduleConfig.getGRPCMaxMessageSize());
- }
- if (moduleConfig.getGRPCThreadPoolQueueSize() > 0) {
- grpcServer.setThreadPoolQueueSize(moduleConfig.getGRPCThreadPoolQueueSize());
- }
- if (moduleConfig.getGRPCThreadPoolSize() > 0) {
- grpcServer.setThreadPoolSize(moduleConfig.getGRPCThreadPoolSize());
- }
- grpcServer.initialize();
-
- if (moduleConfig.getGRPCSslEnabled()) {
+ if (moduleConfig.getServerEnableTLS()) {
this.remoteClientManager = new RemoteClientManager(getManager(), moduleConfig.getRemoteTimeout(),
- moduleConfig.getGRPCSslTrustedCAPath()
+ moduleConfig.getClusterSslTrustedCAPath()
);
} else {
this.remoteClientManager = new RemoteClientManager(getManager(), moduleConfig.getRemoteTimeout());
@@ -203,8 +175,8 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException {
this.registerServiceImplementation(ConfigService.class, new ZipkinConfigService(moduleConfig, this));
this.registerServiceImplementation(ServerStatusService.class, new ServerStatusService(getManager()));
this.registerServiceImplementation(DownSamplingConfigService.class, new DownSamplingConfigService(Collections.emptyList()));
- this.registerServiceImplementation(GRPCHandlerRegister.class, new GRPCHandlerRegisterImpl(grpcServer));
- this.registerServiceImplementation(HTTPHandlerRegister.class, new HTTPHandlerRegisterImpl(httpServer));
+ this.registerServiceImplementation(GRPCHandlerRegister.class, new GRPCHandlerRegisterAdapter(server));
+ this.registerServiceImplementation(HTTPHandlerRegister.class, new HTTPHandlerRegisterImpl(server));
this.registerServiceImplementation(IComponentLibraryCatalogService.class, new EmptyComponentLibraryCatalogService());
this.registerServiceImplementation(SourceReceiver.class, receiver);
final WorkerInstancesService instancesService = new WorkerInstancesService();
@@ -258,8 +230,8 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException {
@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
- grpcServer.addHandler(new RemoteServiceHandler(getManager()));
- grpcServer.addHandler(new HealthCheckServiceHandler());
+ server.addHandler(new RemoteServiceHandler(getManager()), Arrays.asList(HttpMethod.GET));
+ server.addHandler(new HealthCheckServiceHandler(), Arrays.asList(HttpMethod.GET));
try {
receiver.scan();
@@ -268,7 +240,7 @@ public void start() throws ServiceNotProvidedException, ModuleStartException {
throw new ModuleStartException(e.getMessage(), e);
}
- Address gRPCServerInstanceAddress = new Address(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(), true);
+ Address gRPCServerInstanceAddress = new Address(moduleConfig.getServerHost(), moduleConfig.getServerPort(), true);
TelemetryRelatedContext.INSTANCE.setId(gRPCServerInstanceAddress.toString());
ClusterCoordinator coordinator = this.getManager()
.find(ClusterModule.NAME)
@@ -282,14 +254,9 @@ public void start() throws ServiceNotProvidedException, ModuleStartException {
@Override
public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
- try {
- if (!RunningMode.isInitMode()) {
- grpcServer.start();
- httpServer.start();
- remoteClientManager.start();
- }
- } catch (ServerException e) {
- throw new ModuleStartException(e.getMessage(), e);
+ if (!RunningMode.isInitMode()) {
+ server.start();
+ remoteClientManager.start();
}
final org.apache.skywalking.oap.server.core.CoreModuleConfig swConfig = this.moduleConfig.toSkyWalkingConfig();
diff --git a/zipkin-server/server-core/src/main/java/zipkin/server/core/GRPCHandlerRegisterAdapter.java b/zipkin-server/server-core/src/main/java/zipkin/server/core/GRPCHandlerRegisterAdapter.java
new file mode 100644
index 00000000000..208042b982f
--- /dev/null
+++ b/zipkin-server/server-core/src/main/java/zipkin/server/core/GRPCHandlerRegisterAdapter.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2015-2023 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package zipkin.server.core;
+
+import com.linecorp.armeria.common.HttpMethod;
+import io.grpc.BindableService;
+import io.grpc.ServerInterceptor;
+import io.grpc.ServerServiceDefinition;
+import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
+import org.apache.skywalking.oap.server.library.server.http.HTTPServer;
+
+import java.util.Arrays;
+
+public class GRPCHandlerRegisterAdapter implements GRPCHandlerRegister {
+
+ private final HTTPServer server;
+
+ public GRPCHandlerRegisterAdapter(HTTPServer server) {
+ this.server = server;
+ }
+
+ @Override
+ public void addHandler(BindableService handler) {
+ server.addHandler(handler, Arrays.asList(HttpMethod.GET));
+ }
+
+ @Override
+ public void addHandler(ServerServiceDefinition definition) {
+ server.addHandler(definition, Arrays.asList(HttpMethod.GET));
+ }
+
+ @Override
+ public void addFilter(ServerInterceptor interceptor) {
+ server.addHandler(interceptor, Arrays.asList(HttpMethod.GET));
+ }
+}
diff --git a/zipkin-server/server-core/src/main/java/zipkin/server/core/services/HTTPConfigurableServer.java b/zipkin-server/server-core/src/main/java/zipkin/server/core/services/HTTPConfigurableServer.java
index 05d9e219e0f..6121a3794bd 100644
--- a/zipkin-server/server-core/src/main/java/zipkin/server/core/services/HTTPConfigurableServer.java
+++ b/zipkin-server/server-core/src/main/java/zipkin/server/core/services/HTTPConfigurableServer.java
@@ -16,6 +16,10 @@
import com.linecorp.armeria.common.HttpMethod;
import com.linecorp.armeria.server.ServerBuilder;
+import com.linecorp.armeria.server.grpc.GrpcService;
+import io.grpc.BindableService;
+import io.grpc.ServerInterceptor;
+import io.grpc.ServerServiceDefinition;
import org.apache.skywalking.oap.server.library.server.http.HTTPServer;
import org.apache.skywalking.oap.server.library.server.http.HTTPServerConfig;
@@ -32,6 +36,15 @@ public void addHandler(Object handler, List httpMethods) {
((ServerConfiguration) handler).configure(sb);
allowedMethods.addAll(httpMethods);
return;
+ } else if (handler instanceof BindableService) {
+ sb.service(GrpcService.builder().addService((BindableService) handler).build());
+ return;
+ } else if (handler instanceof ServerServiceDefinition) {
+ sb.service(GrpcService.builder().addService((ServerServiceDefinition) handler).build());
+ return;
+ } else if (handler instanceof ServerInterceptor) {
+ sb.service(GrpcService.builder().intercept((ServerInterceptor) handler).build());
+ return;
}
super.addHandler(handler, httpMethods);
}
diff --git a/zipkin-server/server-starter/src/main/resources/application.yml b/zipkin-server/server-starter/src/main/resources/application.yml
index 8a92ff64a05..7d5e8b5e1f2 100644
--- a/zipkin-server/server-starter/src/main/resources/application.yml
+++ b/zipkin-server/server-starter/src/main/resources/application.yml
@@ -33,23 +33,17 @@ core:
prepareThreads: ${ZIPKIN_PREPARE_THREADS:2}
# The period of doing data persistence. Unit is second.Default value is 25s
persistentPeriod: ${ZIPKIN_PERSISTENT_PERIOD:25}
- gRPCHost: ${ZIPKIN_GRPC_HOST:0.0.0.0}
- gRPCPort: ${ZIPKIN_GRPC_PORT:11800}
- gRPCThreadPoolQueueSize: ${ZIPKIN_GRPC_POOL_QUEUE_SIZE:-1}
- gRPCThreadPoolSize: ${ZIPKIN_GRPC_THREAD_POOL_SIZE:-1}
- gRPCSslEnabled: ${ZIPKIN_GRPC_SSL_ENABLED:false}
- gRPCSslKeyPath: ${ZIPKIN_GRPC_SSL_KEY_PATH:""}
- gRPCSslCertChainPath: ${ZIPKIN_GRPC_SSL_CERT_CHAIN_PATH:""}
- gRPCSslTrustedCAPath: ${ZIPKIN_GRPC_SSL_TRUSTED_CA_PATH:""}
- gRPCMaxConcurrentCallsPerConnection: ${ZIPKIN_GRPC_MAX_CONCURRENT_CALL:0}
- gRPCMaxMessageSize: ${ZIPKIN_GRPC_MAX_MESSAGE_SIZE:0}
- restHost: ${ZIPKIN_REST_HOST:0.0.0.0}
- restPort: ${ZIPKIN_REST_PORT:9411}
- restContextPath: ${ZIPKIN_REST_CONTEXT_PATH:/}
- restMaxThreads: ${ZIPKIN_REST_MAX_THREADS:200}
- restIdleTimeOut: ${ZIPKIN_REST_IDLE_TIMEOUT:30000}
- restAcceptQueueSize: ${ZIPKIN_REST_QUEUE_SIZE:0}
- restMaxRequestHeaderSize: ${ZIPKIN_REST_MAX_REQUEST_HEADER_SIZE:8192}
+ serverHost: ${ZIPKIN_SERVER_HOST:0.0.0.0}
+ serverPort: ${ZIPKIN_SERVER_PORT:9411}
+ serverMaxThreads: ${ZIPKIN_SERVER_MAX_THREADS:200}
+ serverIdleTimeOut: ${ZIPKIN_SERVER_IDLE_TIMEOUT:30000}
+ serverAcceptQueueSize: ${ZIPKIN_SERVER_QUEUE_SIZE:0}
+ serverMaxRequestHeaderSize: ${ZIPKIN_SERVER_MAX_REQUEST_HEADER_SIZE:8192}
+ serverThreadPoolQueueSize: ${ZIPKIN_SERVER_POOL_QUEUE_SIZE:-1}
+ serverEnableTLS: ${ZIPKIN_SERVER_SSL_ENABLED:false}
+ serverTLSKeyPath: ${ZIPKIN_SERVER_SSL_KEY_PATH:""}
+ serverTLSCertChainPath: ${ZIPKIN_SERVER_SSL_CERT_CHAIN_PATH:""}
+ clusterSslTrustedCAPath: ${ZIPKIN_SERVER_SSL_TRUSTED_CA_PATH:""}
searchEnable: ${ZIPKIN_SEARCH_ENABLED:true}
storage: &storage
@@ -175,14 +169,6 @@ receiver-zipkin:
receiver-zipkin-http:
selector: ${ZIPKIN_RECEIVER_ZIPKIN_HTTP:default}
default:
- restHost: ${ZIPKIN_RECEIVER_HTTP_REST_HOST:0.0.0.0}
- # The port number of the HTTP service, the default HTTP service in the core would be used if the value is smaller than 0.
- restPort: ${ZIPKIN_RECEIVER_HTTP_REST_PORT:-1}
- restContextPath: ${ZIPKIN_RECEIVER_HTTP_REST_CONTEXT_PATH:/}
- restMaxThreads: ${ZIPKIN_RECEIVER_HTTP_REST_MAX_THREADS:200}
- restIdleTimeOut: ${ZIPKIN_RECEIVER_HTTP_REST_IDLE_TIMEOUT:30000}
- restAcceptQueueSize: ${ZIPKIN_RECEIVER_HTTP_REST_QUEUE_SIZE:0}
- restMaxRequestHeaderSize: ${ZIPKIN_RECEIVER_HTTP_REST_MAX_REQUEST_HEADER_SIZE:8192}
receiver-zipkin-kafka:
selector: ${ZIPKIN_RECEIVER_ZIPKIN_KAFKA:-}
@@ -235,16 +221,6 @@ receiver-zipkin-scribe:
receiver-otel:
selector: ${SW_OTEL_RECEIVER:zipkin}
zipkin:
- gRPCHost: ${ZIPKIN_OTEL_GRPC_HOST:0.0.0.0}
- gRPCPort: ${ZIPKIN_OTEL_GRPC_PORT:0}
- gRPCThreadPoolQueueSize: ${ZIPKIN_OTEL_GRPC_POOL_QUEUE_SIZE:-1}
- gRPCThreadPoolSize: ${ZIPKIN_OTEL_GRPC_THREAD_POOL_SIZE:-1}
- gRPCSslEnabled: ${ZIPKIN_OTEL_GRPC_SSL_ENABLED:false}
- gRPCSslKeyPath: ${ZIPKIN_OTEL_GRPC_SSL_KEY_PATH:""}
- gRPCSslCertChainPath: ${ZIPKIN_OTEL_GRPC_SSL_CERT_CHAIN_PATH:""}
- gRPCSslTrustedCAPath: ${ZIPKIN_OTEL_GRPC_SSL_TRUSTED_CA_PATH:""}
- gRPCMaxConcurrentCallsPerConnection: ${ZIPKIN_OTEL_GRPC_MAX_CONCURRENT_CALL:0}
- gRPCMaxMessageSize: ${ZIPKIN_OTEL_GRPC_MAX_MESSAGE_SIZE:0}
## This module is for Zipkin query API and support zipkin-lens UI
query-zipkin:
diff --git a/zipkin-server/storage-cassandra/src/test/java/zipkin/server/storage/cassandra/ITCassandraStorage.java b/zipkin-server/storage-cassandra/src/test/java/zipkin/server/storage/cassandra/ITCassandraStorage.java
index bb66f3df247..b33ab735712 100644
--- a/zipkin-server/storage-cassandra/src/test/java/zipkin/server/storage/cassandra/ITCassandraStorage.java
+++ b/zipkin-server/storage-cassandra/src/test/java/zipkin/server/storage/cassandra/ITCassandraStorage.java
@@ -24,8 +24,8 @@
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleNotFoundException;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
-import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
-import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward;
+import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService;
+import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverModule;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.jupiter.api.BeforeAll;
@@ -54,16 +54,14 @@ public class ITCassandraStorage {
CassandraExtension cassandra = new CassandraExtension();
private final ModuleManager moduleManager = new ModuleManager();
- private SpanForward forward;
+ private SpanForwardService forward;
private ITagAutoCompleteQueryDAO tagAutoCompleteQueryDAO;
private IZipkinQueryDAO zipkinQueryDAO;
@BeforeAll
public void setup() throws ModuleNotFoundException, ModuleConfigException, ModuleStartException {
final ApplicationConfigLoader loader = new ApplicationConfigLoader();
moduleManager.init(loader.load());
- final ZipkinReceiverConfig config = new ZipkinReceiverConfig();
- config.setSearchableTracesTags("http.path");
- this.forward = new SpanForward(config, moduleManager);
+ this.forward = moduleManager.find(ZipkinReceiverModule.NAME).provider().getService(SpanForwardService.class);
this.tagAutoCompleteQueryDAO = moduleManager.find(StorageModule.NAME).provider().getService(ITagAutoCompleteQueryDAO.class);
this.zipkinQueryDAO = moduleManager.find(StorageModule.NAME).provider().getService(IZipkinQueryDAO.class);
}
diff --git a/zipkin-server/storage-cassandra/src/test/resources/application.yml b/zipkin-server/storage-cassandra/src/test/resources/application.yml
index df0e4c489a8..8477b4f0d8c 100644
--- a/zipkin-server/storage-cassandra/src/test/resources/application.yml
+++ b/zipkin-server/storage-cassandra/src/test/resources/application.yml
@@ -29,32 +29,22 @@ core:
l1FlushPeriod: ${ZIPKIN_CORE_L1_AGGREGATION_FLUSH_PERIOD:500}
# The threshold of session time. Unit is ms. Default value is 70s.
storageSessionTimeout: ${ZIPKIN_CORE_STORAGE_SESSION_TIMEOUT:70000}
- # Defines a set of span tag keys which are searchable.
- # The max length of key=value should be less than 256 or will be dropped.
- searchableTracesTags: ${ZIPKIN_SEARCHABLE_TAG_KEYS:http.path}
- # The trace sample rate precision is 1/10000, should be between 0 and 10000
- traceSampleRate: ${ZIPKIN_SAMPLE_RATE:10000}
# The number of threads used to prepare metrics data to the storage.
prepareThreads: ${ZIPKIN_PREPARE_THREADS:2}
# The period of doing data persistence. Unit is second.Default value is 25s
persistentPeriod: ${ZIPKIN_PERSISTENT_PERIOD:1}
- gRPCHost: ${ZIPKIN_GRPC_HOST:0.0.0.0}
- gRPCPort: ${ZIPKIN_GRPC_PORT:11800}
- gRPCThreadPoolQueueSize: ${ZIPKIN_GRPC_POOL_QUEUE_SIZE:-1}
- gRPCThreadPoolSize: ${ZIPKIN_GRPC_THREAD_POOL_SIZE:-1}
- gRPCSslEnabled: ${ZIPKIN_GRPC_SSL_ENABLED:false}
- gRPCSslKeyPath: ${ZIPKIN_GRPC_SSL_KEY_PATH:""}
- gRPCSslCertChainPath: ${ZIPKIN_GRPC_SSL_CERT_CHAIN_PATH:""}
- gRPCSslTrustedCAPath: ${ZIPKIN_GRPC_SSL_TRUSTED_CA_PATH:""}
- gRPCMaxConcurrentCallsPerConnection: ${ZIPKIN_GRPC_MAX_CONCURRENT_CALL:0}
- gRPCMaxMessageSize: ${ZIPKIN_GRPC_MAX_MESSAGE_SIZE:0}
- restHost: ${ZIPKIN_REST_HOST:0.0.0.0}
- restPort: ${ZIPKIN_REST_PORT:9411}
- restContextPath: ${ZIPKIN_REST_CONTEXT_PATH:/}
- restMaxThreads: ${ZIPKIN_REST_MAX_THREADS:200}
- restIdleTimeOut: ${ZIPKIN_REST_IDLE_TIMEOUT:30000}
- restAcceptQueueSize: ${ZIPKIN_REST_QUEUE_SIZE:0}
- restMaxRequestHeaderSize: ${ZIPKIN_REST_MAX_REQUEST_HEADER_SIZE:8192}
+ serverHost: ${ZIPKIN_SERVER_HOST:0.0.0.0}
+ serverPort: ${ZIPKIN_SERVER_PORT:9411}
+ serverMaxThreads: ${ZIPKIN_SERVER_MAX_THREADS:200}
+ serverIdleTimeOut: ${ZIPKIN_SERVER_IDLE_TIMEOUT:30000}
+ serverAcceptQueueSize: ${ZIPKIN_SERVER_QUEUE_SIZE:0}
+ serverMaxRequestHeaderSize: ${ZIPKIN_SERVER_MAX_REQUEST_HEADER_SIZE:8192}
+ serverThreadPoolQueueSize: ${ZIPKIN_SERVER_POOL_QUEUE_SIZE:-1}
+ serverEnableTLS: ${ZIPKIN_SERVER_SSL_ENABLED:false}
+ serverTLSKeyPath: ${ZIPKIN_SERVER_SSL_KEY_PATH:""}
+ serverTLSCertChainPath: ${ZIPKIN_SERVER_SSL_CERT_CHAIN_PATH:""}
+ clusterSslTrustedCAPath: ${ZIPKIN_SERVER_SSL_TRUSTED_CA_PATH:""}
+ searchEnable: ${ZIPKIN_SEARCH_ENABLED:true}
storage:
selector: ${ZIPKIN_STORAGE:cassandra}
@@ -74,6 +64,15 @@ storage:
maxSizeOfBatchCql: ${ZIPKIN_STORAGE_CASSANDRA_MAX_SIZE_OF_BATCH_CQL:2000}
asyncBatchPersistentPoolSize: ${ZIPKIN_STORAGE_CASSANDRA_ASYNC_BATCH_PERSISTENT_POOL_SIZE:4}
+receiver-zipkin:
+ selector: zipkin
+ zipkin:
+ # Defines a set of span tag keys which are searchable.
+ # The max length of key=value should be less than 256 or will be dropped.
+ searchableTracesTags: ${ZIPKIN_SEARCHABLE_TAG_KEYS:http.path}
+ # The trace sample rate precision is 0.0001, should be between 0 and 1
+ traceSampleRate: ${ZIPKIN_SAMPLE_RATE:1}
+
telemetry:
selector: ${ZIPKIN_TELEMETRY:none}
none:
From 8f2b3114050c3aff63f3f54a2847c56985cf3e5b Mon Sep 17 00:00:00 2001
From: Mrproliu <741550557@qq.com>
Date: Tue, 31 Oct 2023 17:54:41 +0800
Subject: [PATCH 3/3] Fix CI
---
.../receiver/zipkin/http/ZipkinHTTPReceiverProvider.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/zipkin-server/receiver-zipkin-http/src/main/java/zipkin/server/receiver/zipkin/http/ZipkinHTTPReceiverProvider.java b/zipkin-server/receiver-zipkin-http/src/main/java/zipkin/server/receiver/zipkin/http/ZipkinHTTPReceiverProvider.java
index 0efe9436e52..a9f7985bf93 100644
--- a/zipkin-server/receiver-zipkin-http/src/main/java/zipkin/server/receiver/zipkin/http/ZipkinHTTPReceiverProvider.java
+++ b/zipkin-server/receiver-zipkin-http/src/main/java/zipkin/server/receiver/zipkin/http/ZipkinHTTPReceiverProvider.java
@@ -25,6 +25,7 @@
import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverModule;
import org.apache.skywalking.oap.server.receiver.zipkin.handler.ZipkinSpanHTTPHandler;
+import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward;
import java.util.Arrays;
@@ -64,7 +65,7 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException {
@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
final SpanForwardService spanForward = getManager().find(ZipkinReceiverModule.NAME).provider().getService(SpanForwardService.class);
- httpHandler = new ZipkinSpanHTTPHandler(spanForward, getManager());
+ httpHandler = new ZipkinSpanHTTPHandler(((SpanForward) spanForward), getManager());
final HTTPHandlerRegister httpRegister = getManager().find(CoreModule.NAME).provider().getService(HTTPHandlerRegister.class);
httpRegister.addHandler(httpHandler, Arrays.asList(HttpMethod.POST, HttpMethod.GET));