diff --git a/zipkin-server/pom.xml b/zipkin-server/pom.xml index f2bd95d2cf2..37d1099a76f 100644 --- a/zipkin-server/pom.xml +++ b/zipkin-server/pom.xml @@ -46,6 +46,8 @@ ../skywalking/oap-server/server-core ../skywalking/oap-server/server-receiver-plugin/receiver-proto ../skywalking/oap-server/server-receiver-plugin/zipkin-receiver-plugin + ../skywalking/oap-server/server-receiver-plugin/otel-receiver-plugin + ../skywalking/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin ../skywalking/oap-server/server-cluster-plugin/cluster-standalone-plugin ../skywalking/oap-server/server-cluster-plugin/cluster-consul-plugin ../skywalking/oap-server/server-cluster-plugin/cluster-etcd-plugin @@ -59,6 +61,9 @@ ../skywalking/oap-server/server-testing ../skywalking/oap-server/server-configuration/configuration-api ../skywalking/oap-server/ai-pipeline + ../skywalking/oap-server/analyzer/meter-analyzer + ../skywalking/oap-server/analyzer/log-analyzer + ../skywalking/oap-server/analyzer/agent-analyzer server-core server-starter @@ -73,6 +78,8 @@ telemetry-zipkin zipkin-dependency zipkin-storage-ext + receiver-zipkin-core + receiver-otlp-trace diff --git a/zipkin-server/receiver-otlp-trace/pom.xml b/zipkin-server/receiver-otlp-trace/pom.xml new file mode 100644 index 00000000000..02219d319d9 --- /dev/null +++ b/zipkin-server/receiver-otlp-trace/pom.xml @@ -0,0 +1,37 @@ + + + 4.0.0 + + zipkin-server-parent + io.zipkin + 2.24.4-SNAPSHOT + + + receiver-otlp-trace + OTLP Trace Receiver + + + + org.apache.skywalking + otel-receiver-plugin + ${skywalking.version} + + + org.apache.skywalking + skywalking-sharing-server-plugin + + + org.apache.skywalking + meter-analyzer + + + org.apache.skywalking + log-analyzer + + + + + + \ No newline at end of file diff --git a/zipkin-server/receiver-otlp-trace/src/main/java/zipkin/server/receiver/otlp/OTLPTraceConfig.java b/zipkin-server/receiver-otlp-trace/src/main/java/zipkin/server/receiver/otlp/OTLPTraceConfig.java new file mode 100644 index 00000000000..d61d659a035 --- /dev/null +++ b/zipkin-server/receiver-otlp-trace/src/main/java/zipkin/server/receiver/otlp/OTLPTraceConfig.java @@ -0,0 +1,122 @@ +/* + * Copyright 2015-2023 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package zipkin.server.receiver.otlp; + +import org.apache.skywalking.oap.server.library.module.ModuleConfig; + +public class OTLPTraceConfig extends ModuleConfig { + private String gRPCHost; + /** + * Only setting the real port(not 0) makes the gRPC server online. + */ + private int gRPCPort; + private int maxConcurrentCallsPerConnection; + private int maxMessageSize; + private int gRPCThreadPoolSize; + private int gRPCThreadPoolQueueSize; + private String authentication; + private boolean gRPCSslEnabled = false; + private String gRPCSslKeyPath; + private String gRPCSslCertChainPath; + private String gRPCSslTrustedCAsPath; + + public String getGRPCHost() { + return gRPCHost; + } + + public void setGRPCHost(String gRPCHost) { + this.gRPCHost = gRPCHost; + } + + public int getGRPCPort() { + return gRPCPort; + } + + public void setGRPCPort(int gRPCPort) { + this.gRPCPort = gRPCPort; + } + + public int getMaxConcurrentCallsPerConnection() { + return maxConcurrentCallsPerConnection; + } + + public void setMaxConcurrentCallsPerConnection(int maxConcurrentCallsPerConnection) { + this.maxConcurrentCallsPerConnection = maxConcurrentCallsPerConnection; + } + + public int getMaxMessageSize() { + return maxMessageSize; + } + + public void setMaxMessageSize(int maxMessageSize) { + this.maxMessageSize = maxMessageSize; + } + + public int getGRPCThreadPoolSize() { + return gRPCThreadPoolSize; + } + + public void setGRPCThreadPoolSize(int gRPCThreadPoolSize) { + this.gRPCThreadPoolSize = gRPCThreadPoolSize; + } + + public int getGRPCThreadPoolQueueSize() { + return gRPCThreadPoolQueueSize; + } + + public void setGRPCThreadPoolQueueSize(int gRPCThreadPoolQueueSize) { + this.gRPCThreadPoolQueueSize = gRPCThreadPoolQueueSize; + } + + public String getAuthentication() { + return authentication; + } + + public void setAuthentication(String authentication) { + this.authentication = authentication; + } + + public boolean getGRPCSslEnabled() { + return gRPCSslEnabled; + } + + public void setGRPCSslEnabled(boolean gRPCSslEnabled) { + this.gRPCSslEnabled = gRPCSslEnabled; + } + + public String getGRPCSslKeyPath() { + return gRPCSslKeyPath; + } + + public void setGRPCSslKeyPath(String gRPCSslKeyPath) { + this.gRPCSslKeyPath = gRPCSslKeyPath; + } + + public String getGRPCSslCertChainPath() { + return gRPCSslCertChainPath; + } + + public void setGRPCSslCertChainPath(String gRPCSslCertChainPath) { + this.gRPCSslCertChainPath = gRPCSslCertChainPath; + } + + public String getGRPCSslTrustedCAsPath() { + return gRPCSslTrustedCAsPath; + } + + public void setGRPCSslTrustedCAsPath(String gRPCSslTrustedCAsPath) { + this.gRPCSslTrustedCAsPath = gRPCSslTrustedCAsPath; + } +} diff --git a/zipkin-server/receiver-otlp-trace/src/main/java/zipkin/server/receiver/otlp/OTLPTraceProvider.java b/zipkin-server/receiver-otlp-trace/src/main/java/zipkin/server/receiver/otlp/OTLPTraceProvider.java new file mode 100644 index 00000000000..c83500f7ed3 --- /dev/null +++ b/zipkin-server/receiver-otlp-trace/src/main/java/zipkin/server/receiver/otlp/OTLPTraceProvider.java @@ -0,0 +1,126 @@ +/* + * Copyright 2015-2023 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package zipkin.server.receiver.otlp; + +import org.apache.logging.log4j.util.Strings; +import org.apache.skywalking.oap.server.core.CoreModule; +import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister; +import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegisterImpl; +import org.apache.skywalking.oap.server.library.module.ModuleConfig; +import org.apache.skywalking.oap.server.library.module.ModuleDefine; +import org.apache.skywalking.oap.server.library.module.ModuleProvider; +import org.apache.skywalking.oap.server.library.module.ModuleStartException; +import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException; +import org.apache.skywalking.oap.server.library.server.ServerException; +import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer; +import org.apache.skywalking.oap.server.receiver.otel.OtelMetricReceiverConfig; +import org.apache.skywalking.oap.server.receiver.otel.OtelMetricReceiverModule; +import org.apache.skywalking.oap.server.receiver.otel.otlp.OpenTelemetryMetricRequestProcessor; +import org.apache.skywalking.oap.server.receiver.otel.otlp.OpenTelemetryTraceHandler; +import zipkin.server.receiver.otlp.handler.OTLPTraceHandler; + +public class OTLPTraceProvider extends ModuleProvider { + private OTLPTraceConfig moduleConfig; + private OpenTelemetryTraceHandler traceHandler; + private GRPCServer grpcServer; + + @Override + public String name() { + return "zipkin"; + } + + @Override + public Class module() { + return OtelMetricReceiverModule.class; + } + + @Override + public ConfigCreator newConfigCreator() { + return new ConfigCreator() { + + @Override + public Class type() { + return OTLPTraceConfig.class; + } + + @Override + public void onInitialized(OTLPTraceConfig initialized) { + moduleConfig = initialized; + } + }; + } + + @Override + public void prepare() throws ServiceNotProvidedException, ModuleStartException { + this.registerServiceImplementation(OpenTelemetryMetricRequestProcessor.class, + new OpenTelemetryMetricRequestProcessor(getManager(), new OtelMetricReceiverConfig())); + } + + @Override + public void start() throws ServiceNotProvidedException, ModuleStartException { + GRPCHandlerRegister handlerRegister; + if (moduleConfig.getGRPCPort() > 0) { + if (moduleConfig.getGRPCSslEnabled()) { + grpcServer = new GRPCServer( + Strings.isBlank(moduleConfig.getGRPCHost()) ? "0.0.0.0" : moduleConfig.getGRPCHost(), + moduleConfig.getGRPCPort(), + moduleConfig.getGRPCSslCertChainPath(), + moduleConfig.getGRPCSslKeyPath(), + moduleConfig.getGRPCSslTrustedCAsPath() + ); + } else { + grpcServer = new GRPCServer( + Strings.isBlank(moduleConfig.getGRPCHost()) ? "0.0.0.0" : moduleConfig.getGRPCHost(), + moduleConfig.getGRPCPort() + ); + } + if (moduleConfig.getMaxMessageSize() > 0) { + grpcServer.setMaxMessageSize(moduleConfig.getMaxMessageSize()); + } + if (moduleConfig.getMaxConcurrentCallsPerConnection() > 0) { + grpcServer.setMaxConcurrentCallsPerConnection(moduleConfig.getMaxConcurrentCallsPerConnection()); + } + if (moduleConfig.getGRPCThreadPoolQueueSize() > 0) { + grpcServer.setThreadPoolQueueSize(moduleConfig.getGRPCThreadPoolQueueSize()); + } + if (moduleConfig.getGRPCThreadPoolSize() > 0) { + grpcServer.setThreadPoolSize(moduleConfig.getGRPCThreadPoolSize()); + } + grpcServer.initialize(); + + handlerRegister = new GRPCHandlerRegisterImpl(grpcServer); + } else { + handlerRegister = getManager().find(CoreModule.NAME).provider().getService(GRPCHandlerRegister.class); + } + traceHandler = new OTLPTraceHandler(handlerRegister, getManager()); + traceHandler.active(); + } + + @Override + public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException { + if (grpcServer != null) { + try { + grpcServer.start(); + } catch (ServerException e) { + throw new ModuleStartException(e.getMessage(), e); + } + } + } + + @Override + public String[] requiredModules() { + return new String[0]; + } +} diff --git a/zipkin-server/receiver-otlp-trace/src/main/java/zipkin/server/receiver/otlp/handler/OTLPTraceHandler.java b/zipkin-server/receiver-otlp-trace/src/main/java/zipkin/server/receiver/otlp/handler/OTLPTraceHandler.java new file mode 100644 index 00000000000..ff5e3e8de9f --- /dev/null +++ b/zipkin-server/receiver-otlp-trace/src/main/java/zipkin/server/receiver/otlp/handler/OTLPTraceHandler.java @@ -0,0 +1,33 @@ +/* + * Copyright 2015-2023 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package zipkin.server.receiver.otlp.handler; + +import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister; +import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.library.module.ModuleStartException; +import org.apache.skywalking.oap.server.receiver.otel.otlp.OpenTelemetryTraceHandler; + +public class OTLPTraceHandler extends OpenTelemetryTraceHandler { + private final GRPCHandlerRegister register; + public OTLPTraceHandler(GRPCHandlerRegister register, ModuleManager manager) { + super(manager); + this.register = register; + } + + @Override + public void active() throws ModuleStartException { + register.addHandler(this); + } +} diff --git a/zipkin-server/receiver-otlp-trace/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider b/zipkin-server/receiver-otlp-trace/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider new file mode 100755 index 00000000000..cff78c2087a --- /dev/null +++ b/zipkin-server/receiver-otlp-trace/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# + +zipkin.server.receiver.otlp.OTLPTraceProvider \ No newline at end of file diff --git a/zipkin-server/receiver-zipkin-activemq/pom.xml b/zipkin-server/receiver-zipkin-activemq/pom.xml index b400b6ae2a5..2dda22ee0ea 100644 --- a/zipkin-server/receiver-zipkin-activemq/pom.xml +++ b/zipkin-server/receiver-zipkin-activemq/pom.xml @@ -19,16 +19,10 @@ io.zipkin - zipkin-server-core + receiver-zipkin-core ${project.version} - - org.apache.skywalking - zipkin-receiver-plugin - ${skywalking.version} - - org.apache.activemq activemq-client diff --git a/zipkin-server/receiver-zipkin-activemq/src/main/java/zipkin/server/receiver/zipkin/activemq/ActiveMQHandler.java b/zipkin-server/receiver-zipkin-activemq/src/main/java/zipkin/server/receiver/zipkin/activemq/ActiveMQHandler.java index d72de0d9c9a..dea971ecaea 100644 --- a/zipkin-server/receiver-zipkin-activemq/src/main/java/zipkin/server/receiver/zipkin/activemq/ActiveMQHandler.java +++ b/zipkin-server/receiver-zipkin-activemq/src/main/java/zipkin/server/receiver/zipkin/activemq/ActiveMQHandler.java @@ -20,7 +20,7 @@ import org.apache.activemq.transport.TransportListener; import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.util.StringUtil; -import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward; +import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics; import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics; @@ -54,7 +54,7 @@ public class ActiveMQHandler implements TransportListener, MessageListener, Clos private static final Logger log = LoggerFactory.getLogger(ActiveMQHandler.class.getName()); private final ZipkinActiveMQConfig config; - private final SpanForward spanForward; + private final SpanForwardService spanForward; private final ActiveMQConnectionFactory connectionFactory; static final CheckResult @@ -70,11 +70,11 @@ public class ActiveMQHandler implements TransportListener, MessageListener, Clos volatile CheckResult checkResult = CheckResult.OK; - public ActiveMQHandler(ZipkinActiveMQConfig config, SpanForward spanForward, ModuleManager moduleManager) { + public ActiveMQHandler(ZipkinActiveMQConfig config, SpanForwardService spanForward, ModuleManager moduleManager) { this(config, createConnectionFactory(config), spanForward, moduleManager); } - public ActiveMQHandler(ZipkinActiveMQConfig config, ActiveMQConnectionFactory connectionFactory, SpanForward spanForward, ModuleManager moduleManager) { + public ActiveMQHandler(ZipkinActiveMQConfig config, ActiveMQConnectionFactory connectionFactory, SpanForwardService spanForward, ModuleManager moduleManager) { this.config = config; this.spanForward = spanForward; this.connectionFactory = connectionFactory; diff --git a/zipkin-server/receiver-zipkin-activemq/src/main/java/zipkin/server/receiver/zipkin/activemq/ZipkinActiveMQProvider.java b/zipkin-server/receiver-zipkin-activemq/src/main/java/zipkin/server/receiver/zipkin/activemq/ZipkinActiveMQProvider.java index d2430e0695d..2338872f6b4 100644 --- a/zipkin-server/receiver-zipkin-activemq/src/main/java/zipkin/server/receiver/zipkin/activemq/ZipkinActiveMQProvider.java +++ b/zipkin-server/receiver-zipkin-activemq/src/main/java/zipkin/server/receiver/zipkin/activemq/ZipkinActiveMQProvider.java @@ -15,18 +15,17 @@ package zipkin.server.receiver.zipkin.activemq; import org.apache.skywalking.oap.server.core.CoreModule; -import org.apache.skywalking.oap.server.core.config.ConfigService; import org.apache.skywalking.oap.server.library.module.ModuleConfig; import org.apache.skywalking.oap.server.library.module.ModuleDefine; import org.apache.skywalking.oap.server.library.module.ModuleProvider; import org.apache.skywalking.oap.server.library.module.ModuleStartException; import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException; -import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward; -import zipkin.server.core.services.ZipkinConfigService; +import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService; +import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverModule; public class ZipkinActiveMQProvider extends ModuleProvider { private ZipkinActiveMQConfig moduleConfig; - private SpanForward spanForward; + private SpanForwardService spanForward; @Override public String name() { @@ -59,8 +58,7 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException { @Override public void start() throws ServiceNotProvidedException, ModuleStartException { - final ConfigService service = getManager().find(CoreModule.NAME).provider().getService(ConfigService.class); - this.spanForward = new SpanForward(((ZipkinConfigService)service).toZipkinReceiverConfig(), getManager()); + this.spanForward = getManager().find(ZipkinReceiverModule.NAME).provider().getService(SpanForwardService.class); } @Override diff --git a/zipkin-server/receiver-zipkin-core/pom.xml b/zipkin-server/receiver-zipkin-core/pom.xml new file mode 100644 index 00000000000..00fed3985fd --- /dev/null +++ b/zipkin-server/receiver-zipkin-core/pom.xml @@ -0,0 +1,29 @@ + + + 4.0.0 + + zipkin-server-parent + io.zipkin + 2.24.4-SNAPSHOT + + + receiver-zipkin-core + Zipkin Receiver Core + + + + io.zipkin + zipkin-server-core + ${project.version} + + + + org.apache.skywalking + zipkin-receiver-plugin + ${skywalking.version} + + + + \ No newline at end of file diff --git a/zipkin-server/receiver-zipkin-core/src/main/java/zipkin/server/receiver/zipkin/core/SpanForwardCore.java b/zipkin-server/receiver-zipkin-core/src/main/java/zipkin/server/receiver/zipkin/core/SpanForwardCore.java new file mode 100644 index 00000000000..b688046840d --- /dev/null +++ b/zipkin-server/receiver-zipkin-core/src/main/java/zipkin/server/receiver/zipkin/core/SpanForwardCore.java @@ -0,0 +1,36 @@ +/* + * Copyright 2015-2023 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package zipkin.server.receiver.zipkin.core; + +import org.apache.skywalking.oap.server.core.Const; +import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig; +import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +public class SpanForwardCore extends SpanForward { + private final ZipkinReceiverConfig config; + public SpanForwardCore(ZipkinReceiverConfig config, ModuleManager manager) { + super(config, manager); + this.config = config; + } + + public Set getTagAutocompleteKeys() { + return new HashSet<>(Arrays.asList(config.getSearchableTracesTags().split(Const.COMMA))); + } +} diff --git a/zipkin-server/receiver-zipkin-core/src/main/java/zipkin/server/receiver/zipkin/core/ZipkinReceiverCoreConfig.java b/zipkin-server/receiver-zipkin-core/src/main/java/zipkin/server/receiver/zipkin/core/ZipkinReceiverCoreConfig.java new file mode 100644 index 00000000000..acb35146e01 --- /dev/null +++ b/zipkin-server/receiver-zipkin-core/src/main/java/zipkin/server/receiver/zipkin/core/ZipkinReceiverCoreConfig.java @@ -0,0 +1,53 @@ +/* + * Copyright 2015-2023 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package zipkin.server.receiver.zipkin.core; + +import org.apache.skywalking.oap.server.core.Const; +import org.apache.skywalking.oap.server.library.module.ModuleConfig; + +public class ZipkinReceiverCoreConfig extends ModuleConfig { + + /** + * The trace sample rate precision is 0.0001, should be between 0 and 1. + */ + private double traceSampleRate = 1.0f; + + /** + * Defines a set of span tag keys which are searchable. + * The max length of key=value should be less than 256 or will be dropped. + */ + private String searchableTracesTags = DEFAULT_SEARCHABLE_TAG_KEYS; + + private static final String DEFAULT_SEARCHABLE_TAG_KEYS = String.join( + Const.COMMA, + "http.method" + ); + + public double getTraceSampleRate() { + return traceSampleRate; + } + + public void setTraceSampleRate(double traceSampleRate) { + this.traceSampleRate = traceSampleRate; + } + + public String getSearchableTracesTags() { + return searchableTracesTags; + } + + public void setSearchableTracesTags(String searchableTracesTags) { + this.searchableTracesTags = searchableTracesTags; + } +} diff --git a/zipkin-server/receiver-zipkin-core/src/main/java/zipkin/server/receiver/zipkin/core/ZipkinReceiverCoreProvider.java b/zipkin-server/receiver-zipkin-core/src/main/java/zipkin/server/receiver/zipkin/core/ZipkinReceiverCoreProvider.java new file mode 100644 index 00000000000..95082236b23 --- /dev/null +++ b/zipkin-server/receiver-zipkin-core/src/main/java/zipkin/server/receiver/zipkin/core/ZipkinReceiverCoreProvider.java @@ -0,0 +1,76 @@ +/* + * Copyright 2015-2023 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package zipkin.server.receiver.zipkin.core; + +import org.apache.skywalking.oap.server.library.module.ModuleConfig; +import org.apache.skywalking.oap.server.library.module.ModuleDefine; +import org.apache.skywalking.oap.server.library.module.ModuleProvider; +import org.apache.skywalking.oap.server.library.module.ModuleStartException; +import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException; +import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService; +import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig; +import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverModule; + +public class ZipkinReceiverCoreProvider extends ModuleProvider { + private ZipkinReceiverCoreConfig moduleConfig; + + @Override + public String name() { + return "zipkin"; + } + + @Override + public Class module() { + return ZipkinReceiverModule.class; + } + + @Override + public ConfigCreator newConfigCreator() { + return new ConfigCreator() { + + @Override + public Class type() { + return ZipkinReceiverCoreConfig.class; + } + + @Override + public void onInitialized(ZipkinReceiverCoreConfig initialized) { + moduleConfig = initialized; + } + }; + } + + @Override + public void prepare() throws ServiceNotProvidedException, ModuleStartException { + final ZipkinReceiverConfig config = new ZipkinReceiverConfig(); + config.setSearchableTracesTags(moduleConfig.getSearchableTracesTags()); + config.setSampleRate((int) (moduleConfig.getTraceSampleRate() * 10000)); + + this.registerServiceImplementation(SpanForwardService.class, new SpanForwardCore(config, getManager())); + } + + @Override + public void start() throws ServiceNotProvidedException, ModuleStartException { + } + + @Override + public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException { + } + + @Override + public String[] requiredModules() { + return new String[0]; + } +} diff --git a/zipkin-server/receiver-zipkin-core/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider b/zipkin-server/receiver-zipkin-core/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider new file mode 100755 index 00000000000..1e28be08cf5 --- /dev/null +++ b/zipkin-server/receiver-zipkin-core/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# + +zipkin.server.receiver.zipkin.core.ZipkinReceiverCoreProvider \ No newline at end of file diff --git a/zipkin-server/receiver-zipkin-http/pom.xml b/zipkin-server/receiver-zipkin-http/pom.xml index 612fab6aeb0..bf32ec7294d 100644 --- a/zipkin-server/receiver-zipkin-http/pom.xml +++ b/zipkin-server/receiver-zipkin-http/pom.xml @@ -15,15 +15,9 @@ io.zipkin - zipkin-server-core + receiver-zipkin-core ${project.version} - - - org.apache.skywalking - zipkin-receiver-plugin - ${skywalking.version} - \ No newline at end of file diff --git a/zipkin-server/receiver-zipkin-http/src/main/java/zipkin/server/receiver/zipkin/http/ZipkinHTTPReceiverProvider.java b/zipkin-server/receiver-zipkin-http/src/main/java/zipkin/server/receiver/zipkin/http/ZipkinHTTPReceiverProvider.java index c9a3e4cabaa..0550fbc089e 100644 --- a/zipkin-server/receiver-zipkin-http/src/main/java/zipkin/server/receiver/zipkin/http/ZipkinHTTPReceiverProvider.java +++ b/zipkin-server/receiver-zipkin-http/src/main/java/zipkin/server/receiver/zipkin/http/ZipkinHTTPReceiverProvider.java @@ -16,7 +16,6 @@ import com.linecorp.armeria.common.HttpMethod; import org.apache.skywalking.oap.server.core.CoreModule; -import org.apache.skywalking.oap.server.core.config.ConfigService; import org.apache.skywalking.oap.server.core.server.HTTPHandlerRegister; import org.apache.skywalking.oap.server.library.module.ModuleConfig; import org.apache.skywalking.oap.server.library.module.ModuleDefine; @@ -25,10 +24,10 @@ import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException; import org.apache.skywalking.oap.server.library.server.http.HTTPServer; import org.apache.skywalking.oap.server.library.server.http.HTTPServerConfig; +import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService; +import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverModule; import org.apache.skywalking.oap.server.receiver.zipkin.handler.ZipkinSpanHTTPHandler; -import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward; import zipkin.server.core.services.HTTPConfigurableServer; -import zipkin.server.core.services.ZipkinConfigService; import java.util.Arrays; @@ -81,8 +80,7 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException { @Override public void start() throws ServiceNotProvidedException, ModuleStartException { - final ConfigService service = getManager().find(CoreModule.NAME).provider().getService(ConfigService.class); - final SpanForward spanForward = new SpanForward(((ZipkinConfigService)service).toZipkinReceiverConfig(), getManager()); + final SpanForwardService spanForward = getManager().find(ZipkinReceiverModule.NAME).provider().getService(SpanForwardService.class); httpHandler = new ZipkinSpanHTTPHandler(spanForward, getManager()); if (httpServer != null) { diff --git a/zipkin-server/receiver-zipkin-http/src/test/java/zipkin/server/receiver/zipkin/http/ITHTTPReceiver.java b/zipkin-server/receiver-zipkin-http/src/test/java/zipkin/server/receiver/zipkin/http/ITHTTPReceiver.java index 0d72caaee4e..d51bfa772e8 100644 --- a/zipkin-server/receiver-zipkin-http/src/test/java/zipkin/server/receiver/zipkin/http/ITHTTPReceiver.java +++ b/zipkin-server/receiver-zipkin-http/src/test/java/zipkin/server/receiver/zipkin/http/ITHTTPReceiver.java @@ -16,13 +16,14 @@ import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.CoreModuleProvider; -import org.apache.skywalking.oap.server.core.config.ConfigService; import org.apache.skywalking.oap.server.core.server.HTTPHandlerRegister; import org.apache.skywalking.oap.server.core.server.HTTPHandlerRegisterImpl; import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.module.ModuleStartException; import org.apache.skywalking.oap.server.library.server.http.HTTPServer; import org.apache.skywalking.oap.server.library.server.http.HTTPServerConfig; +import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService; +import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverModule; import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator; @@ -37,8 +38,7 @@ import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import org.powermock.reflect.Whitebox; -import zipkin.server.core.CoreModuleConfig; -import zipkin.server.core.services.ZipkinConfigService; +import zipkin.server.receiver.zipkin.core.ZipkinReceiverCoreProvider; import zipkin2.Span; import zipkin2.TestObjects; import zipkin2.codec.SpanBytesEncoder; @@ -72,7 +72,7 @@ public class ITHTTPReceiver { public void setup() throws ModuleStartException { final HTTPServer httpServer = new HTTPServer(HTTPServerConfig.builder().host("0.0.0.0").port(port).contextPath("/").build()); httpServer.initialize(); - moduleManager = setupModuleManager(httpServer); + moduleManager = setupModuleManager(httpServer, forward); final ZipkinHTTPReceiverProvider provider = new ZipkinHTTPReceiverProvider(); provider.setManager(moduleManager); @@ -86,7 +86,6 @@ public void setup() throws ModuleStartException { spans.add(invocationOnMock.getArgument(0, ArrayList.class)); return null; }).when(forward).send(any()); - Whitebox.setInternalState(provider.getHttpHandler(), SpanForward.class, forward); provider.notifyAfterCompleted(); } @@ -116,7 +115,7 @@ public void test() throws Exception { assertThat(spans.take()).containsExactly(CLIENT_SPAN); } - private ModuleManager setupModuleManager(HTTPServer httpServer) { + private ModuleManager setupModuleManager(HTTPServer httpServer, SpanForward forward) { ModuleManager moduleManager = Mockito.mock(ModuleManager.class); CoreModule coreModule = Mockito.spy(CoreModule.class); @@ -125,13 +124,17 @@ private ModuleManager setupModuleManager(HTTPServer httpServer) { Mockito.when(moduleManager.find(CoreModule.NAME)).thenReturn(coreModule); Mockito.when(coreModule.provider().getService(HTTPHandlerRegister.class)).thenReturn(new HTTPHandlerRegisterImpl(httpServer)); + final ZipkinReceiverModule zipkinReceiverModule = Mockito.spy(ZipkinReceiverModule.class); + final ZipkinReceiverCoreProvider receiverProvider = Mockito.mock(ZipkinReceiverCoreProvider.class); + Whitebox.setInternalState(zipkinReceiverModule, "loadedProvider", receiverProvider); + Mockito.when(moduleManager.find(ZipkinReceiverModule.NAME)).thenReturn(zipkinReceiverModule); + Mockito.when(zipkinReceiverModule.provider().getService(SpanForwardService.class)).thenReturn(forward); + TelemetryModule telemetryModule = Mockito.spy(TelemetryModule.class); NoneTelemetryProvider noneTelemetryProvider = Mockito.mock(NoneTelemetryProvider.class); Whitebox.setInternalState(telemetryModule, "loadedProvider", noneTelemetryProvider); Mockito.when(moduleManager.find(TelemetryModule.NAME)).thenReturn(telemetryModule); - Mockito.when(moduleProvider.getService(ConfigService.class)) - .thenReturn(new ZipkinConfigService(new CoreModuleConfig(), moduleProvider)); Mockito.when(noneTelemetryProvider.getService(MetricsCreator.class)) .thenReturn(new MetricsCreatorNoop()); diff --git a/zipkin-server/receiver-zipkin-kafka/pom.xml b/zipkin-server/receiver-zipkin-kafka/pom.xml index f54ecc6684c..a8968153150 100644 --- a/zipkin-server/receiver-zipkin-kafka/pom.xml +++ b/zipkin-server/receiver-zipkin-kafka/pom.xml @@ -15,15 +15,9 @@ io.zipkin - zipkin-server-core + receiver-zipkin-core ${project.version} - - - org.apache.skywalking - zipkin-receiver-plugin - ${skywalking.version} - \ No newline at end of file diff --git a/zipkin-server/receiver-zipkin-kafka/src/main/java/zipkin/server/receiver/zipkin/kafka/ZipkinKafkaReceiverProvider.java b/zipkin-server/receiver-zipkin-kafka/src/main/java/zipkin/server/receiver/zipkin/kafka/ZipkinKafkaReceiverProvider.java index d20fa1422a9..7b3625ed6ef 100644 --- a/zipkin-server/receiver-zipkin-kafka/src/main/java/zipkin/server/receiver/zipkin/kafka/ZipkinKafkaReceiverProvider.java +++ b/zipkin-server/receiver-zipkin-kafka/src/main/java/zipkin/server/receiver/zipkin/kafka/ZipkinKafkaReceiverProvider.java @@ -15,19 +15,19 @@ package zipkin.server.receiver.zipkin.kafka; import org.apache.skywalking.oap.server.core.CoreModule; -import org.apache.skywalking.oap.server.core.config.ConfigService; import org.apache.skywalking.oap.server.library.module.ModuleConfig; import org.apache.skywalking.oap.server.library.module.ModuleDefine; import org.apache.skywalking.oap.server.library.module.ModuleProvider; import org.apache.skywalking.oap.server.library.module.ModuleStartException; import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException; +import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService; +import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverModule; import org.apache.skywalking.oap.server.receiver.zipkin.kafka.KafkaHandler; import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward; -import zipkin.server.core.services.ZipkinConfigService; public class ZipkinKafkaReceiverProvider extends ModuleProvider { private ZipkinKafkaReceiverConfig moduleConfig; - private SpanForward spanForward; + private SpanForwardService spanForward; private KafkaHandler kafkaHandler; @Override @@ -61,13 +61,12 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException { @Override public void start() throws ServiceNotProvidedException, ModuleStartException { - final ConfigService service = getManager().find(CoreModule.NAME).provider().getService(ConfigService.class); - this.spanForward = new SpanForward(((ZipkinConfigService)service).toZipkinReceiverConfig(), getManager()); + this.spanForward = getManager().find(ZipkinReceiverModule.NAME).provider().getService(SpanForwardService.class); } @Override public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException { - kafkaHandler = new KafkaHandler(moduleConfig.toSkyWalkingConfig(), this.spanForward, getManager()); + kafkaHandler = new KafkaHandler(moduleConfig.toSkyWalkingConfig(), ((SpanForward) this.spanForward), getManager()); kafkaHandler.start(); } diff --git a/zipkin-server/receiver-zipkin-kafka/src/test/java/zipkin/server/receiver/zipkin/kafka/ITKafkaReceiver.java b/zipkin-server/receiver-zipkin-kafka/src/test/java/zipkin/server/receiver/zipkin/kafka/ITKafkaReceiver.java index f6d725163b2..93ae3679228 100644 --- a/zipkin-server/receiver-zipkin-kafka/src/test/java/zipkin/server/receiver/zipkin/kafka/ITKafkaReceiver.java +++ b/zipkin-server/receiver-zipkin-kafka/src/test/java/zipkin/server/receiver/zipkin/kafka/ITKafkaReceiver.java @@ -18,11 +18,10 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.apache.skywalking.oap.server.core.CoreModule; -import org.apache.skywalking.oap.server.core.CoreModuleProvider; -import org.apache.skywalking.oap.server.core.config.ConfigService; import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.module.ModuleStartException; +import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService; +import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverModule; import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator; @@ -39,8 +38,7 @@ import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import org.powermock.reflect.Whitebox; -import zipkin.server.core.CoreModuleConfig; -import zipkin.server.core.services.ZipkinConfigService; +import zipkin.server.receiver.zipkin.core.ZipkinReceiverCoreProvider; import zipkin2.Span; import zipkin2.TestObjects; import zipkin2.codec.SpanBytesEncoder; @@ -80,7 +78,7 @@ public void setup() throws ModuleStartException { config.setKafkaHandlerThreadPoolSize(2); config.setKafkaHandlerThreadPoolQueueSize(100); - moduleManager = setupModuleManager(); + moduleManager = setupModuleManager(forward); final ZipkinKafkaReceiverProvider provider = new ZipkinKafkaReceiverProvider(); provider.setManager(moduleManager); @@ -91,7 +89,6 @@ public void setup() throws ModuleStartException { spans.add(invocationOnMock.getArgument(0, ArrayList.class)); return null; }).when(forward).send(any()); - Whitebox.setInternalState(provider, SpanForward.class, forward); provider.notifyAfterCompleted(); kafka.prepareTopics(config.getKafkaTopic(), 1); @@ -121,21 +118,20 @@ public void tearDown() { } } - private ModuleManager setupModuleManager() { + private ModuleManager setupModuleManager(SpanForward forward) { ModuleManager moduleManager = Mockito.mock(ModuleManager.class); - CoreModule coreModule = Mockito.spy(CoreModule.class); - CoreModuleProvider moduleProvider = Mockito.mock(CoreModuleProvider.class); - Whitebox.setInternalState(coreModule, "loadedProvider", moduleProvider); - Mockito.when(moduleManager.find(CoreModule.NAME)).thenReturn(coreModule); + final ZipkinReceiverModule zipkinReceiverModule = Mockito.spy(ZipkinReceiverModule.class); + final ZipkinReceiverCoreProvider receiverProvider = Mockito.mock(ZipkinReceiverCoreProvider.class); + Whitebox.setInternalState(zipkinReceiverModule, "loadedProvider", receiverProvider); + Mockito.when(moduleManager.find(ZipkinReceiverModule.NAME)).thenReturn(zipkinReceiverModule); + Mockito.when(zipkinReceiverModule.provider().getService(SpanForwardService.class)).thenReturn(forward); TelemetryModule telemetryModule = Mockito.spy(TelemetryModule.class); NoneTelemetryProvider noneTelemetryProvider = Mockito.mock(NoneTelemetryProvider.class); Whitebox.setInternalState(telemetryModule, "loadedProvider", noneTelemetryProvider); Mockito.when(moduleManager.find(TelemetryModule.NAME)).thenReturn(telemetryModule); - Mockito.when(moduleProvider.getService(ConfigService.class)) - .thenReturn(new ZipkinConfigService(new CoreModuleConfig(), moduleProvider)); Mockito.when(noneTelemetryProvider.getService(MetricsCreator.class)) .thenReturn(new MetricsCreatorNoop()); diff --git a/zipkin-server/receiver-zipkin-rabbitmq/pom.xml b/zipkin-server/receiver-zipkin-rabbitmq/pom.xml index 904a4c0c0da..ec2933ee564 100644 --- a/zipkin-server/receiver-zipkin-rabbitmq/pom.xml +++ b/zipkin-server/receiver-zipkin-rabbitmq/pom.xml @@ -19,16 +19,10 @@ io.zipkin - zipkin-server-core + receiver-zipkin-core ${project.version} - - org.apache.skywalking - zipkin-receiver-plugin - ${skywalking.version} - - com.rabbitmq amqp-client diff --git a/zipkin-server/receiver-zipkin-rabbitmq/src/main/java/zipkin/server/receriver/zipkin/rabbitmq/ZipkinRabbitMQHandler.java b/zipkin-server/receiver-zipkin-rabbitmq/src/main/java/zipkin/server/receriver/zipkin/rabbitmq/ZipkinRabbitMQHandler.java index d6448ef54a0..a453c69d2a7 100644 --- a/zipkin-server/receiver-zipkin-rabbitmq/src/main/java/zipkin/server/receriver/zipkin/rabbitmq/ZipkinRabbitMQHandler.java +++ b/zipkin-server/receiver-zipkin-rabbitmq/src/main/java/zipkin/server/receriver/zipkin/rabbitmq/ZipkinRabbitMQHandler.java @@ -24,7 +24,7 @@ import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.util.CollectionUtils; import org.apache.skywalking.oap.server.library.util.StringUtil; -import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward; +import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics; import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator; @@ -45,13 +45,13 @@ public class ZipkinRabbitMQHandler { private final ZipkinRabbitMQConfig config; - private final SpanForward spanForward; + private final SpanForwardService spanForward; private final ConnectionFactory connectionFactory = new ConnectionFactory(); private final HistogramMetrics histogram; - public ZipkinRabbitMQHandler(ZipkinRabbitMQConfig config, SpanForward spanForward, ModuleManager moduleManager) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException { + public ZipkinRabbitMQHandler(ZipkinRabbitMQConfig config, SpanForwardService spanForward, ModuleManager moduleManager) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException { this.config = config; this.spanForward = spanForward; diff --git a/zipkin-server/receiver-zipkin-rabbitmq/src/main/java/zipkin/server/receriver/zipkin/rabbitmq/ZipkinRabbitMQProvider.java b/zipkin-server/receiver-zipkin-rabbitmq/src/main/java/zipkin/server/receriver/zipkin/rabbitmq/ZipkinRabbitMQProvider.java index 1852f1413a0..e089e7d0620 100644 --- a/zipkin-server/receiver-zipkin-rabbitmq/src/main/java/zipkin/server/receriver/zipkin/rabbitmq/ZipkinRabbitMQProvider.java +++ b/zipkin-server/receiver-zipkin-rabbitmq/src/main/java/zipkin/server/receriver/zipkin/rabbitmq/ZipkinRabbitMQProvider.java @@ -15,18 +15,17 @@ package zipkin.server.receriver.zipkin.rabbitmq; import org.apache.skywalking.oap.server.core.CoreModule; -import org.apache.skywalking.oap.server.core.config.ConfigService; import org.apache.skywalking.oap.server.library.module.ModuleConfig; import org.apache.skywalking.oap.server.library.module.ModuleDefine; import org.apache.skywalking.oap.server.library.module.ModuleProvider; import org.apache.skywalking.oap.server.library.module.ModuleStartException; import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException; -import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward; -import zipkin.server.core.services.ZipkinConfigService; +import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService; +import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverModule; public class ZipkinRabbitMQProvider extends ModuleProvider { private ZipkinRabbitMQConfig moduleConfig; - private SpanForward spanForward; + private SpanForwardService spanForward; @Override public String name() { @@ -59,8 +58,7 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException { @Override public void start() throws ServiceNotProvidedException, ModuleStartException { - final ConfigService service = getManager().find(CoreModule.NAME).provider().getService(ConfigService.class); - this.spanForward = new SpanForward(((ZipkinConfigService)service).toZipkinReceiverConfig(), getManager()); + this.spanForward = getManager().find(ZipkinReceiverModule.NAME).provider().getService(SpanForwardService.class); } @Override diff --git a/zipkin-server/receiver-zipkin-rabbitmq/src/test/java/zipkin/server/receiver/zipkin/rabbitmq/ITRabbitMQReceiver.java b/zipkin-server/receiver-zipkin-rabbitmq/src/test/java/zipkin/server/receiver/zipkin/rabbitmq/ITRabbitMQReceiver.java index bc9ee4ac6e6..9006f7db5df 100644 --- a/zipkin-server/receiver-zipkin-rabbitmq/src/test/java/zipkin/server/receiver/zipkin/rabbitmq/ITRabbitMQReceiver.java +++ b/zipkin-server/receiver-zipkin-rabbitmq/src/test/java/zipkin/server/receiver/zipkin/rabbitmq/ITRabbitMQReceiver.java @@ -17,11 +17,10 @@ import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; -import org.apache.skywalking.oap.server.core.CoreModule; -import org.apache.skywalking.oap.server.core.CoreModuleProvider; -import org.apache.skywalking.oap.server.core.config.ConfigService; import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.module.ModuleStartException; +import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService; +import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverModule; import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator; @@ -37,8 +36,7 @@ import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import org.powermock.reflect.Whitebox; -import zipkin.server.core.CoreModuleConfig; -import zipkin.server.core.services.ZipkinConfigService; +import zipkin.server.receiver.zipkin.core.ZipkinReceiverCoreProvider; import zipkin.server.receriver.zipkin.rabbitmq.ZipkinRabbitMQConfig; import zipkin.server.receriver.zipkin.rabbitmq.ZipkinRabbitMQProvider; import zipkin2.Span; @@ -77,7 +75,7 @@ public void setup() throws ModuleStartException, IOException, TimeoutException { config.setAddresses(Collections.singletonList(rabbitMQ.host() + ":" + rabbitMQ.port())); config.setQueue("test"); - moduleManager = setupModuleManager(); + moduleManager = setupModuleManager(forward); final ZipkinRabbitMQProvider provider = new ZipkinRabbitMQProvider(); provider.setManager(moduleManager); @@ -88,7 +86,6 @@ public void setup() throws ModuleStartException, IOException, TimeoutException { exceptedSpans.add(invocationOnMock.getArgument(0, ArrayList.class)); return null; }).when(forward).send(any()); - Whitebox.setInternalState(provider, SpanForward.class, forward); provider.notifyAfterCompleted(); ConnectionFactory factory = new ConnectionFactory(); @@ -121,21 +118,20 @@ void produceSpans(byte[] spans, String queue) throws Exception { } } - private ModuleManager setupModuleManager() { + private ModuleManager setupModuleManager(SpanForward forward) { ModuleManager moduleManager = Mockito.mock(ModuleManager.class); - CoreModule coreModule = Mockito.spy(CoreModule.class); - CoreModuleProvider moduleProvider = Mockito.mock(CoreModuleProvider.class); - Whitebox.setInternalState(coreModule, "loadedProvider", moduleProvider); - Mockito.when(moduleManager.find(CoreModule.NAME)).thenReturn(coreModule); + final ZipkinReceiverModule zipkinReceiverModule = Mockito.spy(ZipkinReceiverModule.class); + final ZipkinReceiverCoreProvider receiverProvider = Mockito.mock(ZipkinReceiverCoreProvider.class); + Whitebox.setInternalState(zipkinReceiverModule, "loadedProvider", receiverProvider); + Mockito.when(moduleManager.find(ZipkinReceiverModule.NAME)).thenReturn(zipkinReceiverModule); + Mockito.when(zipkinReceiverModule.provider().getService(SpanForwardService.class)).thenReturn(forward); TelemetryModule telemetryModule = Mockito.spy(TelemetryModule.class); NoneTelemetryProvider noneTelemetryProvider = Mockito.mock(NoneTelemetryProvider.class); Whitebox.setInternalState(telemetryModule, "loadedProvider", noneTelemetryProvider); Mockito.when(moduleManager.find(TelemetryModule.NAME)).thenReturn(telemetryModule); - Mockito.when(moduleProvider.getService(ConfigService.class)) - .thenReturn(new ZipkinConfigService(new CoreModuleConfig(), moduleProvider)); Mockito.when(noneTelemetryProvider.getService(MetricsCreator.class)) .thenReturn(new MetricsCreatorNoop()); diff --git a/zipkin-server/receiver-zipkin-scribe/pom.xml b/zipkin-server/receiver-zipkin-scribe/pom.xml index fca7e3786e1..8c6422c9db8 100644 --- a/zipkin-server/receiver-zipkin-scribe/pom.xml +++ b/zipkin-server/receiver-zipkin-scribe/pom.xml @@ -15,16 +15,10 @@ io.zipkin - zipkin-server-core + receiver-zipkin-core ${project.version} - - org.apache.skywalking - zipkin-receiver-plugin - ${skywalking.version} - - ${armeria.groupId} armeria-thrift0.15 diff --git a/zipkin-server/receiver-zipkin-scribe/src/main/java/zipkin/server/receiver/zipkin/scribe/ScribeSpanConsumer.java b/zipkin-server/receiver-zipkin-scribe/src/main/java/zipkin/server/receiver/zipkin/scribe/ScribeSpanConsumer.java index 74fdf6d3096..ae7c9c77b5c 100644 --- a/zipkin-server/receiver-zipkin-scribe/src/main/java/zipkin/server/receiver/zipkin/scribe/ScribeSpanConsumer.java +++ b/zipkin-server/receiver-zipkin-scribe/src/main/java/zipkin/server/receiver/zipkin/scribe/ScribeSpanConsumer.java @@ -14,7 +14,7 @@ package zipkin.server.receiver.zipkin.scribe; import org.apache.skywalking.oap.server.library.module.ModuleManager; -import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward; +import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics; import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator; @@ -32,12 +32,12 @@ import java.util.List; final class ScribeSpanConsumer implements Scribe.AsyncIface { - private final SpanForward spanForward; + private final SpanForwardService spanForward; private final String category; private final HistogramMetrics histogram; - ScribeSpanConsumer(SpanForward spanForward, String category, ModuleManager moduleManager) { + ScribeSpanConsumer(SpanForwardService spanForward, String category, ModuleManager moduleManager) { this.spanForward = spanForward; this.category = category; diff --git a/zipkin-server/receiver-zipkin-scribe/src/main/java/zipkin/server/receiver/zipkin/scribe/ZipkinScribeProvider.java b/zipkin-server/receiver-zipkin-scribe/src/main/java/zipkin/server/receiver/zipkin/scribe/ZipkinScribeProvider.java index 89e462c7216..55fc216bd05 100644 --- a/zipkin-server/receiver-zipkin-scribe/src/main/java/zipkin/server/receiver/zipkin/scribe/ZipkinScribeProvider.java +++ b/zipkin-server/receiver-zipkin-scribe/src/main/java/zipkin/server/receiver/zipkin/scribe/ZipkinScribeProvider.java @@ -14,19 +14,17 @@ package zipkin.server.receiver.zipkin.scribe; -import org.apache.skywalking.oap.server.core.CoreModule; -import org.apache.skywalking.oap.server.core.config.ConfigService; import org.apache.skywalking.oap.server.library.module.ModuleConfig; import org.apache.skywalking.oap.server.library.module.ModuleDefine; import org.apache.skywalking.oap.server.library.module.ModuleProvider; import org.apache.skywalking.oap.server.library.module.ModuleStartException; import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException; -import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward; -import zipkin.server.core.services.ZipkinConfigService; +import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService; +import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverModule; public class ZipkinScribeProvider extends ModuleProvider { private ZipkinScribeConfig moduleConfig; - private SpanForward spanForward; + private SpanForwardService spanForward; @Override public String name() { @@ -59,8 +57,7 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException { @Override public void start() throws ServiceNotProvidedException, ModuleStartException { - final ConfigService service = getManager().find(CoreModule.NAME).provider().getService(ConfigService.class); - this.spanForward = new SpanForward(((ZipkinConfigService)service).toZipkinReceiverConfig(), getManager()); + this.spanForward = getManager().find(ZipkinReceiverModule.NAME).provider().getService(SpanForwardService.class); } @Override diff --git a/zipkin-server/server-core/src/main/java/zipkin/server/core/services/ZipkinConfigService.java b/zipkin-server/server-core/src/main/java/zipkin/server/core/services/ZipkinConfigService.java index 2c1403b5da4..7cfdc8aa3de 100644 --- a/zipkin-server/server-core/src/main/java/zipkin/server/core/services/ZipkinConfigService.java +++ b/zipkin-server/server-core/src/main/java/zipkin/server/core/services/ZipkinConfigService.java @@ -30,10 +30,4 @@ public boolean getSearchEnable() { return moduleConfig.getSearchEnable(); } - public ZipkinReceiverConfig toZipkinReceiverConfig() { - final ZipkinReceiverConfig config = new ZipkinReceiverConfig(); - config.setSearchableTracesTags(moduleConfig.getSearchableTracesTags()); - config.setSampleRate((int) (moduleConfig.getTraceSampleRate() * 10000)); - return config; - } } diff --git a/zipkin-server/server-starter/pom.xml b/zipkin-server/server-starter/pom.xml index 23abdceb191..0f4baa32c62 100644 --- a/zipkin-server/server-starter/pom.xml +++ b/zipkin-server/server-starter/pom.xml @@ -84,6 +84,11 @@ + + io.zipkin + receiver-zipkin-core + ${project.version} + io.zipkin receiver-zipkin-http @@ -109,6 +114,11 @@ receiver-zipkin-scribe ${project.version} + + io.zipkin + receiver-otlp-trace + ${project.version} + diff --git a/zipkin-server/server-starter/src/main/resources/application.yml b/zipkin-server/server-starter/src/main/resources/application.yml index d855028ec28..8a92ff64a05 100644 --- a/zipkin-server/server-starter/src/main/resources/application.yml +++ b/zipkin-server/server-starter/src/main/resources/application.yml @@ -29,11 +29,6 @@ core: l1FlushPeriod: ${ZIPKIN_CORE_L1_AGGREGATION_FLUSH_PERIOD:500} # The threshold of session time. Unit is ms. Default value is 70s. storageSessionTimeout: ${ZIPKIN_CORE_STORAGE_SESSION_TIMEOUT:70000} - # Defines a set of span tag keys which are searchable. - # The max length of key=value should be less than 256 or will be dropped. - searchableTracesTags: ${ZIPKIN_SEARCHABLE_TAG_KEYS:http.method} - # The trace sample rate precision is 0.0001, should be between 0 and 1 - traceSampleRate: ${ZIPKIN_SAMPLE_RATE:1} # The number of threads used to prepare metrics data to the storage. prepareThreads: ${ZIPKIN_PREPARE_THREADS:2} # The period of doing data persistence. Unit is second.Default value is 25s @@ -168,6 +163,15 @@ storage: &storage zipkin-dependency-storage-ext: *storage +receiver-zipkin: + selector: zipkin + zipkin: + # Defines a set of span tag keys which are searchable. + # The max length of key=value should be less than 256 or will be dropped. + searchableTracesTags: ${ZIPKIN_SEARCHABLE_TAG_KEYS:http.method} + # The trace sample rate precision is 0.0001, should be between 0 and 1 + traceSampleRate: ${ZIPKIN_SAMPLE_RATE:1} + receiver-zipkin-http: selector: ${ZIPKIN_RECEIVER_ZIPKIN_HTTP:default} default: @@ -228,6 +232,20 @@ receiver-zipkin-scribe: category: ${ZIPKIN_SCRIBE_CATEGORY:zipkin} port: ${ZIPKIN_COLLECTOR_PORT:9410} +receiver-otel: + selector: ${SW_OTEL_RECEIVER:zipkin} + zipkin: + gRPCHost: ${ZIPKIN_OTEL_GRPC_HOST:0.0.0.0} + gRPCPort: ${ZIPKIN_OTEL_GRPC_PORT:0} + gRPCThreadPoolQueueSize: ${ZIPKIN_OTEL_GRPC_POOL_QUEUE_SIZE:-1} + gRPCThreadPoolSize: ${ZIPKIN_OTEL_GRPC_THREAD_POOL_SIZE:-1} + gRPCSslEnabled: ${ZIPKIN_OTEL_GRPC_SSL_ENABLED:false} + gRPCSslKeyPath: ${ZIPKIN_OTEL_GRPC_SSL_KEY_PATH:""} + gRPCSslCertChainPath: ${ZIPKIN_OTEL_GRPC_SSL_CERT_CHAIN_PATH:""} + gRPCSslTrustedCAPath: ${ZIPKIN_OTEL_GRPC_SSL_TRUSTED_CA_PATH:""} + gRPCMaxConcurrentCallsPerConnection: ${ZIPKIN_OTEL_GRPC_MAX_CONCURRENT_CALL:0} + gRPCMaxMessageSize: ${ZIPKIN_OTEL_GRPC_MAX_MESSAGE_SIZE:0} + ## This module is for Zipkin query API and support zipkin-lens UI query-zipkin: selector: ${ZIPKIN_QUERY_ZIPKIN:zipkin} diff --git a/zipkin-server/storage-cassandra/pom.xml b/zipkin-server/storage-cassandra/pom.xml index 3c96072f802..e7c53188b5f 100644 --- a/zipkin-server/storage-cassandra/pom.xml +++ b/zipkin-server/storage-cassandra/pom.xml @@ -16,7 +16,7 @@ io.zipkin - zipkin-server-core + receiver-zipkin-core ${project.version} diff --git a/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/CassandraTagAutocompleteDAO.java b/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/CassandraTagAutocompleteDAO.java index f4d761e6dd7..acf9e414edb 100644 --- a/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/CassandraTagAutocompleteDAO.java +++ b/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/CassandraTagAutocompleteDAO.java @@ -14,22 +14,20 @@ package zipkin.server.storage.cassandra.dao; -import org.apache.skywalking.oap.server.core.Const; -import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagAutocompleteData; import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagType; -import org.apache.skywalking.oap.server.core.config.ConfigService; import org.apache.skywalking.oap.server.core.query.input.Duration; import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService; +import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverModule; import org.apache.skywalking.oap.server.storage.plugin.jdbc.common.dao.JDBCTagAutoCompleteQueryDAO; -import zipkin.server.core.services.ZipkinConfigService; +import zipkin.server.receiver.zipkin.core.SpanForwardCore; import zipkin.server.storage.cassandra.CassandraClient; import zipkin.server.storage.cassandra.CassandraTableHelper; import java.util.HashSet; import java.util.Set; import java.util.stream.Collectors; -import java.util.stream.Stream; import static java.util.Objects.nonNull; @@ -51,9 +49,8 @@ private Set getTagAutocompleteKeys() { if (tagAutocompleteKeys != null) { return tagAutocompleteKeys; } - final ConfigService service = moduleManager.find(CoreModule.NAME).provider().getService(ConfigService.class); - tagAutocompleteKeys = Stream.of((((ZipkinConfigService) service).toZipkinReceiverConfig().getSearchableTracesTags()) - .split(Const.COMMA)).collect(Collectors.toSet()); + final SpanForwardService service = moduleManager.find(ZipkinReceiverModule.NAME).provider().getService(SpanForwardService.class); + tagAutocompleteKeys = ((SpanForwardCore) service).getTagAutocompleteKeys(); return tagAutocompleteKeys; }