diff --git a/.github/workflows/test-v3.yml b/.github/workflows/test-v3.yml index fd77a3efa90..50270097bcd 100644 --- a/.github/workflows/test-v3.yml +++ b/.github/workflows/test-v3.yml @@ -52,6 +52,7 @@ jobs: - name: receiver-zipkin-kafka - name: receiver-zipkin-rabbitmq - name: receiver-zipkin-scribe + - name: receiver-zipkin-grpc - name: storage-cassandra steps: - name: Checkout Repository diff --git a/zipkin-server/pom.xml b/zipkin-server/pom.xml index 37d1099a76f..91a38c4d363 100644 --- a/zipkin-server/pom.xml +++ b/zipkin-server/pom.xml @@ -80,6 +80,7 @@ zipkin-storage-ext receiver-zipkin-core receiver-otlp-trace + receiver-zipkin-grpc diff --git a/zipkin-server/receiver-zipkin-grpc/pom.xml b/zipkin-server/receiver-zipkin-grpc/pom.xml new file mode 100644 index 00000000000..091ca492757 --- /dev/null +++ b/zipkin-server/receiver-zipkin-grpc/pom.xml @@ -0,0 +1,27 @@ + + + 4.0.0 + + zipkin-server-parent + io.zipkin + 2.24.4-SNAPSHOT + + + receiver-zipkin-grpc + Zipkin gRPC Receiver + + + + io.zipkin + receiver-zipkin-core + ${project.version} + + + ${armeria.groupId} + armeria-grpc-protocol + ${armeria.version} + + + \ No newline at end of file diff --git a/zipkin-server/receiver-zipkin-grpc/src/main/java/zipkin/server/receiver/zipkin/grpc/ZipkinGRPCHandler.java b/zipkin-server/receiver-zipkin-grpc/src/main/java/zipkin/server/receiver/zipkin/grpc/ZipkinGRPCHandler.java new file mode 100644 index 00000000000..087fe79cb00 --- /dev/null +++ b/zipkin-server/receiver-zipkin-grpc/src/main/java/zipkin/server/receiver/zipkin/grpc/ZipkinGRPCHandler.java @@ -0,0 +1,111 @@ +/* + * Copyright 2015-2023 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package zipkin.server.receiver.zipkin.grpc; + +import com.linecorp.armeria.common.CommonPools; +import com.linecorp.armeria.server.ServiceRequestContext; +import com.linecorp.armeria.server.grpc.protocol.AbstractUnsafeUnaryGrpcService; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService; +import org.apache.skywalking.oap.server.telemetry.TelemetryModule; +import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics; +import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics; +import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator; +import org.apache.skywalking.oap.server.telemetry.api.MetricsTag; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import zipkin2.Callback; +import zipkin2.codec.SpanBytesDecoder; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; + +public class ZipkinGRPCHandler extends AbstractUnsafeUnaryGrpcService { + private static final Logger log = LoggerFactory.getLogger(ZipkinGRPCHandler.class.getName()); + + private final SpanForwardService spanForward; + + private final CounterMetrics msgDroppedIncr; + private final CounterMetrics errorCounter; + private final HistogramMetrics histogram; + + public ZipkinGRPCHandler(SpanForwardService spanForward, ModuleManager moduleManager) { + this.spanForward = spanForward; + + MetricsCreator metricsCreator = moduleManager.find(TelemetryModule.NAME) + .provider() + .getService(MetricsCreator.class); + histogram = metricsCreator.createHistogramMetric( + "trace_in_latency", + "The process latency of trace data", + new MetricsTag.Keys("protocol"), + new MetricsTag.Values("zipkin-kafka") + ); + msgDroppedIncr = metricsCreator.createCounter( + "trace_dropped_count", "The dropped number of traces", + new MetricsTag.Keys("protocol"), new MetricsTag.Values("zipkin-grpc")); + errorCounter = metricsCreator.createCounter( + "trace_analysis_error_count", "The error number of trace analysis", + new MetricsTag.Keys("protocol"), new MetricsTag.Values("zipkin-grpc") + ); + } + + @Override + protected CompletionStage handleMessage(ServiceRequestContext context, ByteBuf message) { + if (!message.isReadable()) { + msgDroppedIncr.inc(); + return CompletableFuture.completedFuture(message); // lenient on empty messages + } + + try { + CompletableFutureCallback result = new CompletableFutureCallback(); + + // collector.accept might block so need to move off the event loop. We make sure the + // callback is context aware to continue the trace. + Executor executor = ServiceRequestContext.mapCurrent( + ctx -> ctx.makeContextAware(ctx.blockingTaskExecutor()), + CommonPools::blockingTaskExecutor); + + executor.execute(() -> { + try (HistogramMetrics.Timer ignored = histogram.createTimer()) { + spanForward.send(SpanBytesDecoder.PROTO3.decodeList(message.nioBuffer())); + result.onSuccess(null); + } catch (Exception e) { + log.error("Failed to handle message", e); + errorCounter.inc(); + result.onError(e); + } + }); + return result; + } finally { + message.release(); + } + } + + static final class CompletableFutureCallback extends CompletableFuture + implements Callback { + + @Override public void onSuccess(Void value) { + complete(Unpooled.EMPTY_BUFFER); + } + + @Override public void onError(Throwable t) { + completeExceptionally(t); + } + } +} diff --git a/zipkin-server/receiver-zipkin-grpc/src/main/java/zipkin/server/receiver/zipkin/grpc/ZipkinGRPCModule.java b/zipkin-server/receiver-zipkin-grpc/src/main/java/zipkin/server/receiver/zipkin/grpc/ZipkinGRPCModule.java new file mode 100644 index 00000000000..004f7e4c428 --- /dev/null +++ b/zipkin-server/receiver-zipkin-grpc/src/main/java/zipkin/server/receiver/zipkin/grpc/ZipkinGRPCModule.java @@ -0,0 +1,29 @@ +/* + * Copyright 2015-2023 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package zipkin.server.receiver.zipkin.grpc; + +import org.apache.skywalking.oap.server.library.module.ModuleDefine; + +public class ZipkinGRPCModule extends ModuleDefine { + public static final String NAME = "receiver-zipkin-grpc"; + public ZipkinGRPCModule() { + super(NAME); + } + + @Override + public Class[] services() { + return new Class[0]; + } +} diff --git a/zipkin-server/receiver-zipkin-grpc/src/main/java/zipkin/server/receiver/zipkin/grpc/ZipkinGRPCProvider.java b/zipkin-server/receiver-zipkin-grpc/src/main/java/zipkin/server/receiver/zipkin/grpc/ZipkinGRPCProvider.java new file mode 100644 index 00000000000..9a57c74e25c --- /dev/null +++ b/zipkin-server/receiver-zipkin-grpc/src/main/java/zipkin/server/receiver/zipkin/grpc/ZipkinGRPCProvider.java @@ -0,0 +1,84 @@ +/* + * Copyright 2015-2023 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package zipkin.server.receiver.zipkin.grpc; + +import com.linecorp.armeria.common.HttpMethod; +import org.apache.skywalking.oap.server.core.CoreModule; +import org.apache.skywalking.oap.server.core.server.HTTPHandlerRegister; +import org.apache.skywalking.oap.server.library.module.ModuleConfig; +import org.apache.skywalking.oap.server.library.module.ModuleDefine; +import org.apache.skywalking.oap.server.library.module.ModuleProvider; +import org.apache.skywalking.oap.server.library.module.ModuleStartException; +import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException; +import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService; +import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverModule; +import zipkin.server.core.services.HTTPConfigurableServer; + +import java.util.Arrays; + +public class ZipkinGRPCProvider extends ModuleProvider { + private ZipkinGRPCReceiverConfig moduleConfig; + + @Override + public String name() { + return "default"; + } + + @Override + public Class module() { + return ZipkinGRPCModule.class; + } + + @Override + public ConfigCreator newConfigCreator() { + return new ConfigCreator() { + + @Override + public Class type() { + return ZipkinGRPCReceiverConfig.class; + } + + @Override + public void onInitialized(ZipkinGRPCReceiverConfig initialized) { + moduleConfig = initialized; + } + }; + } + + @Override + public void prepare() throws ServiceNotProvidedException, ModuleStartException { + } + + @Override + public void start() throws ServiceNotProvidedException, ModuleStartException { + final SpanForwardService spanForward = getManager().find(ZipkinReceiverModule.NAME).provider().getService(SpanForwardService.class); + final ZipkinGRPCHandler receiver = new ZipkinGRPCHandler(spanForward, getManager()); + + final HTTPHandlerRegister httpRegister = getManager().find(CoreModule.NAME).provider().getService(HTTPHandlerRegister.class); + httpRegister.addHandler( + (HTTPConfigurableServer.ServerConfiguration) builder -> builder.service("/zipkin.proto3.SpanService/Report", receiver), + Arrays.asList(HttpMethod.POST, HttpMethod.GET)); + } + + @Override + public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException { + + } + + @Override + public String[] requiredModules() { + return new String[0]; + } +} diff --git a/zipkin-server/receiver-zipkin-grpc/src/main/java/zipkin/server/receiver/zipkin/grpc/ZipkinGRPCReceiverConfig.java b/zipkin-server/receiver-zipkin-grpc/src/main/java/zipkin/server/receiver/zipkin/grpc/ZipkinGRPCReceiverConfig.java new file mode 100644 index 00000000000..05affe6b5c9 --- /dev/null +++ b/zipkin-server/receiver-zipkin-grpc/src/main/java/zipkin/server/receiver/zipkin/grpc/ZipkinGRPCReceiverConfig.java @@ -0,0 +1,20 @@ +/* + * Copyright 2015-2023 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package zipkin.server.receiver.zipkin.grpc; + +import org.apache.skywalking.oap.server.library.module.ModuleConfig; + +public class ZipkinGRPCReceiverConfig extends ModuleConfig { +} diff --git a/zipkin-server/receiver-zipkin-grpc/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleDefine b/zipkin-server/receiver-zipkin-grpc/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleDefine new file mode 100644 index 00000000000..4134dfc1cdc --- /dev/null +++ b/zipkin-server/receiver-zipkin-grpc/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleDefine @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# + +zipkin.server.receiver.zipkin.grpc.ZipkinGRPCModule diff --git a/zipkin-server/receiver-zipkin-grpc/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider b/zipkin-server/receiver-zipkin-grpc/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider new file mode 100755 index 00000000000..066d171e06f --- /dev/null +++ b/zipkin-server/receiver-zipkin-grpc/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# + +zipkin.server.receiver.zipkin.grpc.ZipkinGRPCProvider \ No newline at end of file diff --git a/zipkin-server/receiver-zipkin-grpc/src/test/java/zipkin/server/receiver/zipkin/grpc/ITZipkinGRPCHandler.java b/zipkin-server/receiver-zipkin-grpc/src/test/java/zipkin/server/receiver/zipkin/grpc/ITZipkinGRPCHandler.java new file mode 100644 index 00000000000..1e4492e8f7e --- /dev/null +++ b/zipkin-server/receiver-zipkin-grpc/src/test/java/zipkin/server/receiver/zipkin/grpc/ITZipkinGRPCHandler.java @@ -0,0 +1,123 @@ +/* + * Copyright 2015-2023 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package zipkin.server.receiver.zipkin.grpc; + +import com.linecorp.armeria.client.Clients; +import com.linecorp.armeria.client.grpc.protocol.UnaryGrpcClient; +import org.apache.skywalking.oap.server.core.CoreModule; +import org.apache.skywalking.oap.server.core.CoreModuleProvider; +import org.apache.skywalking.oap.server.core.server.HTTPHandlerRegister; +import org.apache.skywalking.oap.server.core.server.HTTPHandlerRegisterImpl; +import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.library.module.ModuleStartException; +import org.apache.skywalking.oap.server.library.server.http.HTTPServer; +import org.apache.skywalking.oap.server.library.server.http.HTTPServerConfig; +import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService; +import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverModule; +import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward; +import org.apache.skywalking.oap.server.telemetry.TelemetryModule; +import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator; +import org.apache.skywalking.oap.server.telemetry.none.MetricsCreatorNoop; +import org.apache.skywalking.oap.server.telemetry.none.NoneTelemetryProvider; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; +import org.powermock.reflect.Whitebox; +import zipkin.server.core.services.HTTPConfigurableServer; +import zipkin.server.receiver.zipkin.core.ZipkinReceiverCoreProvider; +import zipkin2.Span; +import zipkin2.TestObjects; +import zipkin2.codec.SpanBytesEncoder; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.LinkedBlockingQueue; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +@Timeout(60) +@ExtendWith(MockitoExtension.class) +public class ITZipkinGRPCHandler { + + private static final int port = 8000; + private ModuleManager moduleManager; + @Mock + private SpanForward forward; + private LinkedBlockingQueue> spans = new LinkedBlockingQueue<>(); + + @BeforeEach + public void setup() throws ModuleStartException { + final HTTPServer httpServer = new HTTPConfigurableServer(HTTPServerConfig.builder().host("0.0.0.0").port(port).contextPath("/").build()); + httpServer.initialize(); + moduleManager = setupModuleManager(httpServer, forward); + + final ZipkinGRPCProvider provider = new ZipkinGRPCProvider(); + provider.setManager(moduleManager); + final ZipkinGRPCReceiverConfig config = new ZipkinGRPCReceiverConfig(); + Whitebox.setInternalState(provider, ZipkinGRPCReceiverConfig.class, config); + provider.prepare(); + provider.start(); + httpServer.start(); + doAnswer(invocationOnMock -> { + spans.add(invocationOnMock.getArgument(0, ArrayList.class)); + return null; + }).when(forward).send(any()); + provider.notifyAfterCompleted(); + } + + @Test + public void test() throws Exception { + UnaryGrpcClient client = Clients.newClient("gproto+http://127.0.0.1:" + port, UnaryGrpcClient.class); + final CompletableFuture result = client.execute("zipkin.proto3.SpanService/Report", SpanBytesEncoder.PROTO3.encodeList(TestObjects.TRACE)); + result.get(); + assertThat(spans.take()).containsAll(TestObjects.TRACE); + } + + private ModuleManager setupModuleManager(HTTPServer httpServer, SpanForward forward) { + ModuleManager moduleManager = Mockito.mock(ModuleManager.class); + + CoreModule coreModule = Mockito.spy(CoreModule.class); + CoreModuleProvider moduleProvider = Mockito.mock(CoreModuleProvider.class); + Whitebox.setInternalState(coreModule, "loadedProvider", moduleProvider); + Mockito.when(moduleManager.find(CoreModule.NAME)).thenReturn(coreModule); + Mockito.when(coreModule.provider().getService(HTTPHandlerRegister.class)).thenReturn(new HTTPHandlerRegisterImpl(httpServer)); + + final ZipkinReceiverModule zipkinReceiverModule = Mockito.spy(ZipkinReceiverModule.class); + final ZipkinReceiverCoreProvider receiverProvider = Mockito.mock(ZipkinReceiverCoreProvider.class); + Whitebox.setInternalState(zipkinReceiverModule, "loadedProvider", receiverProvider); + Mockito.when(moduleManager.find(ZipkinReceiverModule.NAME)).thenReturn(zipkinReceiverModule); + Mockito.when(zipkinReceiverModule.provider().getService(SpanForwardService.class)).thenReturn(forward); + + TelemetryModule telemetryModule = Mockito.spy(TelemetryModule.class); + NoneTelemetryProvider noneTelemetryProvider = Mockito.mock(NoneTelemetryProvider.class); + Whitebox.setInternalState(telemetryModule, "loadedProvider", noneTelemetryProvider); + Mockito.when(moduleManager.find(TelemetryModule.NAME)).thenReturn(telemetryModule); + + Mockito.when(noneTelemetryProvider.getService(MetricsCreator.class)) + .thenReturn(new MetricsCreatorNoop()); + + return moduleManager; + } + +} diff --git a/zipkin-server/receiver-zipkin-grpc/src/test/resources/log4j2.xml b/zipkin-server/receiver-zipkin-grpc/src/test/resources/log4j2.xml new file mode 100644 index 00000000000..fd44be4ed94 --- /dev/null +++ b/zipkin-server/receiver-zipkin-grpc/src/test/resources/log4j2.xml @@ -0,0 +1,36 @@ + + + + + + + + + + + + + + + + + + + + diff --git a/zipkin-server/server-starter/pom.xml b/zipkin-server/server-starter/pom.xml index 0f4baa32c62..fe66380c289 100644 --- a/zipkin-server/server-starter/pom.xml +++ b/zipkin-server/server-starter/pom.xml @@ -114,6 +114,11 @@ receiver-zipkin-scribe ${project.version} + + io.zipkin + receiver-zipkin-grpc + ${project.version} + io.zipkin receiver-otlp-trace diff --git a/zipkin-server/server-starter/src/main/resources/application.yml b/zipkin-server/server-starter/src/main/resources/application.yml index 7d5e8b5e1f2..93d36eaab11 100644 --- a/zipkin-server/server-starter/src/main/resources/application.yml +++ b/zipkin-server/server-starter/src/main/resources/application.yml @@ -218,6 +218,10 @@ receiver-zipkin-scribe: category: ${ZIPKIN_SCRIBE_CATEGORY:zipkin} port: ${ZIPKIN_COLLECTOR_PORT:9410} +receiver-zipkin-grpc: + selector: ${ZIPKIN_RECEIVER_ZIPKIN_GRPC:-} + default: + receiver-otel: selector: ${SW_OTEL_RECEIVER:zipkin} zipkin: