Skip to content

Commit

Permalink
Support collect OTLP traces
Browse files Browse the repository at this point in the history
  • Loading branch information
mrproliu committed Oct 31, 2023
1 parent 04feb90 commit 77845e8
Show file tree
Hide file tree
Showing 32 changed files with 662 additions and 131 deletions.
7 changes: 7 additions & 0 deletions zipkin-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
<module>../skywalking/oap-server/server-core</module>
<module>../skywalking/oap-server/server-receiver-plugin/receiver-proto</module>
<module>../skywalking/oap-server/server-receiver-plugin/zipkin-receiver-plugin</module>
<module>../skywalking/oap-server/server-receiver-plugin/otel-receiver-plugin</module>
<module>../skywalking/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin</module>
<module>../skywalking/oap-server/server-cluster-plugin/cluster-standalone-plugin</module>
<module>../skywalking/oap-server/server-cluster-plugin/cluster-consul-plugin</module>
<module>../skywalking/oap-server/server-cluster-plugin/cluster-etcd-plugin</module>
Expand All @@ -59,6 +61,9 @@
<module>../skywalking/oap-server/server-testing</module>
<module>../skywalking/oap-server/server-configuration/configuration-api</module>
<module>../skywalking/oap-server/ai-pipeline</module>
<module>../skywalking/oap-server/analyzer/meter-analyzer</module>
<module>../skywalking/oap-server/analyzer/log-analyzer</module>
<module>../skywalking/oap-server/analyzer/agent-analyzer</module>

<module>server-core</module>
<module>server-starter</module>
Expand All @@ -73,6 +78,8 @@
<module>telemetry-zipkin</module>
<module>zipkin-dependency</module>
<module>zipkin-storage-ext</module>
<module>receiver-zipkin-core</module>
<module>receiver-otlp-trace</module>
</modules>

<dependencyManagement>
Expand Down
37 changes: 37 additions & 0 deletions zipkin-server/receiver-otlp-trace/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>zipkin-server-parent</artifactId>
<groupId>io.zipkin</groupId>
<version>2.24.4-SNAPSHOT</version>
</parent>

<artifactId>receiver-otlp-trace</artifactId>
<name>OTLP Trace Receiver</name>

