diff --git a/zipkin-server/pom.xml b/zipkin-server/pom.xml index f2bd95d2cf2..37d1099a76f 100644 --- a/zipkin-server/pom.xml +++ b/zipkin-server/pom.xml @@ -46,6 +46,8 @@ ../skywalking/oap-server/server-core ../skywalking/oap-server/server-receiver-plugin/receiver-proto ../skywalking/oap-server/server-receiver-plugin/zipkin-receiver-plugin + ../skywalking/oap-server/server-receiver-plugin/otel-receiver-plugin + ../skywalking/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin ../skywalking/oap-server/server-cluster-plugin/cluster-standalone-plugin ../skywalking/oap-server/server-cluster-plugin/cluster-consul-plugin ../skywalking/oap-server/server-cluster-plugin/cluster-etcd-plugin @@ -59,6 +61,9 @@ ../skywalking/oap-server/server-testing ../skywalking/oap-server/server-configuration/configuration-api ../skywalking/oap-server/ai-pipeline + ../skywalking/oap-server/analyzer/meter-analyzer + ../skywalking/oap-server/analyzer/log-analyzer + ../skywalking/oap-server/analyzer/agent-analyzer server-core server-starter @@ -73,6 +78,8 @@ telemetry-zipkin zipkin-dependency zipkin-storage-ext + receiver-zipkin-core + receiver-otlp-trace diff --git a/zipkin-server/receiver-otlp-trace/pom.xml b/zipkin-server/receiver-otlp-trace/pom.xml new file mode 100644 index 00000000000..02219d319d9 --- /dev/null +++ b/zipkin-server/receiver-otlp-trace/pom.xml @@ -0,0 +1,37 @@ + + + 4.0.0 + + zipkin-server-parent + io.zipkin + 2.24.4-SNAPSHOT + + + receiver-otlp-trace + OTLP Trace Receiver + + + + org.apache.skywalking + otel-receiver-plugin + ${skywalking.version} + + + org.apache.skywalking + skywalking-sharing-server-plugin + + + org.apache.skywalking + meter-analyzer + + + org.apache.skywalking + log-analyzer + + + + + + \ No newline at end of file diff --git a/zipkin-server/receiver-otlp-trace/src/main/java/zipkin/server/receiver/otlp/OTLPTraceConfig.java b/zipkin-server/receiver-otlp-trace/src/main/java/zipkin/server/receiver/otlp/OTLPTraceConfig.java new file mode 100644 index 00000000000..158c5591c1b --- /dev/null +++ b/zipkin-server/receiver-otlp-trace/src/main/java/zipkin/server/receiver/otlp/OTLPTraceConfig.java @@ -0,0 +1,20 @@ +/* + * Copyright 2015-2023 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package zipkin.server.receiver.otlp; + +import org.apache.skywalking.oap.server.library.module.ModuleConfig; + +public class OTLPTraceConfig extends ModuleConfig { +} diff --git a/zipkin-server/receiver-otlp-trace/src/main/java/zipkin/server/receiver/otlp/OTLPTraceProvider.java b/zipkin-server/receiver-otlp-trace/src/main/java/zipkin/server/receiver/otlp/OTLPTraceProvider.java new file mode 100644 index 00000000000..5028828a338 --- /dev/null +++ b/zipkin-server/receiver-otlp-trace/src/main/java/zipkin/server/receiver/otlp/OTLPTraceProvider.java @@ -0,0 +1,81 @@ +/* + * Copyright 2015-2023 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package zipkin.server.receiver.otlp; + +import org.apache.skywalking.oap.server.core.CoreModule; +import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister; +import org.apache.skywalking.oap.server.library.module.ModuleConfig; +import org.apache.skywalking.oap.server.library.module.ModuleDefine; +import org.apache.skywalking.oap.server.library.module.ModuleProvider; +import org.apache.skywalking.oap.server.library.module.ModuleStartException; +import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException; +import org.apache.skywalking.oap.server.receiver.otel.OtelMetricReceiverConfig; +import org.apache.skywalking.oap.server.receiver.otel.OtelMetricReceiverModule; +import org.apache.skywalking.oap.server.receiver.otel.otlp.OpenTelemetryMetricRequestProcessor; +import org.apache.skywalking.oap.server.receiver.otel.otlp.OpenTelemetryTraceHandler; +import zipkin.server.receiver.otlp.handler.OTLPTraceHandler; + +public class OTLPTraceProvider extends ModuleProvider { + private OTLPTraceConfig moduleConfig; + private OpenTelemetryTraceHandler traceHandler; + + @Override + public String name() { + return "zipkin"; + } + + @Override + public Class module() { + return OtelMetricReceiverModule.class; + } + + @Override + public ConfigCreator newConfigCreator() { + return new ConfigCreator() { + + @Override + public Class type() { + return OTLPTraceConfig.class; + } + + @Override + public void onInitialized(OTLPTraceConfig initialized) { + moduleConfig = initialized; + } + }; + } + + @Override + public void prepare() throws ServiceNotProvidedException, ModuleStartException { + this.registerServiceImplementation(OpenTelemetryMetricRequestProcessor.class, + new OpenTelemetryMetricRequestProcessor(getManager(), new OtelMetricReceiverConfig())); + } + + @Override + public void start() throws ServiceNotProvidedException, ModuleStartException { + GRPCHandlerRegister handlerRegister = getManager().find(CoreModule.NAME).provider().getService(GRPCHandlerRegister.class); + traceHandler = new OTLPTraceHandler(handlerRegister, getManager()); + traceHandler.active(); + } + + @Override + public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException { + } + + @Override + public String[] requiredModules() { + return new String[0]; + } +} diff --git a/zipkin-server/receiver-otlp-trace/src/main/java/zipkin/server/receiver/otlp/handler/OTLPTraceHandler.java b/zipkin-server/receiver-otlp-trace/src/main/java/zipkin/server/receiver/otlp/handler/OTLPTraceHandler.java new file mode 100644 index 00000000000..ff5e3e8de9f --- /dev/null +++ b/zipkin-server/receiver-otlp-trace/src/main/java/zipkin/server/receiver/otlp/handler/OTLPTraceHandler.java @@ -0,0 +1,33 @@ +/* + * Copyright 2015-2023 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package zipkin.server.receiver.otlp.handler; + +import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister; +import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.library.module.ModuleStartException; +import org.apache.skywalking.oap.server.receiver.otel.otlp.OpenTelemetryTraceHandler; + +public class OTLPTraceHandler extends OpenTelemetryTraceHandler { + private final GRPCHandlerRegister register; + public OTLPTraceHandler(GRPCHandlerRegister register, ModuleManager manager) { + super(manager); + this.register = register; + } + + @Override + public void active() throws ModuleStartException { + register.addHandler(this); + } +} diff --git a/zipkin-server/receiver-otlp-trace/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider b/zipkin-server/receiver-otlp-trace/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider new file mode 100755 index 00000000000..cff78c2087a --- /dev/null +++ b/zipkin-server/receiver-otlp-trace/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# + +zipkin.server.receiver.otlp.OTLPTraceProvider \ No newline at end of file diff --git a/zipkin-server/receiver-zipkin-activemq/pom.xml b/zipkin-server/receiver-zipkin-activemq/pom.xml index b400b6ae2a5..2dda22ee0ea 100644 --- a/zipkin-server/receiver-zipkin-activemq/pom.xml +++ b/zipkin-server/receiver-zipkin-activemq/pom.xml @@ -19,16 +19,10 @@ io.zipkin - zipkin-server-core + receiver-zipkin-core ${project.version} - - org.apache.skywalking - zipkin-receiver-plugin - ${skywalking.version} - - org.apache.activemq activemq-client diff --git a/zipkin-server/receiver-zipkin-activemq/src/main/java/zipkin/server/receiver/zipkin/activemq/ActiveMQHandler.java b/zipkin-server/receiver-zipkin-activemq/src/main/java/zipkin/server/receiver/zipkin/activemq/ActiveMQHandler.java index d72de0d9c9a..dea971ecaea 100644 --- a/zipkin-server/receiver-zipkin-activemq/src/main/java/zipkin/server/receiver/zipkin/activemq/ActiveMQHandler.java +++ b/zipkin-server/receiver-zipkin-activemq/src/main/java/zipkin/server/receiver/zipkin/activemq/ActiveMQHandler.java @@ -20,7 +20,7 @@ import org.apache.activemq.transport.TransportListener; import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.util.StringUtil; -import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward; +import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics; import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics; @@ -54,7 +54,7 @@ public class ActiveMQHandler implements TransportListener, MessageListener, Clos private static final Logger log = LoggerFactory.getLogger(ActiveMQHandler.class.getName()); private final ZipkinActiveMQConfig config; - private final SpanForward spanForward; + private final SpanForwardService spanForward; private final ActiveMQConnectionFactory connectionFactory; static final CheckResult @@ -70,11 +70,11 @@ public class ActiveMQHandler implements TransportListener, MessageListener, Clos volatile CheckResult checkResult = CheckResult.OK; - public ActiveMQHandler(ZipkinActiveMQConfig config, SpanForward spanForward, ModuleManager moduleManager) { + public ActiveMQHandler(ZipkinActiveMQConfig config, SpanForwardService spanForward, ModuleManager moduleManager) { this(config, createConnectionFactory(config), spanForward, moduleManager); } - public ActiveMQHandler(ZipkinActiveMQConfig config, ActiveMQConnectionFactory connectionFactory, SpanForward spanForward, ModuleManager moduleManager) { + public ActiveMQHandler(ZipkinActiveMQConfig config, ActiveMQConnectionFactory connectionFactory, SpanForwardService spanForward, ModuleManager moduleManager) { this.config = config; this.spanForward = spanForward; this.connectionFactory = connectionFactory; diff --git a/zipkin-server/receiver-zipkin-activemq/src/main/java/zipkin/server/receiver/zipkin/activemq/ZipkinActiveMQProvider.java b/zipkin-server/receiver-zipkin-activemq/src/main/java/zipkin/server/receiver/zipkin/activemq/ZipkinActiveMQProvider.java index d2430e0695d..2338872f6b4 100644 --- a/zipkin-server/receiver-zipkin-activemq/src/main/java/zipkin/server/receiver/zipkin/activemq/ZipkinActiveMQProvider.java +++ b/zipkin-server/receiver-zipkin-activemq/src/main/java/zipkin/server/receiver/zipkin/activemq/ZipkinActiveMQProvider.java @@ -15,18 +15,17 @@ package zipkin.server.receiver.zipkin.activemq; import org.apache.skywalking.oap.server.core.CoreModule; -import org.apache.skywalking.oap.server.core.config.ConfigService; import org.apache.skywalking.oap.server.library.module.ModuleConfig; import org.apache.skywalking.oap.server.library.module.ModuleDefine; import org.apache.skywalking.oap.server.library.module.ModuleProvider; import org.apache.skywalking.oap.server.library.module.ModuleStartException; import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException; -import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward; -import zipkin.server.core.services.ZipkinConfigService; +import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService; +import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverModule; public class ZipkinActiveMQProvider extends ModuleProvider { private ZipkinActiveMQConfig moduleConfig; - private SpanForward spanForward; + private SpanForwardService spanForward; @Override public String name() { @@ -59,8 +58,7 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException { @Override public void start() throws ServiceNotProvidedException, ModuleStartException { - final ConfigService service = getManager().find(CoreModule.NAME).provider().getService(ConfigService.class); - this.spanForward = new SpanForward(((ZipkinConfigService)service).toZipkinReceiverConfig(), getManager()); + this.spanForward = getManager().find(ZipkinReceiverModule.NAME).provider().getService(SpanForwardService.class); } @Override diff --git a/zipkin-server/receiver-zipkin-core/pom.xml b/zipkin-server/receiver-zipkin-core/pom.xml new file mode 100644 index 00000000000..00fed3985fd --- /dev/null +++ b/zipkin-server/receiver-zipkin-core/pom.xml @@ -0,0 +1,29 @@ + + + 4.0.0 + + zipkin-server-parent + io.zipkin + 2.24.4-SNAPSHOT + + + receiver-zipkin-core + Zipkin Receiver Core + + + + io.zipkin + zipkin-server-core + ${project.version} + + + + org.apache.skywalking + zipkin-receiver-plugin + ${skywalking.version} + + + + \ No newline at end of file diff --git a/zipkin-server/receiver-zipkin-core/src/main/java/zipkin/server/receiver/zipkin/core/SpanForwardCore.java b/zipkin-server/receiver-zipkin-core/src/main/java/zipkin/server/receiver/zipkin/core/SpanForwardCore.java new file mode 100644 index 00000000000..b688046840d --- /dev/null +++ b/zipkin-server/receiver-zipkin-core/src/main/java/zipkin/server/receiver/zipkin/core/SpanForwardCore.java @@ -0,0 +1,36 @@ +/* + * Copyright 2015-2023 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package zipkin.server.receiver.zipkin.core; + +import org.apache.skywalking.oap.server.core.Const; +import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig; +import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +public class SpanForwardCore extends SpanForward { + private final ZipkinReceiverConfig config; + public SpanForwardCore(ZipkinReceiverConfig config, ModuleManager manager) { + super(config, manager); + this.config = config; + } + + public Set getTagAutocompleteKeys() { + return new HashSet<>(Arrays.asList(config.getSearchableTracesTags().split(Const.COMMA))); + } +} diff --git a/zipkin-server/receiver-zipkin-core/src/main/java/zipkin/server/receiver/zipkin/core/ZipkinReceiverCoreConfig.java b/zipkin-server/receiver-zipkin-core/src/main/java/zipkin/server/receiver/zipkin/core/ZipkinReceiverCoreConfig.java new file mode 100644 index 00000000000..acb35146e01 --- /dev/null +++ b/zipkin-server/receiver-zipkin-core/src/main/java/zipkin/server/receiver/zipkin/core/ZipkinReceiverCoreConfig.java @@ -0,0 +1,53 @@ +/* + * Copyright 2015-2023 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package zipkin.server.receiver.zipkin.core; + +import org.apache.skywalking.oap.server.core.Const; +import org.apache.skywalking.oap.server.library.module.ModuleConfig; + +public class ZipkinReceiverCoreConfig extends ModuleConfig { + + /** + * The trace sample rate precision is 0.0001, should be between 0 and 1. + */ + private double traceSampleRate = 1.0f; + + /** + * Defines a set of span tag keys which are searchable. + * The max length of key=value should be less than 256 or will be dropped. + */ + private String searchableTracesTags = DEFAULT_SEARCHABLE_TAG_KEYS; + + private static final String DEFAULT_SEARCHABLE_TAG_KEYS = String.join( + Const.COMMA, + "http.method" + ); + + public double getTraceSampleRate() { + return traceSampleRate; + } + + public void setTraceSampleRate(double traceSampleRate) { + this.traceSampleRate = traceSampleRate; + } + + public String getSearchableTracesTags() { + return searchableTracesTags; + } + + public void setSearchableTracesTags(String searchableTracesTags) { + this.searchableTracesTags = searchableTracesTags; + } +} diff --git a/zipkin-server/receiver-zipkin-core/src/main/java/zipkin/server/receiver/zipkin/core/ZipkinReceiverCoreProvider.java b/zipkin-server/receiver-zipkin-core/src/main/java/zipkin/server/receiver/zipkin/core/ZipkinReceiverCoreProvider.java new file mode 100644 index 00000000000..95082236b23 --- /dev/null +++ b/zipkin-server/receiver-zipkin-core/src/main/java/zipkin/server/receiver/zipkin/core/ZipkinReceiverCoreProvider.java @@ -0,0 +1,76 @@ +/* + * Copyright 2015-2023 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package zipkin.server.receiver.zipkin.core; + +import org.apache.skywalking.oap.server.library.module.ModuleConfig; +import org.apache.skywalking.oap.server.library.module.ModuleDefine; +import org.apache.skywalking.oap.server.library.module.ModuleProvider; +import org.apache.skywalking.oap.server.library.module.ModuleStartException; +import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException; +import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService; +import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig; +import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverModule; + +public class ZipkinReceiverCoreProvider extends ModuleProvider { + private ZipkinReceiverCoreConfig moduleConfig; + + @Override + public String name() { + return "zipkin"; + } + + @Override + public Class module() { + return ZipkinReceiverModule.class; + } + + @Override + public ConfigCreator newConfigCreator() { + return new ConfigCreator() { + + @Override + public Class type() { + return ZipkinReceiverCoreConfig.class; + } + + @Override + public void onInitialized(ZipkinReceiverCoreConfig initialized) { + moduleConfig = initialized; + } + }; + } + + @Override + public void prepare() throws ServiceNotProvidedException, ModuleStartException { + final ZipkinReceiverConfig config = new ZipkinReceiverConfig(); + config.setSearchableTracesTags(moduleConfig.getSearchableTracesTags()); + config.setSampleRate((int) (moduleConfig.getTraceSampleRate() * 10000)); + + this.registerServiceImplementation(SpanForwardService.class, new SpanForwardCore(config, getManager())); + } + + @Override + public void start() throws ServiceNotProvidedException, ModuleStartException { + } + + @Override + public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException { + } + + @Override + public String[] requiredModules() { + return new String[0]; + } +} diff --git a/zipkin-server/receiver-zipkin-core/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider b/zipkin-server/receiver-zipkin-core/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider new file mode 100755 index 00000000000..1e28be08cf5 --- /dev/null +++ b/zipkin-server/receiver-zipkin-core/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# + +zipkin.server.receiver.zipkin.core.ZipkinReceiverCoreProvider \ No newline at end of file diff --git a/zipkin-server/receiver-zipkin-http/pom.xml b/zipkin-server/receiver-zipkin-http/pom.xml index 612fab6aeb0..bf32ec7294d 100644 --- a/zipkin-server/receiver-zipkin-http/pom.xml +++ b/zipkin-server/receiver-zipkin-http/pom.xml @@ -15,15 +15,9 @@ io.zipkin - zipkin-server-core + receiver-zipkin-core ${project.version} - - - org.apache.skywalking - zipkin-receiver-plugin - ${skywalking.version} - \ No newline at end of file diff --git a/zipkin-server/receiver-zipkin-http/src/main/java/zipkin/server/receiver/zipkin/http/ZipkinHTTPReceiverConfig.java b/zipkin-server/receiver-zipkin-http/src/main/java/zipkin/server/receiver/zipkin/http/ZipkinHTTPReceiverConfig.java index 4d9ca221525..675f129047f 100644 --- a/zipkin-server/receiver-zipkin-http/src/main/java/zipkin/server/receiver/zipkin/http/ZipkinHTTPReceiverConfig.java +++ b/zipkin-server/receiver-zipkin-http/src/main/java/zipkin/server/receiver/zipkin/http/ZipkinHTTPReceiverConfig.java @@ -17,71 +17,4 @@ import org.apache.skywalking.oap.server.library.module.ModuleConfig; public class ZipkinHTTPReceiverConfig extends ModuleConfig { - private String restHost; - private int restPort; - private String restContextPath; - private int restMaxThreads = 200; - private long restIdleTimeOut = 30000; - private int restAcceptQueueSize = 0; - /** - * The maximum size in bytes allowed for request headers. - * Use -1 to disable it. - */ - private int restMaxRequestHeaderSize = 8192; - - public String getRestHost() { - return restHost; - } - - public void setRestHost(String restHost) { - this.restHost = restHost; - } - - public int getRestPort() { - return restPort; - } - - public void setRestPort(int restPort) { - this.restPort = restPort; - } - - public String getRestContextPath() { - return restContextPath; - } - - public void setRestContextPath(String restContextPath) { - this.restContextPath = restContextPath; - } - - public int getRestMaxThreads() { - return restMaxThreads; - } - - public void setRestMaxThreads(int restMaxThreads) { - this.restMaxThreads = restMaxThreads; - } - - public long getRestIdleTimeOut() { - return restIdleTimeOut; - } - - public void setRestIdleTimeOut(long restIdleTimeOut) { - this.restIdleTimeOut = restIdleTimeOut; - } - - public int getRestAcceptQueueSize() { - return restAcceptQueueSize; - } - - public void setRestAcceptQueueSize(int restAcceptQueueSize) { - this.restAcceptQueueSize = restAcceptQueueSize; - } - - public int getRestMaxRequestHeaderSize() { - return restMaxRequestHeaderSize; - } - - public void setRestMaxRequestHeaderSize(int restMaxRequestHeaderSize) { - this.restMaxRequestHeaderSize = restMaxRequestHeaderSize; - } } diff --git a/zipkin-server/receiver-zipkin-http/src/main/java/zipkin/server/receiver/zipkin/http/ZipkinHTTPReceiverProvider.java b/zipkin-server/receiver-zipkin-http/src/main/java/zipkin/server/receiver/zipkin/http/ZipkinHTTPReceiverProvider.java index c9a3e4cabaa..a9f7985bf93 100644 --- a/zipkin-server/receiver-zipkin-http/src/main/java/zipkin/server/receiver/zipkin/http/ZipkinHTTPReceiverProvider.java +++ b/zipkin-server/receiver-zipkin-http/src/main/java/zipkin/server/receiver/zipkin/http/ZipkinHTTPReceiverProvider.java @@ -16,26 +16,22 @@ import com.linecorp.armeria.common.HttpMethod; import org.apache.skywalking.oap.server.core.CoreModule; -import org.apache.skywalking.oap.server.core.config.ConfigService; import org.apache.skywalking.oap.server.core.server.HTTPHandlerRegister; import org.apache.skywalking.oap.server.library.module.ModuleConfig; import org.apache.skywalking.oap.server.library.module.ModuleDefine; import org.apache.skywalking.oap.server.library.module.ModuleProvider; import org.apache.skywalking.oap.server.library.module.ModuleStartException; import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException; -import org.apache.skywalking.oap.server.library.server.http.HTTPServer; -import org.apache.skywalking.oap.server.library.server.http.HTTPServerConfig; +import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService; +import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverModule; import org.apache.skywalking.oap.server.receiver.zipkin.handler.ZipkinSpanHTTPHandler; import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward; -import zipkin.server.core.services.HTTPConfigurableServer; -import zipkin.server.core.services.ZipkinConfigService; import java.util.Arrays; public class ZipkinHTTPReceiverProvider extends ModuleProvider { private ZipkinHTTPReceiverConfig moduleConfig; private ZipkinSpanHTTPHandler httpHandler; - private HTTPServer httpServer; @Override public String name() { @@ -64,40 +60,19 @@ public void onInitialized(ZipkinHTTPReceiverConfig initialized) { @Override public void prepare() throws ServiceNotProvidedException, ModuleStartException { - if (moduleConfig.getRestPort() > 0) { - HTTPServerConfig httpServerConfig = HTTPServerConfig.builder() - .host(moduleConfig.getRestHost()) - .port(moduleConfig.getRestPort()) - .contextPath(moduleConfig.getRestContextPath()) - .idleTimeOut(moduleConfig.getRestIdleTimeOut()) - .maxThreads(moduleConfig.getRestMaxThreads()) - .acceptQueueSize(moduleConfig.getRestAcceptQueueSize()) - .maxRequestHeaderSize(moduleConfig.getRestMaxRequestHeaderSize()) - .build(); - httpServer = new HTTPConfigurableServer(httpServerConfig); - httpServer.initialize(); - } } @Override public void start() throws ServiceNotProvidedException, ModuleStartException { - final ConfigService service = getManager().find(CoreModule.NAME).provider().getService(ConfigService.class); - final SpanForward spanForward = new SpanForward(((ZipkinConfigService)service).toZipkinReceiverConfig(), getManager()); - httpHandler = new ZipkinSpanHTTPHandler(spanForward, getManager()); + final SpanForwardService spanForward = getManager().find(ZipkinReceiverModule.NAME).provider().getService(SpanForwardService.class); + httpHandler = new ZipkinSpanHTTPHandler(((SpanForward) spanForward), getManager()); - if (httpServer != null) { - httpServer.addHandler(httpHandler, Arrays.asList(HttpMethod.POST, HttpMethod.GET)); - } else { - final HTTPHandlerRegister httpRegister = getManager().find(CoreModule.NAME).provider().getService(HTTPHandlerRegister.class); - httpRegister.addHandler(httpHandler, Arrays.asList(HttpMethod.POST, HttpMethod.GET)); - } + final HTTPHandlerRegister httpRegister = getManager().find(CoreModule.NAME).provider().getService(HTTPHandlerRegister.class); + httpRegister.addHandler(httpHandler, Arrays.asList(HttpMethod.POST, HttpMethod.GET)); } @Override public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException { - if (httpServer != null) { - httpServer.start(); - } } @Override @@ -106,8 +81,4 @@ public String[] requiredModules() { CoreModule.NAME, }; } - - public ZipkinSpanHTTPHandler getHttpHandler() { - return httpHandler; - } } diff --git a/zipkin-server/receiver-zipkin-http/src/test/java/zipkin/server/receiver/zipkin/http/ITHTTPReceiver.java b/zipkin-server/receiver-zipkin-http/src/test/java/zipkin/server/receiver/zipkin/http/ITHTTPReceiver.java index 0d72caaee4e..41298e9c411 100644 --- a/zipkin-server/receiver-zipkin-http/src/test/java/zipkin/server/receiver/zipkin/http/ITHTTPReceiver.java +++ b/zipkin-server/receiver-zipkin-http/src/test/java/zipkin/server/receiver/zipkin/http/ITHTTPReceiver.java @@ -16,13 +16,14 @@ import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.CoreModuleProvider; -import org.apache.skywalking.oap.server.core.config.ConfigService; import org.apache.skywalking.oap.server.core.server.HTTPHandlerRegister; import org.apache.skywalking.oap.server.core.server.HTTPHandlerRegisterImpl; import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.module.ModuleStartException; import org.apache.skywalking.oap.server.library.server.http.HTTPServer; import org.apache.skywalking.oap.server.library.server.http.HTTPServerConfig; +import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService; +import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverModule; import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator; @@ -37,8 +38,7 @@ import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import org.powermock.reflect.Whitebox; -import zipkin.server.core.CoreModuleConfig; -import zipkin.server.core.services.ZipkinConfigService; +import zipkin.server.receiver.zipkin.core.ZipkinReceiverCoreProvider; import zipkin2.Span; import zipkin2.TestObjects; import zipkin2.codec.SpanBytesEncoder; @@ -72,12 +72,11 @@ public class ITHTTPReceiver { public void setup() throws ModuleStartException { final HTTPServer httpServer = new HTTPServer(HTTPServerConfig.builder().host("0.0.0.0").port(port).contextPath("/").build()); httpServer.initialize(); - moduleManager = setupModuleManager(httpServer); + moduleManager = setupModuleManager(httpServer, forward); final ZipkinHTTPReceiverProvider provider = new ZipkinHTTPReceiverProvider(); provider.setManager(moduleManager); final ZipkinHTTPReceiverConfig config = new ZipkinHTTPReceiverConfig(); - config.setRestPort(-1); Whitebox.setInternalState(provider, ZipkinHTTPReceiverConfig.class, config); provider.prepare(); provider.start(); @@ -86,7 +85,6 @@ public void setup() throws ModuleStartException { spans.add(invocationOnMock.getArgument(0, ArrayList.class)); return null; }).when(forward).send(any()); - Whitebox.setInternalState(provider.getHttpHandler(), SpanForward.class, forward); provider.notifyAfterCompleted(); } @@ -116,7 +114,7 @@ public void test() throws Exception { assertThat(spans.take()).containsExactly(CLIENT_SPAN); } - private ModuleManager setupModuleManager(HTTPServer httpServer) { + private ModuleManager setupModuleManager(HTTPServer httpServer, SpanForward forward) { ModuleManager moduleManager = Mockito.mock(ModuleManager.class); CoreModule coreModule = Mockito.spy(CoreModule.class); @@ -125,13 +123,17 @@ private ModuleManager setupModuleManager(HTTPServer httpServer) { Mockito.when(moduleManager.find(CoreModule.NAME)).thenReturn(coreModule); Mockito.when(coreModule.provider().getService(HTTPHandlerRegister.class)).thenReturn(new HTTPHandlerRegisterImpl(httpServer)); + final ZipkinReceiverModule zipkinReceiverModule = Mockito.spy(ZipkinReceiverModule.class); + final ZipkinReceiverCoreProvider receiverProvider = Mockito.mock(ZipkinReceiverCoreProvider.class); + Whitebox.setInternalState(zipkinReceiverModule, "loadedProvider", receiverProvider); + Mockito.when(moduleManager.find(ZipkinReceiverModule.NAME)).thenReturn(zipkinReceiverModule); + Mockito.when(zipkinReceiverModule.provider().getService(SpanForwardService.class)).thenReturn(forward); + TelemetryModule telemetryModule = Mockito.spy(TelemetryModule.class); NoneTelemetryProvider noneTelemetryProvider = Mockito.mock(NoneTelemetryProvider.class); Whitebox.setInternalState(telemetryModule, "loadedProvider", noneTelemetryProvider); Mockito.when(moduleManager.find(TelemetryModule.NAME)).thenReturn(telemetryModule); - Mockito.when(moduleProvider.getService(ConfigService.class)) - .thenReturn(new ZipkinConfigService(new CoreModuleConfig(), moduleProvider)); Mockito.when(noneTelemetryProvider.getService(MetricsCreator.class)) .thenReturn(new MetricsCreatorNoop()); diff --git a/zipkin-server/receiver-zipkin-kafka/pom.xml b/zipkin-server/receiver-zipkin-kafka/pom.xml index f54ecc6684c..a8968153150 100644 --- a/zipkin-server/receiver-zipkin-kafka/pom.xml +++ b/zipkin-server/receiver-zipkin-kafka/pom.xml @@ -15,15 +15,9 @@ io.zipkin - zipkin-server-core + receiver-zipkin-core ${project.version} - - - org.apache.skywalking - zipkin-receiver-plugin - ${skywalking.version} - \ No newline at end of file diff --git a/zipkin-server/receiver-zipkin-kafka/src/main/java/zipkin/server/receiver/zipkin/kafka/ZipkinKafkaReceiverProvider.java b/zipkin-server/receiver-zipkin-kafka/src/main/java/zipkin/server/receiver/zipkin/kafka/ZipkinKafkaReceiverProvider.java index d20fa1422a9..7b3625ed6ef 100644 --- a/zipkin-server/receiver-zipkin-kafka/src/main/java/zipkin/server/receiver/zipkin/kafka/ZipkinKafkaReceiverProvider.java +++ b/zipkin-server/receiver-zipkin-kafka/src/main/java/zipkin/server/receiver/zipkin/kafka/ZipkinKafkaReceiverProvider.java @@ -15,19 +15,19 @@ package zipkin.server.receiver.zipkin.kafka; import org.apache.skywalking.oap.server.core.CoreModule; -import org.apache.skywalking.oap.server.core.config.ConfigService; import org.apache.skywalking.oap.server.library.module.ModuleConfig; import org.apache.skywalking.oap.server.library.module.ModuleDefine; import org.apache.skywalking.oap.server.library.module.ModuleProvider; import org.apache.skywalking.oap.server.library.module.ModuleStartException; import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException; +import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService; +import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverModule; import org.apache.skywalking.oap.server.receiver.zipkin.kafka.KafkaHandler; import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward; -import zipkin.server.core.services.ZipkinConfigService; public class ZipkinKafkaReceiverProvider extends ModuleProvider { private ZipkinKafkaReceiverConfig moduleConfig; - private SpanForward spanForward; + private SpanForwardService spanForward; private KafkaHandler kafkaHandler; @Override @@ -61,13 +61,12 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException { @Override public void start() throws ServiceNotProvidedException, ModuleStartException { - final ConfigService service = getManager().find(CoreModule.NAME).provider().getService(ConfigService.class); - this.spanForward = new SpanForward(((ZipkinConfigService)service).toZipkinReceiverConfig(), getManager()); + this.spanForward = getManager().find(ZipkinReceiverModule.NAME).provider().getService(SpanForwardService.class); } @Override public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException { - kafkaHandler = new KafkaHandler(moduleConfig.toSkyWalkingConfig(), this.spanForward, getManager()); + kafkaHandler = new KafkaHandler(moduleConfig.toSkyWalkingConfig(), ((SpanForward) this.spanForward), getManager()); kafkaHandler.start(); } diff --git a/zipkin-server/receiver-zipkin-kafka/src/test/java/zipkin/server/receiver/zipkin/kafka/ITKafkaReceiver.java b/zipkin-server/receiver-zipkin-kafka/src/test/java/zipkin/server/receiver/zipkin/kafka/ITKafkaReceiver.java index f6d725163b2..93ae3679228 100644 --- a/zipkin-server/receiver-zipkin-kafka/src/test/java/zipkin/server/receiver/zipkin/kafka/ITKafkaReceiver.java +++ b/zipkin-server/receiver-zipkin-kafka/src/test/java/zipkin/server/receiver/zipkin/kafka/ITKafkaReceiver.java @@ -18,11 +18,10 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.apache.skywalking.oap.server.core.CoreModule; -import org.apache.skywalking.oap.server.core.CoreModuleProvider; -import org.apache.skywalking.oap.server.core.config.ConfigService; import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.module.ModuleStartException; +import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService; +import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverModule; import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator; @@ -39,8 +38,7 @@ import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import org.powermock.reflect.Whitebox; -import zipkin.server.core.CoreModuleConfig; -import zipkin.server.core.services.ZipkinConfigService; +import zipkin.server.receiver.zipkin.core.ZipkinReceiverCoreProvider; import zipkin2.Span; import zipkin2.TestObjects; import zipkin2.codec.SpanBytesEncoder; @@ -80,7 +78,7 @@ public void setup() throws ModuleStartException { config.setKafkaHandlerThreadPoolSize(2); config.setKafkaHandlerThreadPoolQueueSize(100); - moduleManager = setupModuleManager(); + moduleManager = setupModuleManager(forward); final ZipkinKafkaReceiverProvider provider = new ZipkinKafkaReceiverProvider(); provider.setManager(moduleManager); @@ -91,7 +89,6 @@ public void setup() throws ModuleStartException { spans.add(invocationOnMock.getArgument(0, ArrayList.class)); return null; }).when(forward).send(any()); - Whitebox.setInternalState(provider, SpanForward.class, forward); provider.notifyAfterCompleted(); kafka.prepareTopics(config.getKafkaTopic(), 1); @@ -121,21 +118,20 @@ public void tearDown() { } } - private ModuleManager setupModuleManager() { + private ModuleManager setupModuleManager(SpanForward forward) { ModuleManager moduleManager = Mockito.mock(ModuleManager.class); - CoreModule coreModule = Mockito.spy(CoreModule.class); - CoreModuleProvider moduleProvider = Mockito.mock(CoreModuleProvider.class); - Whitebox.setInternalState(coreModule, "loadedProvider", moduleProvider); - Mockito.when(moduleManager.find(CoreModule.NAME)).thenReturn(coreModule); + final ZipkinReceiverModule zipkinReceiverModule = Mockito.spy(ZipkinReceiverModule.class); + final ZipkinReceiverCoreProvider receiverProvider = Mockito.mock(ZipkinReceiverCoreProvider.class); + Whitebox.setInternalState(zipkinReceiverModule, "loadedProvider", receiverProvider); + Mockito.when(moduleManager.find(ZipkinReceiverModule.NAME)).thenReturn(zipkinReceiverModule); + Mockito.when(zipkinReceiverModule.provider().getService(SpanForwardService.class)).thenReturn(forward); TelemetryModule telemetryModule = Mockito.spy(TelemetryModule.class); NoneTelemetryProvider noneTelemetryProvider = Mockito.mock(NoneTelemetryProvider.class); Whitebox.setInternalState(telemetryModule, "loadedProvider", noneTelemetryProvider); Mockito.when(moduleManager.find(TelemetryModule.NAME)).thenReturn(telemetryModule); - Mockito.when(moduleProvider.getService(ConfigService.class)) - .thenReturn(new ZipkinConfigService(new CoreModuleConfig(), moduleProvider)); Mockito.when(noneTelemetryProvider.getService(MetricsCreator.class)) .thenReturn(new MetricsCreatorNoop()); diff --git a/zipkin-server/receiver-zipkin-rabbitmq/pom.xml b/zipkin-server/receiver-zipkin-rabbitmq/pom.xml index 904a4c0c0da..ec2933ee564 100644 --- a/zipkin-server/receiver-zipkin-rabbitmq/pom.xml +++ b/zipkin-server/receiver-zipkin-rabbitmq/pom.xml @@ -19,16 +19,10 @@ io.zipkin - zipkin-server-core + receiver-zipkin-core ${project.version} - - org.apache.skywalking - zipkin-receiver-plugin - ${skywalking.version} - - com.rabbitmq amqp-client diff --git a/zipkin-server/receiver-zipkin-rabbitmq/src/main/java/zipkin/server/receriver/zipkin/rabbitmq/ZipkinRabbitMQHandler.java b/zipkin-server/receiver-zipkin-rabbitmq/src/main/java/zipkin/server/receriver/zipkin/rabbitmq/ZipkinRabbitMQHandler.java index d6448ef54a0..a453c69d2a7 100644 --- a/zipkin-server/receiver-zipkin-rabbitmq/src/main/java/zipkin/server/receriver/zipkin/rabbitmq/ZipkinRabbitMQHandler.java +++ b/zipkin-server/receiver-zipkin-rabbitmq/src/main/java/zipkin/server/receriver/zipkin/rabbitmq/ZipkinRabbitMQHandler.java @@ -24,7 +24,7 @@ import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.util.CollectionUtils; import org.apache.skywalking.oap.server.library.util.StringUtil; -import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward; +import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics; import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator; @@ -45,13 +45,13 @@ public class ZipkinRabbitMQHandler { private final ZipkinRabbitMQConfig config; - private final SpanForward spanForward; + private final SpanForwardService spanForward; private final ConnectionFactory connectionFactory = new ConnectionFactory(); private final HistogramMetrics histogram; - public ZipkinRabbitMQHandler(ZipkinRabbitMQConfig config, SpanForward spanForward, ModuleManager moduleManager) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException { + public ZipkinRabbitMQHandler(ZipkinRabbitMQConfig config, SpanForwardService spanForward, ModuleManager moduleManager) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException { this.config = config; this.spanForward = spanForward; diff --git a/zipkin-server/receiver-zipkin-rabbitmq/src/main/java/zipkin/server/receriver/zipkin/rabbitmq/ZipkinRabbitMQProvider.java b/zipkin-server/receiver-zipkin-rabbitmq/src/main/java/zipkin/server/receriver/zipkin/rabbitmq/ZipkinRabbitMQProvider.java index 1852f1413a0..e089e7d0620 100644 --- a/zipkin-server/receiver-zipkin-rabbitmq/src/main/java/zipkin/server/receriver/zipkin/rabbitmq/ZipkinRabbitMQProvider.java +++ b/zipkin-server/receiver-zipkin-rabbitmq/src/main/java/zipkin/server/receriver/zipkin/rabbitmq/ZipkinRabbitMQProvider.java @@ -15,18 +15,17 @@ package zipkin.server.receriver.zipkin.rabbitmq; import org.apache.skywalking.oap.server.core.CoreModule; -import org.apache.skywalking.oap.server.core.config.ConfigService; import org.apache.skywalking.oap.server.library.module.ModuleConfig; import org.apache.skywalking.oap.server.library.module.ModuleDefine; import org.apache.skywalking.oap.server.library.module.ModuleProvider; import org.apache.skywalking.oap.server.library.module.ModuleStartException; import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException; -import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward; -import zipkin.server.core.services.ZipkinConfigService; +import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService; +import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverModule; public class ZipkinRabbitMQProvider extends ModuleProvider { private ZipkinRabbitMQConfig moduleConfig; - private SpanForward spanForward; + private SpanForwardService spanForward; @Override public String name() { @@ -59,8 +58,7 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException { @Override public void start() throws ServiceNotProvidedException, ModuleStartException { - final ConfigService service = getManager().find(CoreModule.NAME).provider().getService(ConfigService.class); - this.spanForward = new SpanForward(((ZipkinConfigService)service).toZipkinReceiverConfig(), getManager()); + this.spanForward = getManager().find(ZipkinReceiverModule.NAME).provider().getService(SpanForwardService.class); } @Override diff --git a/zipkin-server/receiver-zipkin-rabbitmq/src/test/java/zipkin/server/receiver/zipkin/rabbitmq/ITRabbitMQReceiver.java b/zipkin-server/receiver-zipkin-rabbitmq/src/test/java/zipkin/server/receiver/zipkin/rabbitmq/ITRabbitMQReceiver.java index bc9ee4ac6e6..9006f7db5df 100644 --- a/zipkin-server/receiver-zipkin-rabbitmq/src/test/java/zipkin/server/receiver/zipkin/rabbitmq/ITRabbitMQReceiver.java +++ b/zipkin-server/receiver-zipkin-rabbitmq/src/test/java/zipkin/server/receiver/zipkin/rabbitmq/ITRabbitMQReceiver.java @@ -17,11 +17,10 @@ import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; -import org.apache.skywalking.oap.server.core.CoreModule; -import org.apache.skywalking.oap.server.core.CoreModuleProvider; -import org.apache.skywalking.oap.server.core.config.ConfigService; import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.module.ModuleStartException; +import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService; +import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverModule; import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator; @@ -37,8 +36,7 @@ import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import org.powermock.reflect.Whitebox; -import zipkin.server.core.CoreModuleConfig; -import zipkin.server.core.services.ZipkinConfigService; +import zipkin.server.receiver.zipkin.core.ZipkinReceiverCoreProvider; import zipkin.server.receriver.zipkin.rabbitmq.ZipkinRabbitMQConfig; import zipkin.server.receriver.zipkin.rabbitmq.ZipkinRabbitMQProvider; import zipkin2.Span; @@ -77,7 +75,7 @@ public void setup() throws ModuleStartException, IOException, TimeoutException { config.setAddresses(Collections.singletonList(rabbitMQ.host() + ":" + rabbitMQ.port())); config.setQueue("test"); - moduleManager = setupModuleManager(); + moduleManager = setupModuleManager(forward); final ZipkinRabbitMQProvider provider = new ZipkinRabbitMQProvider(); provider.setManager(moduleManager); @@ -88,7 +86,6 @@ public void setup() throws ModuleStartException, IOException, TimeoutException { exceptedSpans.add(invocationOnMock.getArgument(0, ArrayList.class)); return null; }).when(forward).send(any()); - Whitebox.setInternalState(provider, SpanForward.class, forward); provider.notifyAfterCompleted(); ConnectionFactory factory = new ConnectionFactory(); @@ -121,21 +118,20 @@ void produceSpans(byte[] spans, String queue) throws Exception { } } - private ModuleManager setupModuleManager() { + private ModuleManager setupModuleManager(SpanForward forward) { ModuleManager moduleManager = Mockito.mock(ModuleManager.class); - CoreModule coreModule = Mockito.spy(CoreModule.class); - CoreModuleProvider moduleProvider = Mockito.mock(CoreModuleProvider.class); - Whitebox.setInternalState(coreModule, "loadedProvider", moduleProvider); - Mockito.when(moduleManager.find(CoreModule.NAME)).thenReturn(coreModule); + final ZipkinReceiverModule zipkinReceiverModule = Mockito.spy(ZipkinReceiverModule.class); + final ZipkinReceiverCoreProvider receiverProvider = Mockito.mock(ZipkinReceiverCoreProvider.class); + Whitebox.setInternalState(zipkinReceiverModule, "loadedProvider", receiverProvider); + Mockito.when(moduleManager.find(ZipkinReceiverModule.NAME)).thenReturn(zipkinReceiverModule); + Mockito.when(zipkinReceiverModule.provider().getService(SpanForwardService.class)).thenReturn(forward); TelemetryModule telemetryModule = Mockito.spy(TelemetryModule.class); NoneTelemetryProvider noneTelemetryProvider = Mockito.mock(NoneTelemetryProvider.class); Whitebox.setInternalState(telemetryModule, "loadedProvider", noneTelemetryProvider); Mockito.when(moduleManager.find(TelemetryModule.NAME)).thenReturn(telemetryModule); - Mockito.when(moduleProvider.getService(ConfigService.class)) - .thenReturn(new ZipkinConfigService(new CoreModuleConfig(), moduleProvider)); Mockito.when(noneTelemetryProvider.getService(MetricsCreator.class)) .thenReturn(new MetricsCreatorNoop()); diff --git a/zipkin-server/receiver-zipkin-scribe/pom.xml b/zipkin-server/receiver-zipkin-scribe/pom.xml index fca7e3786e1..8c6422c9db8 100644 --- a/zipkin-server/receiver-zipkin-scribe/pom.xml +++ b/zipkin-server/receiver-zipkin-scribe/pom.xml @@ -15,16 +15,10 @@ io.zipkin - zipkin-server-core + receiver-zipkin-core ${project.version} - - org.apache.skywalking - zipkin-receiver-plugin - ${skywalking.version} - - ${armeria.groupId} armeria-thrift0.15 diff --git a/zipkin-server/receiver-zipkin-scribe/src/main/java/zipkin/server/receiver/zipkin/scribe/ScribeSpanConsumer.java b/zipkin-server/receiver-zipkin-scribe/src/main/java/zipkin/server/receiver/zipkin/scribe/ScribeSpanConsumer.java index 74fdf6d3096..ae7c9c77b5c 100644 --- a/zipkin-server/receiver-zipkin-scribe/src/main/java/zipkin/server/receiver/zipkin/scribe/ScribeSpanConsumer.java +++ b/zipkin-server/receiver-zipkin-scribe/src/main/java/zipkin/server/receiver/zipkin/scribe/ScribeSpanConsumer.java @@ -14,7 +14,7 @@ package zipkin.server.receiver.zipkin.scribe; import org.apache.skywalking.oap.server.library.module.ModuleManager; -import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward; +import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics; import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator; @@ -32,12 +32,12 @@ import java.util.List; final class ScribeSpanConsumer implements Scribe.AsyncIface { - private final SpanForward spanForward; + private final SpanForwardService spanForward; private final String category; private final HistogramMetrics histogram; - ScribeSpanConsumer(SpanForward spanForward, String category, ModuleManager moduleManager) { + ScribeSpanConsumer(SpanForwardService spanForward, String category, ModuleManager moduleManager) { this.spanForward = spanForward; this.category = category; diff --git a/zipkin-server/receiver-zipkin-scribe/src/main/java/zipkin/server/receiver/zipkin/scribe/ZipkinScribeProvider.java b/zipkin-server/receiver-zipkin-scribe/src/main/java/zipkin/server/receiver/zipkin/scribe/ZipkinScribeProvider.java index 89e462c7216..55fc216bd05 100644 --- a/zipkin-server/receiver-zipkin-scribe/src/main/java/zipkin/server/receiver/zipkin/scribe/ZipkinScribeProvider.java +++ b/zipkin-server/receiver-zipkin-scribe/src/main/java/zipkin/server/receiver/zipkin/scribe/ZipkinScribeProvider.java @@ -14,19 +14,17 @@ package zipkin.server.receiver.zipkin.scribe; -import org.apache.skywalking.oap.server.core.CoreModule; -import org.apache.skywalking.oap.server.core.config.ConfigService; import org.apache.skywalking.oap.server.library.module.ModuleConfig; import org.apache.skywalking.oap.server.library.module.ModuleDefine; import org.apache.skywalking.oap.server.library.module.ModuleProvider; import org.apache.skywalking.oap.server.library.module.ModuleStartException; import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException; -import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward; -import zipkin.server.core.services.ZipkinConfigService; +import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService; +import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverModule; public class ZipkinScribeProvider extends ModuleProvider { private ZipkinScribeConfig moduleConfig; - private SpanForward spanForward; + private SpanForwardService spanForward; @Override public String name() { @@ -59,8 +57,7 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException { @Override public void start() throws ServiceNotProvidedException, ModuleStartException { - final ConfigService service = getManager().find(CoreModule.NAME).provider().getService(ConfigService.class); - this.spanForward = new SpanForward(((ZipkinConfigService)service).toZipkinReceiverConfig(), getManager()); + this.spanForward = getManager().find(ZipkinReceiverModule.NAME).provider().getService(SpanForwardService.class); } @Override diff --git a/zipkin-server/server-core/pom.xml b/zipkin-server/server-core/pom.xml index 02b3525b5a0..32e2b5ca570 100644 --- a/zipkin-server/server-core/pom.xml +++ b/zipkin-server/server-core/pom.xml @@ -29,6 +29,11 @@ classgraph 4.8.162 + + ${armeria.groupId} + armeria-grpc + ${armeria.version} + \ No newline at end of file diff --git a/zipkin-server/server-core/src/main/java/zipkin/server/core/CoreModuleConfig.java b/zipkin-server/server-core/src/main/java/zipkin/server/core/CoreModuleConfig.java index dbbb0dd71be..80bfb05e7cb 100644 --- a/zipkin-server/server-core/src/main/java/zipkin/server/core/CoreModuleConfig.java +++ b/zipkin-server/server-core/src/main/java/zipkin/server/core/CoreModuleConfig.java @@ -17,29 +17,21 @@ import org.apache.skywalking.oap.server.library.module.ModuleConfig; public class CoreModuleConfig extends ModuleConfig { - private String gRPCHost; - private int gRPCPort; - private boolean gRPCSslEnabled = false; - private String gRPCSslKeyPath; - private String gRPCSslCertChainPath; - private String gRPCSslTrustedCAPath; - private int gRPCThreadPoolSize; - private int gRPCThreadPoolQueueSize; - private int gRPCMaxConcurrentCallsPerConnection; - private int gRPCMaxMessageSize; - - private String restHost; - private int restPort; - private String restContextPath; - private int restMaxThreads = 200; - private long restIdleTimeOut = 30000; - private int restAcceptQueueSize = 0; + private String serverHost; + private int serverPort; + private int serverMaxThreads = 200; + private long serverIdleTimeOut = 30000; + private int serverAcceptQueueSize = 0; /** * The maximum size in bytes allowed for request headers. * Use -1 to disable it. */ - private int restMaxRequestHeaderSize = 8192; - + private int serverMaxRequestHeaderSize = 8192; + private boolean serverEnableTLS = false; + private String serverTLSKeyPath; + private String serverTLSCertChainPath; + private String clusterSslTrustedCAPath; + /** * The max length of the service name. */ @@ -221,147 +213,91 @@ public void setRemoteTimeout(int remoteTimeout) { this.remoteTimeout = remoteTimeout; } - public String getGRPCHost() { - return gRPCHost; - } - - public void setGRPCHost(String gRPCHost) { - this.gRPCHost = gRPCHost; - } - - public int getGRPCPort() { - return gRPCPort; - } - - public void setGRPCPort(int gRPCPort) { - this.gRPCPort = gRPCPort; - } - - public boolean getGRPCSslEnabled() { - return gRPCSslEnabled; - } - - public void setGRPCSslEnabled(boolean gRPCSslEnabled) { - this.gRPCSslEnabled = gRPCSslEnabled; - } - - public String getGRPCSslKeyPath() { - return gRPCSslKeyPath; - } - - public void setGRPCSslKeyPath(String gRPCSslKeyPath) { - this.gRPCSslKeyPath = gRPCSslKeyPath; - } - - public String getGRPCSslCertChainPath() { - return gRPCSslCertChainPath; - } - - public void setGRPCSslCertChainPath(String gRPCSslCertChainPath) { - this.gRPCSslCertChainPath = gRPCSslCertChainPath; - } - - public String getGRPCSslTrustedCAPath() { - return gRPCSslTrustedCAPath; - } - - public void setGRPCSslTrustedCAPath(String gRPCSslTrustedCAPath) { - this.gRPCSslTrustedCAPath = gRPCSslTrustedCAPath; - } - - public int getGRPCThreadPoolSize() { - return gRPCThreadPoolSize; - } - - public void setGRPCThreadPoolSize(int gRPCThreadPoolSize) { - this.gRPCThreadPoolSize = gRPCThreadPoolSize; - } - - public int getGRPCThreadPoolQueueSize() { - return gRPCThreadPoolQueueSize; + public boolean getSearchEnable() { + return searchEnable; } - public void setGRPCThreadPoolQueueSize(int gRPCThreadPoolQueueSize) { - this.gRPCThreadPoolQueueSize = gRPCThreadPoolQueueSize; + public void setSearchEnable(boolean searchEnable) { + this.searchEnable = searchEnable; } - public int getGRPCMaxConcurrentCallsPerConnection() { - return gRPCMaxConcurrentCallsPerConnection; + public String getServerHost() { + return serverHost; } - public void setGRPCMaxConcurrentCallsPerConnection(int gRPCMaxConcurrentCallsPerConnection) { - this.gRPCMaxConcurrentCallsPerConnection = gRPCMaxConcurrentCallsPerConnection; + public void setServerHost(String serverHost) { + this.serverHost = serverHost; } - public int getGRPCMaxMessageSize() { - return gRPCMaxMessageSize; + public int getServerPort() { + return serverPort; } - public void setGRPCMaxMessageSize(int gRPCMaxMessageSize) { - this.gRPCMaxMessageSize = gRPCMaxMessageSize; + public void setServerPort(int serverPort) { + this.serverPort = serverPort; } - public String getRestHost() { - return restHost; + public int getServerMaxThreads() { + return serverMaxThreads; } - public void setRestHost(String restHost) { - this.restHost = restHost; + public void setServerMaxThreads(int serverMaxThreads) { + this.serverMaxThreads = serverMaxThreads; } - public int getRestPort() { - return restPort; + public long getServerIdleTimeOut() { + return serverIdleTimeOut; } - public void setRestPort(int restPort) { - this.restPort = restPort; + public void setServerIdleTimeOut(long serverIdleTimeOut) { + this.serverIdleTimeOut = serverIdleTimeOut; } - public String getRestContextPath() { - return restContextPath; + public int getServerAcceptQueueSize() { + return serverAcceptQueueSize; } - public void setRestContextPath(String restContextPath) { - this.restContextPath = restContextPath; + public void setServerAcceptQueueSize(int serverAcceptQueueSize) { + this.serverAcceptQueueSize = serverAcceptQueueSize; } - public int getRestMaxThreads() { - return restMaxThreads; + public int getServerMaxRequestHeaderSize() { + return serverMaxRequestHeaderSize; } - public void setRestMaxThreads(int restMaxThreads) { - this.restMaxThreads = restMaxThreads; + public void setServerMaxRequestHeaderSize(int serverMaxRequestHeaderSize) { + this.serverMaxRequestHeaderSize = serverMaxRequestHeaderSize; } - public long getRestIdleTimeOut() { - return restIdleTimeOut; + public boolean getServerEnableTLS() { + return serverEnableTLS; } - public void setRestIdleTimeOut(long restIdleTimeOut) { - this.restIdleTimeOut = restIdleTimeOut; + public void setServerEnableTLS(boolean serverEnableTLS) { + this.serverEnableTLS = serverEnableTLS; } - public int getRestAcceptQueueSize() { - return restAcceptQueueSize; + public String getServerTLSKeyPath() { + return serverTLSKeyPath; } - public void setRestAcceptQueueSize(int restAcceptQueueSize) { - this.restAcceptQueueSize = restAcceptQueueSize; + public void setServerTLSKeyPath(String serverTLSKeyPath) { + this.serverTLSKeyPath = serverTLSKeyPath; } - public int getRestMaxRequestHeaderSize() { - return restMaxRequestHeaderSize; + public String getServerTLSCertChainPath() { + return serverTLSCertChainPath; } - public void setRestMaxRequestHeaderSize(int restMaxRequestHeaderSize) { - this.restMaxRequestHeaderSize = restMaxRequestHeaderSize; + public void setServerTLSCertChainPath(String serverTLSCertChainPath) { + this.serverTLSCertChainPath = serverTLSCertChainPath; } - public boolean getSearchEnable() { - return searchEnable; + public String getClusterSslTrustedCAPath() { + return clusterSslTrustedCAPath; } - public void setSearchEnable(boolean searchEnable) { - this.searchEnable = searchEnable; + public void setClusterSslTrustedCAPath(String clusterSslTrustedCAPath) { + this.clusterSslTrustedCAPath = clusterSslTrustedCAPath; } } diff --git a/zipkin-server/server-core/src/main/java/zipkin/server/core/CoreModuleProvider.java b/zipkin-server/server-core/src/main/java/zipkin/server/core/CoreModuleProvider.java index 53b91e86cf4..d6c6c79f59f 100644 --- a/zipkin-server/server-core/src/main/java/zipkin/server/core/CoreModuleProvider.java +++ b/zipkin-server/server-core/src/main/java/zipkin/server/core/CoreModuleProvider.java @@ -18,7 +18,6 @@ import org.apache.skywalking.oap.server.core.RunningMode; import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem; import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor; -import org.apache.skywalking.oap.server.core.annotation.AnnotationScan; import org.apache.skywalking.oap.server.core.cache.NetworkAddressAliasCache; import org.apache.skywalking.oap.server.core.cache.ProfileTaskCache; import org.apache.skywalking.oap.server.core.cluster.ClusterCoordinator; @@ -58,7 +57,6 @@ import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager; import org.apache.skywalking.oap.server.core.remote.health.HealthCheckServiceHandler; import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister; -import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegisterImpl; import org.apache.skywalking.oap.server.core.server.HTTPHandlerRegister; import org.apache.skywalking.oap.server.core.server.HTTPHandlerRegisterImpl; import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; @@ -79,8 +77,6 @@ import org.apache.skywalking.oap.server.library.module.ModuleProvider; import org.apache.skywalking.oap.server.library.module.ModuleStartException; import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException; -import org.apache.skywalking.oap.server.library.server.ServerException; -import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer; import org.apache.skywalking.oap.server.library.server.http.HTTPServer; import org.apache.skywalking.oap.server.library.server.http.HTTPServerConfig; import org.apache.skywalking.oap.server.telemetry.api.TelemetryRelatedContext; @@ -102,8 +98,7 @@ public class CoreModuleProvider extends ModuleProvider { private EndpointNameGrouping endpointNameGrouping; private ZipkinSourceReceiverImpl receiver; private RemoteClientManager remoteClientManager; - private GRPCServer grpcServer; - private HTTPServer httpServer; + private HTTPServer server; public CoreModuleProvider() { this.annotationScan = new ZipkinAnnotationScan(); @@ -151,48 +146,25 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException { annotationScan.registerListener(new ZipkinStreamAnnotationListener(getManager(), moduleConfig.getSearchEnable())); HTTPServerConfig httpServerConfig = HTTPServerConfig.builder() - .host(moduleConfig.getRestHost()) - .port(moduleConfig.getRestPort()) - .contextPath(moduleConfig.getRestContextPath()) - .idleTimeOut(moduleConfig.getRestIdleTimeOut()) - .maxThreads(moduleConfig.getRestMaxThreads()) - .acceptQueueSize( - moduleConfig.getRestAcceptQueueSize()) - .maxRequestHeaderSize( - moduleConfig.getRestMaxRequestHeaderSize()) + .host(moduleConfig.getServerHost()) + .port(moduleConfig.getServerPort()) + .contextPath("/") + .idleTimeOut(moduleConfig.getServerIdleTimeOut()) + .maxThreads(moduleConfig.getServerMaxThreads()) + .acceptQueueSize(moduleConfig.getServerAcceptQueueSize()) + .maxRequestHeaderSize(moduleConfig.getServerMaxRequestHeaderSize()) + .enableTLS(moduleConfig.getServerEnableTLS()) + .tlsKeyPath(moduleConfig.getServerTLSKeyPath()) + .tlsCertChainPath(moduleConfig.getServerTLSCertChainPath()) .build(); - httpServer = new HTTPConfigurableServer(httpServerConfig); - httpServer.initialize(); + server = new HTTPConfigurableServer(httpServerConfig); + server.initialize(); // "/info" handler - httpServer.addHandler(new HTTPInfoHandler(), Arrays.asList(HttpMethod.GET, HttpMethod.POST)); + server.addHandler(new HTTPInfoHandler(), Arrays.asList(HttpMethod.GET, HttpMethod.POST)); - // grpc - if (moduleConfig.getGRPCSslEnabled()) { - grpcServer = new GRPCServer(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(), - moduleConfig.getGRPCSslCertChainPath(), - moduleConfig.getGRPCSslKeyPath(), - null - ); - } else { - grpcServer = new GRPCServer(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort()); - } - if (moduleConfig.getGRPCMaxConcurrentCallsPerConnection() > 0) { - grpcServer.setMaxConcurrentCallsPerConnection(moduleConfig.getGRPCMaxConcurrentCallsPerConnection()); - } - if (moduleConfig.getGRPCMaxMessageSize() > 0) { - grpcServer.setMaxMessageSize(moduleConfig.getGRPCMaxMessageSize()); - } - if (moduleConfig.getGRPCThreadPoolQueueSize() > 0) { - grpcServer.setThreadPoolQueueSize(moduleConfig.getGRPCThreadPoolQueueSize()); - } - if (moduleConfig.getGRPCThreadPoolSize() > 0) { - grpcServer.setThreadPoolSize(moduleConfig.getGRPCThreadPoolSize()); - } - grpcServer.initialize(); - - if (moduleConfig.getGRPCSslEnabled()) { + if (moduleConfig.getServerEnableTLS()) { this.remoteClientManager = new RemoteClientManager(getManager(), moduleConfig.getRemoteTimeout(), - moduleConfig.getGRPCSslTrustedCAPath() + moduleConfig.getClusterSslTrustedCAPath() ); } else { this.remoteClientManager = new RemoteClientManager(getManager(), moduleConfig.getRemoteTimeout()); @@ -203,8 +175,8 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException { this.registerServiceImplementation(ConfigService.class, new ZipkinConfigService(moduleConfig, this)); this.registerServiceImplementation(ServerStatusService.class, new ServerStatusService(getManager())); this.registerServiceImplementation(DownSamplingConfigService.class, new DownSamplingConfigService(Collections.emptyList())); - this.registerServiceImplementation(GRPCHandlerRegister.class, new GRPCHandlerRegisterImpl(grpcServer)); - this.registerServiceImplementation(HTTPHandlerRegister.class, new HTTPHandlerRegisterImpl(httpServer)); + this.registerServiceImplementation(GRPCHandlerRegister.class, new GRPCHandlerRegisterAdapter(server)); + this.registerServiceImplementation(HTTPHandlerRegister.class, new HTTPHandlerRegisterImpl(server)); this.registerServiceImplementation(IComponentLibraryCatalogService.class, new EmptyComponentLibraryCatalogService()); this.registerServiceImplementation(SourceReceiver.class, receiver); final WorkerInstancesService instancesService = new WorkerInstancesService(); @@ -258,8 +230,8 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException { @Override public void start() throws ServiceNotProvidedException, ModuleStartException { - grpcServer.addHandler(new RemoteServiceHandler(getManager())); - grpcServer.addHandler(new HealthCheckServiceHandler()); + server.addHandler(new RemoteServiceHandler(getManager()), Arrays.asList(HttpMethod.GET)); + server.addHandler(new HealthCheckServiceHandler(), Arrays.asList(HttpMethod.GET)); try { receiver.scan(); @@ -268,7 +240,7 @@ public void start() throws ServiceNotProvidedException, ModuleStartException { throw new ModuleStartException(e.getMessage(), e); } - Address gRPCServerInstanceAddress = new Address(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(), true); + Address gRPCServerInstanceAddress = new Address(moduleConfig.getServerHost(), moduleConfig.getServerPort(), true); TelemetryRelatedContext.INSTANCE.setId(gRPCServerInstanceAddress.toString()); ClusterCoordinator coordinator = this.getManager() .find(ClusterModule.NAME) @@ -282,14 +254,9 @@ public void start() throws ServiceNotProvidedException, ModuleStartException { @Override public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException { - try { - if (!RunningMode.isInitMode()) { - grpcServer.start(); - httpServer.start(); - remoteClientManager.start(); - } - } catch (ServerException e) { - throw new ModuleStartException(e.getMessage(), e); + if (!RunningMode.isInitMode()) { + server.start(); + remoteClientManager.start(); } final org.apache.skywalking.oap.server.core.CoreModuleConfig swConfig = this.moduleConfig.toSkyWalkingConfig(); diff --git a/zipkin-server/server-core/src/main/java/zipkin/server/core/GRPCHandlerRegisterAdapter.java b/zipkin-server/server-core/src/main/java/zipkin/server/core/GRPCHandlerRegisterAdapter.java new file mode 100644 index 00000000000..208042b982f --- /dev/null +++ b/zipkin-server/server-core/src/main/java/zipkin/server/core/GRPCHandlerRegisterAdapter.java @@ -0,0 +1,47 @@ +/* + * Copyright 2015-2023 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.server.core; + +import com.linecorp.armeria.common.HttpMethod; +import io.grpc.BindableService; +import io.grpc.ServerInterceptor; +import io.grpc.ServerServiceDefinition; +import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister; +import org.apache.skywalking.oap.server.library.server.http.HTTPServer; + +import java.util.Arrays; + +public class GRPCHandlerRegisterAdapter implements GRPCHandlerRegister { + + private final HTTPServer server; + + public GRPCHandlerRegisterAdapter(HTTPServer server) { + this.server = server; + } + + @Override + public void addHandler(BindableService handler) { + server.addHandler(handler, Arrays.asList(HttpMethod.GET)); + } + + @Override + public void addHandler(ServerServiceDefinition definition) { + server.addHandler(definition, Arrays.asList(HttpMethod.GET)); + } + + @Override + public void addFilter(ServerInterceptor interceptor) { + server.addHandler(interceptor, Arrays.asList(HttpMethod.GET)); + } +} diff --git a/zipkin-server/server-core/src/main/java/zipkin/server/core/services/HTTPConfigurableServer.java b/zipkin-server/server-core/src/main/java/zipkin/server/core/services/HTTPConfigurableServer.java index 05d9e219e0f..6121a3794bd 100644 --- a/zipkin-server/server-core/src/main/java/zipkin/server/core/services/HTTPConfigurableServer.java +++ b/zipkin-server/server-core/src/main/java/zipkin/server/core/services/HTTPConfigurableServer.java @@ -16,6 +16,10 @@ import com.linecorp.armeria.common.HttpMethod; import com.linecorp.armeria.server.ServerBuilder; +import com.linecorp.armeria.server.grpc.GrpcService; +import io.grpc.BindableService; +import io.grpc.ServerInterceptor; +import io.grpc.ServerServiceDefinition; import org.apache.skywalking.oap.server.library.server.http.HTTPServer; import org.apache.skywalking.oap.server.library.server.http.HTTPServerConfig; @@ -32,6 +36,15 @@ public void addHandler(Object handler, List httpMethods) { ((ServerConfiguration) handler).configure(sb); allowedMethods.addAll(httpMethods); return; + } else if (handler instanceof BindableService) { + sb.service(GrpcService.builder().addService((BindableService) handler).build()); + return; + } else if (handler instanceof ServerServiceDefinition) { + sb.service(GrpcService.builder().addService((ServerServiceDefinition) handler).build()); + return; + } else if (handler instanceof ServerInterceptor) { + sb.service(GrpcService.builder().intercept((ServerInterceptor) handler).build()); + return; } super.addHandler(handler, httpMethods); } diff --git a/zipkin-server/server-core/src/main/java/zipkin/server/core/services/ZipkinConfigService.java b/zipkin-server/server-core/src/main/java/zipkin/server/core/services/ZipkinConfigService.java index 2c1403b5da4..7cfdc8aa3de 100644 --- a/zipkin-server/server-core/src/main/java/zipkin/server/core/services/ZipkinConfigService.java +++ b/zipkin-server/server-core/src/main/java/zipkin/server/core/services/ZipkinConfigService.java @@ -30,10 +30,4 @@ public boolean getSearchEnable() { return moduleConfig.getSearchEnable(); } - public ZipkinReceiverConfig toZipkinReceiverConfig() { - final ZipkinReceiverConfig config = new ZipkinReceiverConfig(); - config.setSearchableTracesTags(moduleConfig.getSearchableTracesTags()); - config.setSampleRate((int) (moduleConfig.getTraceSampleRate() * 10000)); - return config; - } } diff --git a/zipkin-server/server-starter/pom.xml b/zipkin-server/server-starter/pom.xml index 23abdceb191..0f4baa32c62 100644 --- a/zipkin-server/server-starter/pom.xml +++ b/zipkin-server/server-starter/pom.xml @@ -84,6 +84,11 @@ + + io.zipkin + receiver-zipkin-core + ${project.version} + io.zipkin receiver-zipkin-http @@ -109,6 +114,11 @@ receiver-zipkin-scribe ${project.version} + + io.zipkin + receiver-otlp-trace + ${project.version} + diff --git a/zipkin-server/server-starter/src/main/resources/application.yml b/zipkin-server/server-starter/src/main/resources/application.yml index d855028ec28..7d5e8b5e1f2 100644 --- a/zipkin-server/server-starter/src/main/resources/application.yml +++ b/zipkin-server/server-starter/src/main/resources/application.yml @@ -29,32 +29,21 @@ core: l1FlushPeriod: ${ZIPKIN_CORE_L1_AGGREGATION_FLUSH_PERIOD:500} # The threshold of session time. Unit is ms. Default value is 70s. storageSessionTimeout: ${ZIPKIN_CORE_STORAGE_SESSION_TIMEOUT:70000} - # Defines a set of span tag keys which are searchable. - # The max length of key=value should be less than 256 or will be dropped. - searchableTracesTags: ${ZIPKIN_SEARCHABLE_TAG_KEYS:http.method} - # The trace sample rate precision is 0.0001, should be between 0 and 1 - traceSampleRate: ${ZIPKIN_SAMPLE_RATE:1} # The number of threads used to prepare metrics data to the storage. prepareThreads: ${ZIPKIN_PREPARE_THREADS:2} # The period of doing data persistence. Unit is second.Default value is 25s persistentPeriod: ${ZIPKIN_PERSISTENT_PERIOD:25} - gRPCHost: ${ZIPKIN_GRPC_HOST:0.0.0.0} - gRPCPort: ${ZIPKIN_GRPC_PORT:11800} - gRPCThreadPoolQueueSize: ${ZIPKIN_GRPC_POOL_QUEUE_SIZE:-1} - gRPCThreadPoolSize: ${ZIPKIN_GRPC_THREAD_POOL_SIZE:-1} - gRPCSslEnabled: ${ZIPKIN_GRPC_SSL_ENABLED:false} - gRPCSslKeyPath: ${ZIPKIN_GRPC_SSL_KEY_PATH:""} - gRPCSslCertChainPath: ${ZIPKIN_GRPC_SSL_CERT_CHAIN_PATH:""} - gRPCSslTrustedCAPath: ${ZIPKIN_GRPC_SSL_TRUSTED_CA_PATH:""} - gRPCMaxConcurrentCallsPerConnection: ${ZIPKIN_GRPC_MAX_CONCURRENT_CALL:0} - gRPCMaxMessageSize: ${ZIPKIN_GRPC_MAX_MESSAGE_SIZE:0} - restHost: ${ZIPKIN_REST_HOST:0.0.0.0} - restPort: ${ZIPKIN_REST_PORT:9411} - restContextPath: ${ZIPKIN_REST_CONTEXT_PATH:/} - restMaxThreads: ${ZIPKIN_REST_MAX_THREADS:200} - restIdleTimeOut: ${ZIPKIN_REST_IDLE_TIMEOUT:30000} - restAcceptQueueSize: ${ZIPKIN_REST_QUEUE_SIZE:0} - restMaxRequestHeaderSize: ${ZIPKIN_REST_MAX_REQUEST_HEADER_SIZE:8192} + serverHost: ${ZIPKIN_SERVER_HOST:0.0.0.0} + serverPort: ${ZIPKIN_SERVER_PORT:9411} + serverMaxThreads: ${ZIPKIN_SERVER_MAX_THREADS:200} + serverIdleTimeOut: ${ZIPKIN_SERVER_IDLE_TIMEOUT:30000} + serverAcceptQueueSize: ${ZIPKIN_SERVER_QUEUE_SIZE:0} + serverMaxRequestHeaderSize: ${ZIPKIN_SERVER_MAX_REQUEST_HEADER_SIZE:8192} + serverThreadPoolQueueSize: ${ZIPKIN_SERVER_POOL_QUEUE_SIZE:-1} + serverEnableTLS: ${ZIPKIN_SERVER_SSL_ENABLED:false} + serverTLSKeyPath: ${ZIPKIN_SERVER_SSL_KEY_PATH:""} + serverTLSCertChainPath: ${ZIPKIN_SERVER_SSL_CERT_CHAIN_PATH:""} + clusterSslTrustedCAPath: ${ZIPKIN_SERVER_SSL_TRUSTED_CA_PATH:""} searchEnable: ${ZIPKIN_SEARCH_ENABLED:true} storage: &storage @@ -168,17 +157,18 @@ storage: &storage zipkin-dependency-storage-ext: *storage +receiver-zipkin: + selector: zipkin + zipkin: + # Defines a set of span tag keys which are searchable. + # The max length of key=value should be less than 256 or will be dropped. + searchableTracesTags: ${ZIPKIN_SEARCHABLE_TAG_KEYS:http.method} + # The trace sample rate precision is 0.0001, should be between 0 and 1 + traceSampleRate: ${ZIPKIN_SAMPLE_RATE:1} + receiver-zipkin-http: selector: ${ZIPKIN_RECEIVER_ZIPKIN_HTTP:default} default: - restHost: ${ZIPKIN_RECEIVER_HTTP_REST_HOST:0.0.0.0} - # The port number of the HTTP service, the default HTTP service in the core would be used if the value is smaller than 0. - restPort: ${ZIPKIN_RECEIVER_HTTP_REST_PORT:-1} - restContextPath: ${ZIPKIN_RECEIVER_HTTP_REST_CONTEXT_PATH:/} - restMaxThreads: ${ZIPKIN_RECEIVER_HTTP_REST_MAX_THREADS:200} - restIdleTimeOut: ${ZIPKIN_RECEIVER_HTTP_REST_IDLE_TIMEOUT:30000} - restAcceptQueueSize: ${ZIPKIN_RECEIVER_HTTP_REST_QUEUE_SIZE:0} - restMaxRequestHeaderSize: ${ZIPKIN_RECEIVER_HTTP_REST_MAX_REQUEST_HEADER_SIZE:8192} receiver-zipkin-kafka: selector: ${ZIPKIN_RECEIVER_ZIPKIN_KAFKA:-} @@ -228,6 +218,10 @@ receiver-zipkin-scribe: category: ${ZIPKIN_SCRIBE_CATEGORY:zipkin} port: ${ZIPKIN_COLLECTOR_PORT:9410} +receiver-otel: + selector: ${SW_OTEL_RECEIVER:zipkin} + zipkin: + ## This module is for Zipkin query API and support zipkin-lens UI query-zipkin: selector: ${ZIPKIN_QUERY_ZIPKIN:zipkin} diff --git a/zipkin-server/storage-cassandra/pom.xml b/zipkin-server/storage-cassandra/pom.xml index 3c96072f802..e7c53188b5f 100644 --- a/zipkin-server/storage-cassandra/pom.xml +++ b/zipkin-server/storage-cassandra/pom.xml @@ -16,7 +16,7 @@ io.zipkin - zipkin-server-core + receiver-zipkin-core ${project.version} diff --git a/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/CassandraTagAutocompleteDAO.java b/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/CassandraTagAutocompleteDAO.java index f4d761e6dd7..acf9e414edb 100644 --- a/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/CassandraTagAutocompleteDAO.java +++ b/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/CassandraTagAutocompleteDAO.java @@ -14,22 +14,20 @@ package zipkin.server.storage.cassandra.dao; -import org.apache.skywalking.oap.server.core.Const; -import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagAutocompleteData; import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagType; -import org.apache.skywalking.oap.server.core.config.ConfigService; import org.apache.skywalking.oap.server.core.query.input.Duration; import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService; +import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverModule; import org.apache.skywalking.oap.server.storage.plugin.jdbc.common.dao.JDBCTagAutoCompleteQueryDAO; -import zipkin.server.core.services.ZipkinConfigService; +import zipkin.server.receiver.zipkin.core.SpanForwardCore; import zipkin.server.storage.cassandra.CassandraClient; import zipkin.server.storage.cassandra.CassandraTableHelper; import java.util.HashSet; import java.util.Set; import java.util.stream.Collectors; -import java.util.stream.Stream; import static java.util.Objects.nonNull; @@ -51,9 +49,8 @@ private Set getTagAutocompleteKeys() { if (tagAutocompleteKeys != null) { return tagAutocompleteKeys; } - final ConfigService service = moduleManager.find(CoreModule.NAME).provider().getService(ConfigService.class); - tagAutocompleteKeys = Stream.of((((ZipkinConfigService) service).toZipkinReceiverConfig().getSearchableTracesTags()) - .split(Const.COMMA)).collect(Collectors.toSet()); + final SpanForwardService service = moduleManager.find(ZipkinReceiverModule.NAME).provider().getService(SpanForwardService.class); + tagAutocompleteKeys = ((SpanForwardCore) service).getTagAutocompleteKeys(); return tagAutocompleteKeys; } diff --git a/zipkin-server/storage-cassandra/src/test/java/zipkin/server/storage/cassandra/ITCassandraStorage.java b/zipkin-server/storage-cassandra/src/test/java/zipkin/server/storage/cassandra/ITCassandraStorage.java index bb66f3df247..b33ab735712 100644 --- a/zipkin-server/storage-cassandra/src/test/java/zipkin/server/storage/cassandra/ITCassandraStorage.java +++ b/zipkin-server/storage-cassandra/src/test/java/zipkin/server/storage/cassandra/ITCassandraStorage.java @@ -24,8 +24,8 @@ import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.module.ModuleNotFoundException; import org.apache.skywalking.oap.server.library.module.ModuleStartException; -import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig; -import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward; +import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService; +import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverModule; import org.joda.time.DateTime; import org.junit.Assert; import org.junit.jupiter.api.BeforeAll; @@ -54,16 +54,14 @@ public class ITCassandraStorage { CassandraExtension cassandra = new CassandraExtension(); private final ModuleManager moduleManager = new ModuleManager(); - private SpanForward forward; + private SpanForwardService forward; private ITagAutoCompleteQueryDAO tagAutoCompleteQueryDAO; private IZipkinQueryDAO zipkinQueryDAO; @BeforeAll public void setup() throws ModuleNotFoundException, ModuleConfigException, ModuleStartException { final ApplicationConfigLoader loader = new ApplicationConfigLoader(); moduleManager.init(loader.load()); - final ZipkinReceiverConfig config = new ZipkinReceiverConfig(); - config.setSearchableTracesTags("http.path"); - this.forward = new SpanForward(config, moduleManager); + this.forward = moduleManager.find(ZipkinReceiverModule.NAME).provider().getService(SpanForwardService.class); this.tagAutoCompleteQueryDAO = moduleManager.find(StorageModule.NAME).provider().getService(ITagAutoCompleteQueryDAO.class); this.zipkinQueryDAO = moduleManager.find(StorageModule.NAME).provider().getService(IZipkinQueryDAO.class); } diff --git a/zipkin-server/storage-cassandra/src/test/resources/application.yml b/zipkin-server/storage-cassandra/src/test/resources/application.yml index df0e4c489a8..8477b4f0d8c 100644 --- a/zipkin-server/storage-cassandra/src/test/resources/application.yml +++ b/zipkin-server/storage-cassandra/src/test/resources/application.yml @@ -29,32 +29,22 @@ core: l1FlushPeriod: ${ZIPKIN_CORE_L1_AGGREGATION_FLUSH_PERIOD:500} # The threshold of session time. Unit is ms. Default value is 70s. storageSessionTimeout: ${ZIPKIN_CORE_STORAGE_SESSION_TIMEOUT:70000} - # Defines a set of span tag keys which are searchable. - # The max length of key=value should be less than 256 or will be dropped. - searchableTracesTags: ${ZIPKIN_SEARCHABLE_TAG_KEYS:http.path} - # The trace sample rate precision is 1/10000, should be between 0 and 10000 - traceSampleRate: ${ZIPKIN_SAMPLE_RATE:10000} # The number of threads used to prepare metrics data to the storage. prepareThreads: ${ZIPKIN_PREPARE_THREADS:2} # The period of doing data persistence. Unit is second.Default value is 25s persistentPeriod: ${ZIPKIN_PERSISTENT_PERIOD:1} - gRPCHost: ${ZIPKIN_GRPC_HOST:0.0.0.0} - gRPCPort: ${ZIPKIN_GRPC_PORT:11800} - gRPCThreadPoolQueueSize: ${ZIPKIN_GRPC_POOL_QUEUE_SIZE:-1} - gRPCThreadPoolSize: ${ZIPKIN_GRPC_THREAD_POOL_SIZE:-1} - gRPCSslEnabled: ${ZIPKIN_GRPC_SSL_ENABLED:false} - gRPCSslKeyPath: ${ZIPKIN_GRPC_SSL_KEY_PATH:""} - gRPCSslCertChainPath: ${ZIPKIN_GRPC_SSL_CERT_CHAIN_PATH:""} - gRPCSslTrustedCAPath: ${ZIPKIN_GRPC_SSL_TRUSTED_CA_PATH:""} - gRPCMaxConcurrentCallsPerConnection: ${ZIPKIN_GRPC_MAX_CONCURRENT_CALL:0} - gRPCMaxMessageSize: ${ZIPKIN_GRPC_MAX_MESSAGE_SIZE:0} - restHost: ${ZIPKIN_REST_HOST:0.0.0.0} - restPort: ${ZIPKIN_REST_PORT:9411} - restContextPath: ${ZIPKIN_REST_CONTEXT_PATH:/} - restMaxThreads: ${ZIPKIN_REST_MAX_THREADS:200} - restIdleTimeOut: ${ZIPKIN_REST_IDLE_TIMEOUT:30000} - restAcceptQueueSize: ${ZIPKIN_REST_QUEUE_SIZE:0} - restMaxRequestHeaderSize: ${ZIPKIN_REST_MAX_REQUEST_HEADER_SIZE:8192} + serverHost: ${ZIPKIN_SERVER_HOST:0.0.0.0} + serverPort: ${ZIPKIN_SERVER_PORT:9411} + serverMaxThreads: ${ZIPKIN_SERVER_MAX_THREADS:200} + serverIdleTimeOut: ${ZIPKIN_SERVER_IDLE_TIMEOUT:30000} + serverAcceptQueueSize: ${ZIPKIN_SERVER_QUEUE_SIZE:0} + serverMaxRequestHeaderSize: ${ZIPKIN_SERVER_MAX_REQUEST_HEADER_SIZE:8192} + serverThreadPoolQueueSize: ${ZIPKIN_SERVER_POOL_QUEUE_SIZE:-1} + serverEnableTLS: ${ZIPKIN_SERVER_SSL_ENABLED:false} + serverTLSKeyPath: ${ZIPKIN_SERVER_SSL_KEY_PATH:""} + serverTLSCertChainPath: ${ZIPKIN_SERVER_SSL_CERT_CHAIN_PATH:""} + clusterSslTrustedCAPath: ${ZIPKIN_SERVER_SSL_TRUSTED_CA_PATH:""} + searchEnable: ${ZIPKIN_SEARCH_ENABLED:true} storage: selector: ${ZIPKIN_STORAGE:cassandra} @@ -74,6 +64,15 @@ storage: maxSizeOfBatchCql: ${ZIPKIN_STORAGE_CASSANDRA_MAX_SIZE_OF_BATCH_CQL:2000} asyncBatchPersistentPoolSize: ${ZIPKIN_STORAGE_CASSANDRA_ASYNC_BATCH_PERSISTENT_POOL_SIZE:4} +receiver-zipkin: + selector: zipkin + zipkin: + # Defines a set of span tag keys which are searchable. + # The max length of key=value should be less than 256 or will be dropped. + searchableTracesTags: ${ZIPKIN_SEARCHABLE_TAG_KEYS:http.path} + # The trace sample rate precision is 0.0001, should be between 0 and 1 + traceSampleRate: ${ZIPKIN_SAMPLE_RATE:1} + telemetry: selector: ${ZIPKIN_TELEMETRY:none} none: