From 82d0071665038c8cc6bdd14cef173f1a71649671 Mon Sep 17 00:00:00 2001 From: gkatzioura Date: Sun, 17 Dec 2023 23:01:30 +0000 Subject: [PATCH] Adds pub/sub collector and sender (#200) fixes #109 --- collector-pubsub/pom.xml | 75 +++++ .../collector/pubsub/PubSubCollector.java | 176 +++++++++++ .../collector/pubsub/SpanCallback.java | 38 +++ .../collector/pubsub/SpanMessageReceiver.java | 41 +++ .../collector/pubsub/SubscriberSettings.java | 129 ++++++++ .../collector/pubsub/PubSubCollectorTest.java | 132 ++++++++ .../pubsub/QueueBasedSubscriberImpl.java | 67 ++++ .../pubsub/StreamingPullStreamObserver.java | 102 ++++++ pom.xml | 13 + sender-pubsub/pom.xml | 73 +++++ .../zipkin2/reporter/pubsub/PubSubSender.java | 294 ++++++++++++++++++ .../PubSubSenderInitializationException.java | 33 ++ .../reporter/pubsub/PubSubSenderTest.java | 225 ++++++++++++++ 13 files changed, 1398 insertions(+) create mode 100644 collector-pubsub/pom.xml create mode 100644 collector-pubsub/src/main/java/zipkin2/collector/pubsub/PubSubCollector.java create mode 100644 collector-pubsub/src/main/java/zipkin2/collector/pubsub/SpanCallback.java create mode 100644 collector-pubsub/src/main/java/zipkin2/collector/pubsub/SpanMessageReceiver.java create mode 100644 collector-pubsub/src/main/java/zipkin2/collector/pubsub/SubscriberSettings.java create mode 100644 collector-pubsub/src/test/java/zipkin2/collector/pubsub/PubSubCollectorTest.java create mode 100644 collector-pubsub/src/test/java/zipkin2/collector/pubsub/QueueBasedSubscriberImpl.java create mode 100644 collector-pubsub/src/test/java/zipkin2/collector/pubsub/StreamingPullStreamObserver.java create mode 100644 sender-pubsub/pom.xml create mode 100644 sender-pubsub/src/main/java/zipkin2/reporter/pubsub/PubSubSender.java create mode 100644 sender-pubsub/src/main/java/zipkin2/reporter/pubsub/PubSubSenderInitializationException.java create mode 100644 sender-pubsub/src/test/java/zipkin2/reporter/pubsub/PubSubSenderTest.java diff --git a/collector-pubsub/pom.xml b/collector-pubsub/pom.xml new file mode 100644 index 0000000..7452254 --- /dev/null +++ b/collector-pubsub/pom.xml @@ -0,0 +1,75 @@ + + + + + zipkin-gcp-parent + io.zipkin.gcp + 1.0.3-SNAPSHOT + + 4.0.0 + + collector-pubsub + + + ${project.basedir}/.. + + + + + + com.google.cloud + libraries-bom + 24.1.2 + pom + import + + + + + + + com.google.cloud + google-cloud-pubsub + + + io.zipkin.zipkin2 + zipkin-collector + ${zipkin.version} + + + io.grpc + grpc-testing + 1.43.2 + test + + + com.google.api.grpc + grpc-google-cloud-pubsub-v1 + 1.97.1 + test + + + org.awaitility + awaitility + ${awaitility.version} + test + + + + \ No newline at end of file diff --git a/collector-pubsub/src/main/java/zipkin2/collector/pubsub/PubSubCollector.java b/collector-pubsub/src/main/java/zipkin2/collector/pubsub/PubSubCollector.java new file mode 100644 index 0000000..0c81840 --- /dev/null +++ b/collector-pubsub/src/main/java/zipkin2/collector/pubsub/PubSubCollector.java @@ -0,0 +1,176 @@ +/* + * Copyright 2016-2022 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.collector.pubsub; + +import java.io.IOException; + +import com.google.api.gax.core.ExecutorProvider; +import com.google.api.gax.rpc.ApiException; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.cloud.pubsub.v1.SubscriptionAdminClient; + +import zipkin2.CheckResult; +import zipkin2.codec.Encoding; +import zipkin2.collector.Collector; +import zipkin2.collector.CollectorComponent; +import zipkin2.collector.CollectorMetrics; +import zipkin2.collector.CollectorSampler; +import zipkin2.storage.StorageComponent; + +public class PubSubCollector extends CollectorComponent { + + public static final class Builder extends CollectorComponent.Builder { + + String subscription; + Encoding encoding = Encoding.JSON; + ExecutorProvider executorProvider; + SubscriptionAdminClient subscriptionAdminClient; + SubscriberSettings subscriberSettings; + + Collector.Builder delegate = Collector.newBuilder(PubSubCollector.class); + CollectorMetrics metrics = CollectorMetrics.NOOP_METRICS; + + public Builder(PubSubCollector pubSubCollector) { + this.subscription = pubSubCollector.subscription; + this.encoding = pubSubCollector.encoding; + this.executorProvider = pubSubCollector.executorProvider; + } + + @Override + public Builder storage(StorageComponent storageComponent) { + delegate.storage(storageComponent); + return this; + } + + @Override + public Builder metrics(CollectorMetrics metrics) { + if (metrics == null) throw new NullPointerException("metrics == null"); + delegate.metrics(this.metrics = metrics.forTransport("pubsub")); + return this; + } + + @Override + public Builder sampler(CollectorSampler collectorSampler) { + delegate.sampler(collectorSampler); + return this; + } + + /** PubSub subscription to receive spans. */ + public Builder subscription(String subscription) { + if (subscription == null) throw new NullPointerException("subscription == null"); + this.subscription = subscription; + return this; + } + + /** + * Use this to change the encoding used in messages. Default is {@linkplain Encoding#JSON} + * + *

Note: If ultimately sending to Zipkin, version 2.8+ is required to process protobuf. + */ + public Builder encoding(Encoding encoding) { + if (encoding == null) throw new NullPointerException("encoding == null"); + this.encoding = encoding; + return this; + } + + /** ExecutorProvider for PubSub operations **/ + public Builder executorProvider(ExecutorProvider executorProvider) { + if (executorProvider == null) throw new NullPointerException("executorProvider == null"); + this.executorProvider = executorProvider; + return this; + } + + public Builder subscriptionAdminClient(SubscriptionAdminClient subscriptionAdminClient) { + if (subscriptionAdminClient == null) throw new NullPointerException("subscriptionAdminClient == null"); + this.subscriptionAdminClient = subscriptionAdminClient; + return this; + } + + public Builder subscriberSettings(SubscriberSettings subscriberSettings) { + if (subscriberSettings == null) throw new NullPointerException("subscriberSettings == null"); + this.subscriberSettings = subscriberSettings; + return this; + } + + @Override + public PubSubCollector build() { + return new PubSubCollector(this); + } + + Builder() {} + } + + final Collector collector; + final CollectorMetrics metrics; + final String subscription; + final Encoding encoding; + Subscriber subscriber; + final ExecutorProvider executorProvider; + final SubscriptionAdminClient subscriptionAdminClient; + final SubscriberSettings subscriberSettings; + + PubSubCollector(Builder builder) { + this.collector = builder.delegate.build(); + this.metrics = builder.metrics; + this.subscription = builder.subscription; + this.encoding = builder.encoding; + this.executorProvider = builder.executorProvider; + this.subscriptionAdminClient = builder.subscriptionAdminClient; + this.subscriberSettings = builder.subscriberSettings; + } + + @Override + public CollectorComponent start() { + Subscriber.Builder builder = Subscriber.newBuilder(subscription, new SpanMessageReceiver(collector, metrics)); + subscriber = applyConfigurations(builder).build(); + subscriber.startAsync().awaitRunning(); + return this; + } + + private Subscriber.Builder applyConfigurations(Subscriber.Builder builder) { + if(subscriberSettings==null) { + return builder; + } + + subscriberSettings.getChannelProvider().ifPresent(builder::setChannelProvider); + subscriberSettings.getHeaderProvider().ifPresent(builder::setHeaderProvider); + subscriberSettings.getFlowControlSettings().ifPresent(builder::setFlowControlSettings); + builder.setUseLegacyFlowControl(subscriberSettings.isUseLegacyFlowControl()); + subscriberSettings.getMaxAckExtensionPeriod().ifPresent(builder::setMaxAckExtensionPeriod); + subscriberSettings.getMaxDurationPerAckExtension().ifPresent(builder::setMaxDurationPerAckExtension); + subscriberSettings.getExecutorProvider().ifPresent(builder::setExecutorProvider); + subscriberSettings.getCredentialsProvider().ifPresent(builder::setCredentialsProvider); + subscriberSettings.getSystemExecutorProvider().ifPresent(builder::setSystemExecutorProvider); + builder.setParallelPullCount(subscriberSettings.getParallelPullCount()); + subscriberSettings.getEndpoint().ifPresent(builder::setEndpoint); + + return builder; + } + + @Override + public CheckResult check() { + try { + subscriptionAdminClient.getSubscription(subscription); + return CheckResult.OK; + } catch (ApiException e) { + return CheckResult.failed(e); + } + } + + @Override + public void close() throws IOException { + subscriber.stopAsync().awaitTerminated(); + } + +} diff --git a/collector-pubsub/src/main/java/zipkin2/collector/pubsub/SpanCallback.java b/collector-pubsub/src/main/java/zipkin2/collector/pubsub/SpanCallback.java new file mode 100644 index 0000000..2a6fea4 --- /dev/null +++ b/collector-pubsub/src/main/java/zipkin2/collector/pubsub/SpanCallback.java @@ -0,0 +1,38 @@ +/* + * Copyright 2016-2022 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.collector.pubsub; + +import com.google.cloud.pubsub.v1.AckReplyConsumer; + +import zipkin2.Callback; + +final class SpanCallback implements Callback { + + private final AckReplyConsumer ackReplyConsumer; + + public SpanCallback(AckReplyConsumer ackReplyConsumer) { + this.ackReplyConsumer = ackReplyConsumer; + } + + @Override + public void onSuccess(Void value) { + ackReplyConsumer.ack(); + } + + @Override + public void onError(Throwable throwable) { + ackReplyConsumer.nack(); + } + +} diff --git a/collector-pubsub/src/main/java/zipkin2/collector/pubsub/SpanMessageReceiver.java b/collector-pubsub/src/main/java/zipkin2/collector/pubsub/SpanMessageReceiver.java new file mode 100644 index 0000000..7b3f02d --- /dev/null +++ b/collector-pubsub/src/main/java/zipkin2/collector/pubsub/SpanMessageReceiver.java @@ -0,0 +1,41 @@ +/* + * Copyright 2016-2022 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.collector.pubsub; + +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.pubsub.v1.PubsubMessage; + +import zipkin2.collector.Collector; +import zipkin2.collector.CollectorMetrics; + +final class SpanMessageReceiver implements MessageReceiver { + + final Collector collector; + final CollectorMetrics metrics; + + public SpanMessageReceiver(Collector collector, CollectorMetrics metrics) { + this.collector = collector; + this.metrics = metrics; + } + + @Override + public void receiveMessage(PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) { + byte[] serialized = pubsubMessage.getData().toByteArray(); + metrics.incrementMessages(); + metrics.incrementBytes(serialized.length); + collector.acceptSpans(serialized, new SpanCallback(ackReplyConsumer)); + } + +} diff --git a/collector-pubsub/src/main/java/zipkin2/collector/pubsub/SubscriberSettings.java b/collector-pubsub/src/main/java/zipkin2/collector/pubsub/SubscriberSettings.java new file mode 100644 index 0000000..1251808 --- /dev/null +++ b/collector-pubsub/src/main/java/zipkin2/collector/pubsub/SubscriberSettings.java @@ -0,0 +1,129 @@ +/* + * Copyright 2016-2022 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.collector.pubsub; + +import java.util.Optional; + +import org.threeten.bp.Duration; + +import com.google.api.core.ApiClock; +import com.google.api.gax.batching.FlowControlSettings; +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.core.ExecutorProvider; +import com.google.api.gax.rpc.HeaderProvider; +import com.google.api.gax.rpc.TransportChannelProvider; + +public class SubscriberSettings { + + private Optional channelProvider = Optional.empty(); + private Optional headerProvider = Optional.empty(); + private Optional flowControlSettings = Optional.empty(); + private boolean useLegacyFlowControl = false; + private Optional maxAckExtensionPeriod = Optional.empty(); + private Optional maxDurationPerAckExtension = Optional.empty(); + private Optional executorProvider = Optional.empty(); + private Optional credentialsProvider = Optional.empty(); + private Optional systemExecutorProvider = Optional.empty(); + private int parallelPullCount = 1; + private Optional endpoint = Optional.empty(); + + public Optional getChannelProvider() { + return channelProvider; + } + + public void setChannelProvider(TransportChannelProvider channelProvider) { + this.channelProvider = Optional.of(channelProvider); + } + + public Optional getHeaderProvider() { + return headerProvider; + } + + public void setHeaderProvider(HeaderProvider headerProvider) { + this.headerProvider = Optional.of(headerProvider); + } + + public Optional getFlowControlSettings() { + return flowControlSettings; + } + + public void setFlowControlSettings(FlowControlSettings flowControlSettings) { + this.flowControlSettings = Optional.of(flowControlSettings); + } + + public boolean isUseLegacyFlowControl() { + return useLegacyFlowControl; + } + + public void setUseLegacyFlowControl(boolean useLegacyFlowControl) { + this.useLegacyFlowControl = useLegacyFlowControl; + } + + public Optional getMaxAckExtensionPeriod() { + return maxAckExtensionPeriod; + } + + public void setMaxAckExtensionPeriod(Duration maxAckExtensionPeriod) { + this.maxAckExtensionPeriod = Optional.of(maxAckExtensionPeriod); + } + + public Optional getMaxDurationPerAckExtension() { + return maxDurationPerAckExtension; + } + + public void setMaxDurationPerAckExtension(Duration maxDurationPerAckExtension) { + this.maxDurationPerAckExtension = Optional.of(maxDurationPerAckExtension); + } + + public Optional getExecutorProvider() { + return executorProvider; + } + + public void setExecutorProvider(ExecutorProvider executorProvider) { + this.executorProvider = Optional.of(executorProvider); + } + + public Optional getCredentialsProvider() { + return credentialsProvider; + } + + public void setCredentialsProvider(CredentialsProvider credentialsProvider) { + this.credentialsProvider = Optional.of(credentialsProvider); + } + + public Optional getSystemExecutorProvider() { + return systemExecutorProvider; + } + + public void setSystemExecutorProvider(ExecutorProvider systemExecutorProvider) { + this.systemExecutorProvider = Optional.of(systemExecutorProvider); + } + + public int getParallelPullCount() { + return parallelPullCount; + } + + public void setParallelPullCount(int parallelPullCount) { + this.parallelPullCount = parallelPullCount; + } + + public Optional getEndpoint() { + return endpoint; + } + + public void setEndpoint(String endpoint) { + this.endpoint = Optional.of(endpoint); + } + +} diff --git a/collector-pubsub/src/test/java/zipkin2/collector/pubsub/PubSubCollectorTest.java b/collector-pubsub/src/test/java/zipkin2/collector/pubsub/PubSubCollectorTest.java new file mode 100644 index 0000000..f1b90f8 --- /dev/null +++ b/collector-pubsub/src/test/java/zipkin2/collector/pubsub/PubSubCollectorTest.java @@ -0,0 +1,132 @@ +/* + * Copyright 2016-2022 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.collector.pubsub; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import com.google.api.gax.batching.FlowControlSettings; +import com.google.api.gax.core.ExecutorProvider; +import com.google.api.gax.core.InstantiatingExecutorProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.TransportChannel; +import com.google.api.gax.rpc.TransportChannelProvider; + +import io.grpc.ManagedChannel; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.testing.GrpcCleanupRule; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import zipkin2.Span; +import static zipkin2.TestObjects.CLIENT_SPAN; +import static zipkin2.TestObjects.LOTS_OF_SPANS; +import zipkin2.codec.Encoding; +import zipkin2.collector.CollectorComponent; +import zipkin2.collector.InMemoryCollectorMetrics; +import zipkin2.storage.InMemoryStorage; + +public class PubSubCollectorTest { + + @Rule + public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); + + private InMemoryStorage store; + private InMemoryCollectorMetrics metrics; + private CollectorComponent collector; + + private QueueBasedSubscriberImpl subImplTest = new QueueBasedSubscriberImpl(); + + @Before + public void setup() throws IOException { + + String serverName = InProcessServerBuilder.generateName(); + + grpcCleanup.register(InProcessServerBuilder + .forName(serverName).directExecutor().addService(subImplTest).build().start()); + ExecutorProvider executorProvider = testExecutorProvider(); + + ManagedChannel managedChannel = InProcessChannelBuilder.forName(serverName).directExecutor().build(); + + TransportChannel transportChannel = GrpcTransportChannel.create(managedChannel); + TransportChannelProvider transportChannelProvider = FixedTransportChannelProvider.create(transportChannel); + + + store = InMemoryStorage.newBuilder().build(); + metrics = new InMemoryCollectorMetrics(); + + SubscriberSettings subscriberSettings = new SubscriberSettings(); + subscriberSettings.setChannelProvider(transportChannelProvider); + subscriberSettings.setExecutorProvider(executorProvider); + subscriberSettings.setFlowControlSettings(FlowControlSettings.newBuilder().setMaxOutstandingElementCount(1000l).build()); + + + collector = new PubSubCollector.Builder() + .subscription("projects/test-project/topics/test-subscription") + .storage(store) + .encoding(Encoding.JSON) + .executorProvider(executorProvider) + .subscriberSettings(subscriberSettings) + .metrics(metrics) + .build() + .start(); + metrics = metrics.forTransport("pubsub"); + } + + @Test + public void collectSpans() throws Exception { + List spans = Arrays.asList(LOTS_OF_SPANS[0], LOTS_OF_SPANS[1], LOTS_OF_SPANS[2]); + subImplTest.addSpans(spans); + assertSpansAccepted(spans); + } + + @Test + public void testNow() { + subImplTest.addSpan(CLIENT_SPAN); + await().atMost(10, TimeUnit.SECONDS).until(() -> store.acceptedSpanCount() == 1); + } + + @After + public void teardown() throws IOException, InterruptedException { + store.close(); + collector.close(); + } + + private InstantiatingExecutorProvider testExecutorProvider() { + return InstantiatingExecutorProvider.newBuilder() + .setExecutorThreadCount(5 * Runtime.getRuntime().availableProcessors()) + .build(); + } + + void assertSpansAccepted(List spans) throws Exception { + await().atMost(20, TimeUnit.SECONDS).until(() -> store.acceptedSpanCount() == 3); + + List someSpans = store.spanStore().getTrace(spans.get(0).traceId()).execute(); + + assertThat(metrics.messages()).as("check accept metrics.").isPositive(); + assertThat(metrics.bytes()).as("check bytes metrics.").isPositive(); + assertThat(metrics.messagesDropped()).as("check dropped metrics.").isEqualTo(0); + assertThat(someSpans).as("recorded spans should not be null").isNotNull(); + assertThat(spans).as("some spans have been recorded").containsAll(someSpans); + } + +} \ No newline at end of file diff --git a/collector-pubsub/src/test/java/zipkin2/collector/pubsub/QueueBasedSubscriberImpl.java b/collector-pubsub/src/test/java/zipkin2/collector/pubsub/QueueBasedSubscriberImpl.java new file mode 100644 index 0000000..4ad7195 --- /dev/null +++ b/collector-pubsub/src/test/java/zipkin2/collector/pubsub/QueueBasedSubscriberImpl.java @@ -0,0 +1,67 @@ +/* + * Copyright 2016-2022 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.collector.pubsub; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; + +import com.google.protobuf.ByteString; +import com.google.protobuf.Empty; +import com.google.pubsub.v1.AcknowledgeRequest; +import com.google.pubsub.v1.ModifyAckDeadlineRequest; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.PullRequest; +import com.google.pubsub.v1.PullResponse; +import com.google.pubsub.v1.ReceivedMessage; +import com.google.pubsub.v1.StreamingPullRequest; +import com.google.pubsub.v1.StreamingPullResponse; +import com.google.pubsub.v1.SubscriberGrpc; + +import io.grpc.Status; +import io.grpc.stub.StreamObserver; +import zipkin2.Span; +import zipkin2.codec.SpanBytesEncoder; + +public class QueueBasedSubscriberImpl extends SubscriberGrpc.SubscriberImplBase { + + private final BlockingQueue spanQueue = new LinkedBlockingDeque<>(); + + @Override + public StreamObserver streamingPull(StreamObserver responseObserver) { + return new StreamingPullStreamObserver(spanQueue, responseObserver); + } + + public void addSpans(List spans) { + spans.forEach(this::addSpan); + } + + public void addSpan(Span span) { + spanQueue.add(span); + } + + @Override + public void acknowledge(AcknowledgeRequest request, StreamObserver responseObserver) { + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + + @Override + public void modifyAckDeadline(ModifyAckDeadlineRequest request, StreamObserver responseObserver) { + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + +} diff --git a/collector-pubsub/src/test/java/zipkin2/collector/pubsub/StreamingPullStreamObserver.java b/collector-pubsub/src/test/java/zipkin2/collector/pubsub/StreamingPullStreamObserver.java new file mode 100644 index 0000000..3844a79 --- /dev/null +++ b/collector-pubsub/src/test/java/zipkin2/collector/pubsub/StreamingPullStreamObserver.java @@ -0,0 +1,102 @@ +/* + * Copyright 2016-2022 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.collector.pubsub; + +import java.util.Collections; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; + +import com.google.common.util.concurrent.AbstractExecutionThreadService; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.ReceivedMessage; +import com.google.pubsub.v1.StreamingPullRequest; +import com.google.pubsub.v1.StreamingPullResponse; + +import io.grpc.Status; +import io.grpc.stub.ServerCallStreamObserver; +import io.grpc.stub.StreamObserver; +import zipkin2.Span; +import zipkin2.codec.SpanBytesEncoder; + +public class StreamingPullStreamObserver extends AbstractExecutionThreadService implements StreamObserver { + + private final BlockingQueue spanQueue; + private final ServerCallStreamObserver responseObserver; + + public StreamingPullStreamObserver(BlockingQueue spanQueue, StreamObserver responseObserver) { + this.spanQueue = spanQueue; + this.responseObserver = (ServerCallStreamObserver) responseObserver; + this.responseObserver.disableAutoInboundFlowControl(); + + this.responseObserver.setOnReadyHandler( + () -> { + if(!isRunning()) { + startAsync().awaitRunning(); + } + this.responseObserver.request(1); + }); + this.responseObserver.setOnCancelHandler(this::stopIfRunning); + + } + + @Override + protected void run() throws Exception { + while (isRunning()) { + if(responseObserver.isReady()) { + Span span = spanQueue.take(); + + StreamingPullResponse.Builder builder = StreamingPullResponse.newBuilder(); + + byte[] encoded = SpanBytesEncoder.JSON_V2.encodeList(Collections.singletonList(span)); + PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(ByteString.copyFrom(encoded)).build(); + ReceivedMessage receivedMessage = ReceivedMessage.newBuilder().setAckId(UUID.randomUUID().toString()).setMessage(pubsubMessage).build(); + + builder.addReceivedMessages(receivedMessage); + + responseObserver.onNext(builder.build()); + } else { + Thread.sleep(1000l); + } + } + } + + @Override + public void onNext(StreamingPullRequest streamingPullRequest) { + if(!isRunning()) { + startAsync().awaitRunning(); + } + responseObserver.request(1); + } + + @Override + public void onError(Throwable throwable) { + if (!Status.fromThrowable(throwable).getCode().equals(Status.CANCELLED.getCode())) { + stopIfRunning(); + } + } + + @Override + public void onCompleted() { + stopIfRunning(); + responseObserver.onCompleted(); + } + + private void stopIfRunning() { + if (isRunning()) { + stopAsync(); + } + } + +} diff --git a/pom.xml b/pom.xml index d699f9d..db22682 100644 --- a/pom.xml +++ b/pom.xml @@ -28,6 +28,8 @@ sender-stackdriver storage-stackdriver propagation-stackdriver + sender-pubsub + collector-pubsub @@ -150,6 +152,17 @@ + + io.zipkin.zipkin2 + zipkin-collector + ${zipkin.version} + + + io.zipkin.reporter2 + zipkin-reporter + ${zipkin-reporter.version} + + ${zipkin.groupId} zipkin-tests diff --git a/sender-pubsub/pom.xml b/sender-pubsub/pom.xml new file mode 100644 index 0000000..1da90e8 --- /dev/null +++ b/sender-pubsub/pom.xml @@ -0,0 +1,73 @@ + + + + + zipkin-gcp-parent + io.zipkin.gcp + 1.0.3-SNAPSHOT + + 4.0.0 + + zipkin-sender-pubsub + + Zipkin Sender: Google PubSub + + + ${project.basedir}/.. + 2.12.0 + + + + + + com.google.cloud + libraries-bom + 24.1.2 + pom + import + + + + + + + com.google.cloud + google-cloud-pubsub + + + io.zipkin.reporter2 + zipkin-reporter + ${zipkin-reporter.version} + + + io.grpc + grpc-testing + 1.43.2 + test + + + com.google.api.grpc + grpc-google-cloud-pubsub-v1 + 1.97.1 + test + + + + + \ No newline at end of file diff --git a/sender-pubsub/src/main/java/zipkin2/reporter/pubsub/PubSubSender.java b/sender-pubsub/src/main/java/zipkin2/reporter/pubsub/PubSubSender.java new file mode 100644 index 0000000..b0bdebb --- /dev/null +++ b/sender-pubsub/src/main/java/zipkin2/reporter/pubsub/PubSubSender.java @@ -0,0 +1,294 @@ +/* + * Copyright 2016-2022 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.reporter.pubsub; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.api.gax.core.ExecutorProvider; + +import com.google.api.gax.core.InstantiatingExecutorProvider; +import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.UnknownException; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.cloud.pubsub.v1.TopicAdminClient; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.Topic; +import com.google.pubsub.v1.TopicName; + +import io.grpc.StatusRuntimeException; +import zipkin2.Call; +import zipkin2.Callback; +import zipkin2.CheckResult; +import zipkin2.codec.Encoding; +import zipkin2.reporter.BytesMessageEncoder; +import zipkin2.reporter.Sender; + +public class PubSubSender extends Sender { + + public static PubSubSender create(String topic) { + return newBuilder().topic(topic).build(); + } + + public static Builder newBuilder() { + return new Builder(); + } + + public static final class Builder { + String topic; + int messageMaxBytes = 10 * 1024 * 1024; // 10MB PubSub limit. + Encoding encoding = Encoding.JSON; + Publisher publisher; + ExecutorProvider executorProvider; + TopicAdminClient topicAdminClient; + + Builder(PubSubSender pubSubSender) { + this.topic = pubSubSender.topic; + this.encoding = pubSubSender.encoding; + this.publisher = pubSubSender.publisher; + this.executorProvider = pubSubSender.executorProvider; + } + + /** PubSub topic to send spans. */ + public Builder topic(String topic) { + if (topic == null) throw new NullPointerException("topic == null"); + this.topic = topic; + return this; + } + + /** Maximum size of a message. PubSub max message size is 10MB */ + public Builder messageMaxBytes(int messageMaxBytes) { + this.messageMaxBytes = messageMaxBytes; + return this; + } + + /** + * Use this to change the encoding used in messages. Default is {@linkplain Encoding#JSON} + * + *

Note: If ultimately sending to Zipkin, version 2.8+ is required to process protobuf. + */ + public Builder encoding(Encoding encoding) { + if (encoding == null) throw new NullPointerException("encoding == null"); + this.encoding = encoding; + return this; + } + + public Builder publisher(Publisher publisher) { + if (publisher == null) throw new NullPointerException("publisher == null"); + this.publisher = publisher; + return this; + } + + /** ExecutorProvider for PubSub operations **/ + public Builder executorProvider(ExecutorProvider executorProvider) { + if (executorProvider == null) throw new NullPointerException("executorProvider == null"); + this.executorProvider = executorProvider; + return this; + } + + public Builder topicAdminClient(TopicAdminClient topicAdminClient) { + if (topicAdminClient == null) throw new NullPointerException("topicAdminClient == null"); + this.topicAdminClient = topicAdminClient; + return this; + } + + public PubSubSender build() { + if (topic == null) throw new NullPointerException("topic == null"); + + if (executorProvider == null) executorProvider = defaultExecutorProvider(); ; + + if(publisher == null) { + try { + publisher = Publisher.newBuilder(topic).setExecutorProvider(executorProvider).build(); + } catch (IOException e) { + throw new PubSubSenderInitializationException(e); + } + } + + if(topicAdminClient == null) { + try { + topicAdminClient = TopicAdminClient.create(); + } catch (IOException e) { + throw new PubSubSenderInitializationException(e); + } + } + + return new PubSubSender(this); + } + + private InstantiatingExecutorProvider defaultExecutorProvider() { + return InstantiatingExecutorProvider.newBuilder() + .setExecutorThreadCount(5 * Runtime.getRuntime().availableProcessors()) + .build(); + } + + Builder() { + } + } + + public Builder toBuilder() { + return new Builder(this); + } + + final String topic; + final int messageMaxBytes; + final Encoding encoding; + final Publisher publisher; + final ExecutorProvider executorProvider; + final TopicAdminClient topicAdminClient; + + volatile boolean closeCalled; + + PubSubSender(Builder builder) { + this.topic = builder.topic; + this.messageMaxBytes = builder.messageMaxBytes; + this.encoding = builder.encoding; + this.publisher = builder.publisher; + this.executorProvider = builder.executorProvider; + this.topicAdminClient = builder.topicAdminClient; + } + + /** + * If no permissions given sent back ok, f permissions and topic exist ok, if topic does not exist error + * @return + */ + @Override + public CheckResult check() { + try { + Topic topic = topicAdminClient.getTopic(TopicName.parse(this.topic)); + return CheckResult.OK; + } catch (ApiException e) { + return CheckResult.failed(e); + } + } + + @Override public Encoding encoding() { + return encoding; + } + + @Override + public int messageMaxBytes() { + return messageMaxBytes; + } + + @Override + public int messageSizeInBytes(List bytes) { + return encoding().listSizeInBytes(bytes); + } + + @Override + public Call sendSpans(List byteList) { + if (closeCalled) throw new IllegalStateException("closed"); + + byte[] messageBytes = BytesMessageEncoder.forEncoding(encoding()).encode(byteList); + PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(ByteString.copyFrom(messageBytes)).build(); + + return new PubSubCall(pubsubMessage); + } + + /** + * Shutdown on Publisher is not async thus moving the synchronized block to another function in order not to block until the shutdown is over + * @throws IOException + */ + @Override + public void close() throws IOException { + if(!setClosed()) { + return; + } + publisher.shutdown(); + } + + private synchronized boolean setClosed() { + if(closeCalled) { + return false; + } else { + closeCalled = true; + return true; + } + } + + @Override public final String toString() { + return "PubSubSender{topic=" + topic+ "}"; + } + + class PubSubCall extends Call.Base { + private final PubsubMessage message; + volatile ApiFuture future; + + public PubSubCall(PubsubMessage message) { + this.message = message; + } + + @Override + protected Void doExecute() throws IOException { + try { + publisher.publish(message).get(); + } catch (InterruptedException| ExecutionException e) { + throw new RuntimeException(e); + } + return null; + } + + @Override + protected void doEnqueue(Callback callback) { + future = publisher.publish(message); + ApiFutures.addCallback(future, new ApiFutureCallbackAdapter(callback), executorProvider.getExecutor()); + if (future.isCancelled()) throw new IllegalStateException("cancelled sending spans"); + } + + @Override + protected void doCancel() { + Future maybeFuture = future; + if (maybeFuture != null) maybeFuture.cancel(true); + } + + @Override + protected boolean doIsCanceled() { + Future maybeFuture = future; + return maybeFuture != null && maybeFuture.isCancelled(); + } + + @Override + public Call clone() { + PubsubMessage clone = PubsubMessage.newBuilder(message).build(); + return new PubSubCall(clone); + } + } + + static final class ApiFutureCallbackAdapter implements ApiFutureCallback { + + final Callback callback; + + public ApiFutureCallbackAdapter(Callback callback) { + this.callback = callback; + } + + @Override + public void onFailure(Throwable t) { + callback.onError(t); + } + + @Override + public void onSuccess(String result) { + callback.onSuccess(null); + } + } + +} diff --git a/sender-pubsub/src/main/java/zipkin2/reporter/pubsub/PubSubSenderInitializationException.java b/sender-pubsub/src/main/java/zipkin2/reporter/pubsub/PubSubSenderInitializationException.java new file mode 100644 index 0000000..1958d90 --- /dev/null +++ b/sender-pubsub/src/main/java/zipkin2/reporter/pubsub/PubSubSenderInitializationException.java @@ -0,0 +1,33 @@ +/* + * Copyright 2016-2022 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.reporter.pubsub; + +public class PubSubSenderInitializationException extends RuntimeException { + + public PubSubSenderInitializationException() { + } + + public PubSubSenderInitializationException(String message) { + super(message); + } + + public PubSubSenderInitializationException(String message, Throwable cause) { + super(message, cause); + } + + public PubSubSenderInitializationException(Throwable cause) { + super(cause); + } + +} diff --git a/sender-pubsub/src/test/java/zipkin2/reporter/pubsub/PubSubSenderTest.java b/sender-pubsub/src/test/java/zipkin2/reporter/pubsub/PubSubSenderTest.java new file mode 100644 index 0000000..52f1c3d --- /dev/null +++ b/sender-pubsub/src/test/java/zipkin2/reporter/pubsub/PubSubSenderTest.java @@ -0,0 +1,225 @@ +/* + * Copyright 2016-2022 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.reporter.pubsub; + +import java.io.IOException; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.junit.Rule; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + +import com.google.api.gax.core.ExecutorProvider; +import com.google.api.gax.core.InstantiatingExecutorProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.TransportChannel; +import com.google.api.gax.rpc.TransportChannelProvider; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.cloud.pubsub.v1.TopicAdminClient; +import com.google.cloud.pubsub.v1.stub.GrpcPublisherStub; +import com.google.cloud.pubsub.v1.stub.PublisherStub; +import com.google.cloud.pubsub.v1.stub.PublisherStubSettings; +import com.google.pubsub.v1.GetTopicRequest; +import com.google.pubsub.v1.PublishRequest; +import com.google.pubsub.v1.PublishResponse; +import com.google.pubsub.v1.PublisherGrpc; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.Topic; + +import io.grpc.ManagedChannel; +import io.grpc.Status; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcCleanupRule; +import static java.util.stream.Collectors.toList; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import zipkin2.Call; +import zipkin2.CheckResult; +import zipkin2.Span; +import static zipkin2.TestObjects.CLIENT_SPAN; +import zipkin2.codec.Encoding; +import zipkin2.codec.SpanBytesDecoder; +import zipkin2.codec.SpanBytesEncoder; + +class PubSubSenderTest { + + @Rule + public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); + + PubSubSender sender; + + private PublisherGrpc.PublisherImplBase publisherImplBase; + + @BeforeEach + void setUp() throws IOException, ExecutionException, InterruptedException { + publisherImplBase = mock(PublisherGrpc.PublisherImplBase.class); + + String serverName = InProcessServerBuilder.generateName(); + + grpcCleanup.register(InProcessServerBuilder + .forName(serverName).directExecutor().addService(publisherImplBase).build().start()); + + ExecutorProvider executorProvider = testExecutorProvider(); + + ManagedChannel managedChannel = InProcessChannelBuilder.forName(serverName).directExecutor().build(); + + TransportChannel transportChannel = GrpcTransportChannel.create(managedChannel); + TransportChannelProvider transportChannelProvider = FixedTransportChannelProvider.create(transportChannel); + + String topicName = "projects/test-project/topics/my-topic"; + Publisher publisher = Publisher.newBuilder(topicName) + .setExecutorProvider(executorProvider) + .setChannelProvider(transportChannelProvider) + .build(); + + PublisherStubSettings publisherStubSettings = PublisherStubSettings.newBuilder() + .setTransportChannelProvider(transportChannelProvider) + .build(); + PublisherStub publisherStub = GrpcPublisherStub.create(publisherStubSettings); + TopicAdminClient topicAdminClient = TopicAdminClient.create(publisherStub); + + sender = PubSubSender.newBuilder() + .topic(topicName) + .publisher(publisher) + .topicAdminClient(topicAdminClient) + .build(); + } + + private InstantiatingExecutorProvider testExecutorProvider() { + return InstantiatingExecutorProvider.newBuilder() + .setExecutorThreadCount(5 * Runtime.getRuntime().availableProcessors()) + .build(); + } + + @Test + public void sendsSpans() throws Exception { + ArgumentCaptor requestCaptor = + ArgumentCaptor.forClass(PublishRequest.class); + + doAnswer(invocationOnMock -> { + StreamObserver responseObserver = invocationOnMock.getArgument(1); + responseObserver.onNext(PublishResponse.newBuilder().addMessageIds(UUID.randomUUID().toString()).build()); + responseObserver.onCompleted(); + return null; + }).when(publisherImplBase).publish(requestCaptor.capture(), any(StreamObserver.class)); + + send(CLIENT_SPAN, CLIENT_SPAN).execute(); + + assertThat(extractSpans(requestCaptor.getValue())) + .containsExactly(CLIENT_SPAN, CLIENT_SPAN); + } + + @Test + public void sendsSpans_PROTO3() throws Exception { + ArgumentCaptor requestCaptor = + ArgumentCaptor.forClass(PublishRequest.class); + + doAnswer(invocationOnMock -> { + StreamObserver responseObserver = invocationOnMock.getArgument(1); + responseObserver.onNext(PublishResponse.newBuilder().addMessageIds(UUID.randomUUID().toString()).build()); + responseObserver.onCompleted(); + return null; + }).when(publisherImplBase).publish(requestCaptor.capture(), any(StreamObserver.class)); + + sender = sender.toBuilder().encoding(Encoding.PROTO3).build(); + + send(CLIENT_SPAN, CLIENT_SPAN).execute(); + + assertThat(extractSpans(requestCaptor.getValue())) + .containsExactly(CLIENT_SPAN, CLIENT_SPAN); + } + + @Test + public void sendsSpans_json_unicode() throws Exception { + ArgumentCaptor requestCaptor = + ArgumentCaptor.forClass(PublishRequest.class); + + doAnswer(invocationOnMock -> { + StreamObserver responseObserver = invocationOnMock.getArgument(1); + responseObserver.onNext(PublishResponse.newBuilder().addMessageIds(UUID.randomUUID().toString()).build()); + responseObserver.onCompleted(); + return null; + }).when(publisherImplBase).publish(requestCaptor.capture(), any(StreamObserver.class)); + + Span unicode = CLIENT_SPAN.toBuilder().putTag("error", "\uD83D\uDCA9").build(); + send(unicode).execute(); + + assertThat(extractSpans(requestCaptor.getValue())).containsExactly(unicode); + } + + @Test + public void checkPasses() throws Exception { + ArgumentCaptor captor = + ArgumentCaptor.forClass(GetTopicRequest.class); + + doAnswer(invocationOnMock -> { + StreamObserver responseObserver = invocationOnMock.getArgument(1); + responseObserver.onNext(Topic.newBuilder().setName("topic-name").build()); + responseObserver.onCompleted(); + return null; + }).when(publisherImplBase).getTopic(captor.capture(), any(StreamObserver.class)); + + CheckResult result = sender.check(); + assertThat(result.ok()).isTrue(); + } + + @Test + public void checkFailsWithStreamNotActive() throws Exception { + ArgumentCaptor captor = + ArgumentCaptor.forClass(GetTopicRequest.class); + + doAnswer(invocationOnMock -> { + StreamObserver responseObserver = invocationOnMock.getArgument(1); + responseObserver.onError(new io.grpc.StatusRuntimeException(Status.NOT_FOUND)); + return null; + }).when(publisherImplBase).getTopic(captor.capture(), any(StreamObserver.class)); + + CheckResult result = sender.check(); + assertThat(result.error()).isInstanceOf(ApiException.class); + } + + private List extractSpans(PublishRequest publishRequest) { + return publishRequest.getMessagesList() + .stream() + .flatMap(this::extractSpans) + .collect(Collectors.toList()); + } + + Stream extractSpans(PubsubMessage pubsubMessage) { + byte[] messageBytes = pubsubMessage.getData().toByteArray(); + + if (messageBytes[0] == '[') { + return SpanBytesDecoder.JSON_V2.decodeList(messageBytes).stream(); + } + return SpanBytesDecoder.PROTO3.decodeList(messageBytes).stream(); + } + + Call send(zipkin2.Span... spans) { + SpanBytesEncoder bytesEncoder = + sender.encoding() == Encoding.JSON ? SpanBytesEncoder.JSON_V2 : SpanBytesEncoder.PROTO3; + return sender.sendSpans(Stream.of(spans).map(bytesEncoder::encode).collect(toList())); + } + +}