<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>otel-receiver-plugin</artifactId>
<version>${skywalking.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.skywalking</groupId>
<artifactId>skywalking-sharing-server-plugin</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.skywalking</groupId>
<artifactId>meter-analyzer</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.skywalking</groupId>
<artifactId>log-analyzer</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Copyright 2015-2023 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package zipkin.server.receiver.otlp;

import org.apache.skywalking.oap.server.library.module.ModuleConfig;

public class OTLPTraceConfig extends ModuleConfig {
private String gRPCHost;
/**
* Only setting the real port(not 0) makes the gRPC server online.
*/
private int gRPCPort;
private int maxConcurrentCallsPerConnection;
private int maxMessageSize;
private int gRPCThreadPoolSize;
private int gRPCThreadPoolQueueSize;
private String authentication;
private boolean gRPCSslEnabled = false;
private String gRPCSslKeyPath;
private String gRPCSslCertChainPath;
private String gRPCSslTrustedCAsPath;

public String getGRPCHost() {
return gRPCHost;
}

public void setGRPCHost(String gRPCHost) {
this.gRPCHost = gRPCHost;
}

public int getGRPCPort() {
return gRPCPort;
}

public void setGRPCPort(int gRPCPort) {
this.gRPCPort = gRPCPort;
}

public int getMaxConcurrentCallsPerConnection() {
return maxConcurrentCallsPerConnection;
}

public void setMaxConcurrentCallsPerConnection(int maxConcurrentCallsPerConnection) {
this.maxConcurrentCallsPerConnection = maxConcurrentCallsPerConnection;
}

public int getMaxMessageSize() {
return maxMessageSize;
}

public void setMaxMessageSize(int maxMessageSize) {
this.maxMessageSize = maxMessageSize;
}

public int getGRPCThreadPoolSize() {
return gRPCThreadPoolSize;
}

public void setGRPCThreadPoolSize(int gRPCThreadPoolSize) {
this.gRPCThreadPoolSize = gRPCThreadPoolSize;
}

public int getGRPCThreadPoolQueueSize() {
return gRPCThreadPoolQueueSize;
}

public void setGRPCThreadPoolQueueSize(int gRPCThreadPoolQueueSize) {
this.gRPCThreadPoolQueueSize = gRPCThreadPoolQueueSize;
}

public String getAuthentication() {
return authentication;
}

public void setAuthentication(String authentication) {
this.authentication = authentication;
}

public boolean getGRPCSslEnabled() {
return gRPCSslEnabled;
}

public void setGRPCSslEnabled(boolean gRPCSslEnabled) {
this.gRPCSslEnabled = gRPCSslEnabled;
}

public String getGRPCSslKeyPath() {
return gRPCSslKeyPath;
}

public void setGRPCSslKeyPath(String gRPCSslKeyPath) {
this.gRPCSslKeyPath = gRPCSslKeyPath;
}

public String getGRPCSslCertChainPath() {
return gRPCSslCertChainPath;
}

public void setGRPCSslCertChainPath(String gRPCSslCertChainPath) {
this.gRPCSslCertChainPath = gRPCSslCertChainPath;
}

public String getGRPCSslTrustedCAsPath() {
return gRPCSslTrustedCAsPath;
}

public void setGRPCSslTrustedCAsPath(String gRPCSslTrustedCAsPath) {
this.gRPCSslTrustedCAsPath = gRPCSslTrustedCAsPath;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright 2015-2023 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package zipkin.server.receiver.otlp;

import org.apache.logging.log4j.util.Strings;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegisterImpl;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.library.server.ServerException;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
import org.apache.skywalking.oap.server.receiver.otel.OtelMetricReceiverConfig;
import org.apache.skywalking.oap.server.receiver.otel.OtelMetricReceiverModule;
import org.apache.skywalking.oap.server.receiver.otel.otlp.OpenTelemetryMetricRequestProcessor;
import org.apache.skywalking.oap.server.receiver.otel.otlp.OpenTelemetryTraceHandler;
import zipkin.server.receiver.otlp.handler.OTLPTraceHandler;

public class OTLPTraceProvider extends ModuleProvider {
private OTLPTraceConfig moduleConfig;
private OpenTelemetryTraceHandler traceHandler;
private GRPCServer grpcServer;

@Override
public String name() {
return "zipkin";
}

@Override
public Class<? extends ModuleDefine> module() {
return OtelMetricReceiverModule.class;
}

@Override
public ConfigCreator<? extends ModuleConfig> newConfigCreator() {
return new ConfigCreator<OTLPTraceConfig>() {

@Override
public Class<OTLPTraceConfig> type() {
return OTLPTraceConfig.class;
}

@Override
public void onInitialized(OTLPTraceConfig initialized) {
moduleConfig = initialized;
}
};
}

@Override
public void prepare() throws ServiceNotProvidedException, ModuleStartException {
this.registerServiceImplementation(OpenTelemetryMetricRequestProcessor.class,
new OpenTelemetryMetricRequestProcessor(getManager(), new OtelMetricReceiverConfig()));
}

@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
GRPCHandlerRegister handlerRegister;
if (moduleConfig.getGRPCPort() > 0) {
if (moduleConfig.getGRPCSslEnabled()) {
grpcServer = new GRPCServer(
Strings.isBlank(moduleConfig.getGRPCHost()) ? "0.0.0.0" : moduleConfig.getGRPCHost(),
moduleConfig.getGRPCPort(),
moduleConfig.getGRPCSslCertChainPath(),
moduleConfig.getGRPCSslKeyPath(),
moduleConfig.getGRPCSslTrustedCAsPath()
);
} else {
grpcServer = new GRPCServer(
Strings.isBlank(moduleConfig.getGRPCHost()) ? "0.0.0.0" : moduleConfig.getGRPCHost(),
moduleConfig.getGRPCPort()
);
}
if (moduleConfig.getMaxMessageSize() > 0) {
grpcServer.setMaxMessageSize(moduleConfig.getMaxMessageSize());
}
if (moduleConfig.getMaxConcurrentCallsPerConnection() > 0) {
grpcServer.setMaxConcurrentCallsPerConnection(moduleConfig.getMaxConcurrentCallsPerConnection());
}
if (moduleConfig.getGRPCThreadPoolQueueSize() > 0) {
grpcServer.setThreadPoolQueueSize(moduleConfig.getGRPCThreadPoolQueueSize());
}
if (moduleConfig.getGRPCThreadPoolSize() > 0) {
grpcServer.setThreadPoolSize(moduleConfig.getGRPCThreadPoolSize());
}
grpcServer.initialize();

handlerRegister = new GRPCHandlerRegisterImpl(grpcServer);
} else {
handlerRegister = getManager().find(CoreModule.NAME).provider().getService(GRPCHandlerRegister.class);
}
traceHandler = new OTLPTraceHandler(handlerRegister, getManager());
traceHandler.active();
}

@Override
public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
if (grpcServer != null) {
try {
grpcServer.start();
} catch (ServerException e) {
throw new ModuleStartException(e.getMessage(), e);
}
}
}

@Override
public String[] requiredModules() {
return new String[0];
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2015-2023 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package zipkin.server.receiver.otlp.handler;

import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.receiver.otel.otlp.OpenTelemetryTraceHandler;

public class OTLPTraceHandler extends OpenTelemetryTraceHandler {
private final GRPCHandlerRegister register;
public OTLPTraceHandler(GRPCHandlerRegister register, ModuleManager manager) {
super(manager);
this.register = register;
}

@Override
public void active() throws ModuleStartException {
register.addHandler(this);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#

zipkin.server.receiver.otlp.OTLPTraceProvider
8 changes: 1 addition & 7 deletions zipkin-server/receiver-zipkin-activemq/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,10 @@
<dependencies>
<dependency>
<groupId>io.zipkin</groupId>
<artifactId>zipkin-server-core</artifactId>
<artifactId>receiver-zipkin-core</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>zipkin-receiver-plugin</artifactId>
<version>${skywalking.version}</version>
</dependency>

<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
Expand Down
Loading

0 comments on commit 77845e8

Please sign in to comment.