Skip to content

Commit

Permalink
[INLONG-11564][SDK] DataProxy SDK Implementation Optimization (#11581)
Browse files Browse the repository at this point in the history
Co-authored-by: gosonzhang <[email protected]>
  • Loading branch information
gosonzhang and gosonzhang authored Dec 6, 2024
1 parent 0abf31d commit dae7074
Show file tree
Hide file tree
Showing 13 changed files with 153 additions and 606 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.inlong.sdk.dataproxy.network.Sender;
import org.apache.inlong.sdk.dataproxy.network.SequentialID;
import org.apache.inlong.sdk.dataproxy.threads.IndexCollectThread;
import org.apache.inlong.sdk.dataproxy.threads.ManagerFetcherThread;
import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;

import org.slf4j.Logger;
Expand All @@ -52,7 +51,6 @@ public class DefaultMessageSender implements MessageSender {
private static final ConcurrentHashMap<Integer, DefaultMessageSender> CACHE_SENDER =
new ConcurrentHashMap<>();
private static final AtomicBoolean MANAGER_FETCHER_THREAD_STARTED = new AtomicBoolean(false);
private static ManagerFetcherThread managerFetcherThread;
private static final SequentialID idGenerator = new SequentialID();
private final Sender sender;
private final IndexCollectThread indexCol;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ public class ProxyClientConfig {
private String managerIP = "";
private String managerAddress;

private String managerIpLocalPath = System.getProperty("user.dir") + "/.inlong/.managerIps";
private String managerUrl = "";
private int proxyUpdateIntervalMinutes;
private int proxyUpdateMaxRetry;
Expand All @@ -54,7 +53,6 @@ public class ProxyClientConfig {
private String authSecretKey;
private String protocolType;

private boolean enableSaveManagerVIps = false;
// metric configure
private MetricConfig metricConfig = new MetricConfig();

Expand Down Expand Up @@ -211,28 +209,6 @@ public String getManagerIP() {
return managerIP;
}

public String getManagerIpLocalPath() {
return managerIpLocalPath;
}

public void setManagerIpLocalPath(String managerIpLocalPath) throws ProxysdkException {
if (StringUtils.isEmpty(managerIpLocalPath)) {
throw new ProxysdkException("managerIpLocalPath is empty.");
}
if (managerIpLocalPath.charAt(managerIpLocalPath.length() - 1) == '/') {
managerIpLocalPath = managerIpLocalPath.substring(0, managerIpLocalPath.length() - 1);
}
this.managerIpLocalPath = managerIpLocalPath + "/.managerIps";
}

public boolean isEnableSaveManagerVIps() {
return enableSaveManagerVIps;
}

public void setEnableSaveManagerVIps(boolean enable) {
this.enableSaveManagerVIps = enable;
}

public String getConfStoreBasePath() {
return confStoreBasePath;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.inlong.sdk.dataproxy.codec;

import org.apache.inlong.sdk.dataproxy.utils.LogCounter;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
Expand All @@ -29,33 +31,37 @@

public class ProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {

private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolDecoder.class);

private static final Logger logger = LoggerFactory.getLogger(ProtocolDecoder.class);
private static final LogCounter decExptCounter = new LogCounter(10, 200000, 60 * 1000L);
@Override
protected void decode(ChannelHandlerContext ctx,
ByteBuf buffer, List<Object> out) throws Exception {
buffer.markReaderIndex();
// totallen
int totalLen = buffer.readInt();
LOGGER.debug("decode totalLen : {}", totalLen);
if (totalLen != buffer.readableBytes()) {
LOGGER.error("totalLen is not equal readableBytes.total:" + totalLen
+ ";readableBytes:" + buffer.readableBytes());
if (decExptCounter.shouldPrint()) {
logger.error("Length not equal, totalLen={},readableBytes={},from={}",
totalLen, buffer.readableBytes(), ctx.channel());
}
buffer.resetReaderIndex();
throw new Exception("totalLen is not equal readableBytes.total");
}
// msgtype
int msgType = buffer.readByte() & 0x1f;

if (msgType == 4) {
LOGGER.info("debug decode");
}
if (msgType == 3 | msgType == 5) {
if (logger.isDebugEnabled()) {
logger.debug("debug decode");
}
} else if (msgType == 3 | msgType == 5) {
// bodylen
int bodyLength = buffer.readInt();
if (bodyLength >= totalLen) {
LOGGER.error("bodyLen is greater than totalLen.totalLen:" + totalLen
+ ";bodyLen:" + bodyLength);
if (decExptCounter.shouldPrint()) {
logger.error("bodyLen greater than totalLen, totalLen={},bodyLen={},from={}",
totalLen, bodyLength, ctx.channel());
}
buffer.resetReaderIndex();
throw new Exception("bodyLen is greater than totalLen.totalLen");
}
Expand All @@ -64,33 +70,33 @@ protected void decode(ChannelHandlerContext ctx,
bodyBytes = new byte[bodyLength];
buffer.readBytes(bodyBytes);
}

// attrlen
String attrInfo = "";
int attrLength = buffer.readInt();
byte[] attrBytes = null;
if (attrLength > 0) {
attrBytes = new byte[attrLength];
byte[] attrBytes = new byte[attrLength];
buffer.readBytes(attrBytes);
attrInfo = new String(attrBytes, StandardCharsets.UTF_8);
}
EncodeObject object;
if (bodyBytes == null) {
object = new EncodeObject(new String(attrBytes, StandardCharsets.UTF_8));
object = new EncodeObject(attrInfo);
} else {
object = new EncodeObject(Collections.singletonList(bodyBytes),
new String(attrBytes, StandardCharsets.UTF_8));
object = new EncodeObject(Collections.singletonList(bodyBytes), attrInfo);
}
object.setMsgtype(5);
out.add(object);
} else if (msgType == 7) {

int seqId = buffer.readInt();
int attrLen = buffer.readShort();
byte[] attrBytes = null;
String attrInfo = "";
if (attrLen > 0) {
attrBytes = new byte[attrLen];
byte[] attrBytes = new byte[attrLen];
buffer.readBytes(attrBytes);
attrInfo = new String(attrBytes, StandardCharsets.UTF_8);
}
EncodeObject object = new EncodeObject(new String(attrBytes, StandardCharsets.UTF_8));
EncodeObject object = new EncodeObject(attrInfo);
object.setMessageId(String.valueOf(seqId));

buffer.readShort();
Expand All @@ -103,15 +109,14 @@ protected void decode(ChannelHandlerContext ctx,
buffer.skipBytes(4 + 1 + 4); // skip datatime, body_ver and body_len
final short load = buffer.readShort(); // read from body
int attrLen = buffer.readShort();
byte[] attrBytes = null;
String attrInfo = "";
if (attrLen > 0) {
attrBytes = new byte[attrLen];
byte[] attrBytes = new byte[attrLen];
buffer.readBytes(attrBytes);
attrInfo = new String(attrBytes, StandardCharsets.UTF_8);
}
buffer.skipBytes(2); // skip magic

String attrs = (attrBytes == null ? "" : new String(attrBytes, StandardCharsets.UTF_8));
EncodeObject object = new EncodeObject(attrs);
EncodeObject object = new EncodeObject(attrInfo);
object.setMsgtype(8);
object.setLoad(load);
out.add(object);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,16 +99,17 @@ private ByteBuf writeToBuf8(EncodeObject object) {
if (object.isAuth()) {
msgType |= FLAG_ALLOW_AUTH;
}
int totalLength = 1 + 4 + 1 + 4 + 2 + endAttr.getBytes("utf8").length + 2;
byte[] attrData = endAttr.getBytes(StandardCharsets.UTF_8);
int totalLength = 1 + 4 + 1 + 4 + 2 + attrData.length + 2;
buf = ByteBufAllocator.DEFAULT.buffer(4 + totalLength);
buf.writeInt(totalLength);
buf.writeByte(msgType);
buf.writeInt((int) object.getDt());
buf.writeByte(1);
buf.writeInt(0);
buf.writeShort(endAttr.getBytes("utf8").length);
if (endAttr.getBytes("utf8").length > 0) {
buf.writeBytes(endAttr.getBytes("utf8"));
buf.writeShort(attrData.length);
if (attrData.length > 0) {
buf.writeBytes(attrData);
}
buf.writeShort(0xee01);
} catch (Throwable ex) {
Expand Down Expand Up @@ -160,7 +161,8 @@ private ByteBuf constructBody(byte[] body, EncodeObject object,
if (object.isCompress()) {
msgType |= FLAG_ALLOW_COMPRESS;
}
totalLength = totalLength + body.length + endAttr.getBytes("utf8").length;
byte[] attrData = endAttr.getBytes(StandardCharsets.UTF_8);
totalLength = totalLength + body.length + attrData.length;
buf = ByteBufAllocator.DEFAULT.buffer(4 + totalLength);
buf.writeInt(totalLength);
buf.writeByte(msgType);
Expand All @@ -181,8 +183,8 @@ private ByteBuf constructBody(byte[] body, EncodeObject object,
buf.writeInt(body.length);
buf.writeBytes(body);

buf.writeShort(endAttr.getBytes("utf8").length);
buf.writeBytes(endAttr.getBytes("utf8"));
buf.writeShort(attrData.length);
buf.writeBytes(attrData);
buf.writeShort(0xee01);
}
return buf;
Expand All @@ -207,7 +209,7 @@ private ByteBuf writeToBuf7(EncodeObject object) {
ByteArrayOutputStream data = new ByteArrayOutputStream();
for (byte[] entry : object.getBodylist()) {
if (totalCnt++ > 0) {
data.write("\n".getBytes("utf8"));
data.write(AttributeConstants.LINE_FEED_SEP.getBytes(StandardCharsets.UTF_8));
}
data.write(entry);
}
Expand Down Expand Up @@ -280,14 +282,15 @@ private ByteBuf writeToBuf5(EncodeObject object) {
if (object.isEncrypt()) {
msgType |= FLAG_ALLOW_ENCRYPT;
}
totalLength = totalLength + body.length + msgAttrs.getBytes("utf8").length;
byte[] attrData = msgAttrs.getBytes(StandardCharsets.UTF_8);
totalLength = totalLength + body.length + attrData.length;
buf = ByteBufAllocator.DEFAULT.buffer(4 + totalLength);
buf.writeInt(totalLength);
buf.writeByte(msgType);
buf.writeInt(body.length);
buf.writeBytes(body);
buf.writeInt(msgAttrs.getBytes("utf8").length);
buf.writeBytes(msgAttrs.getBytes("utf8"));
buf.writeInt(attrData.length);
buf.writeBytes(attrData);
}
} catch (Throwable ex) {
if (exptCounter.shouldPrint()) {
Expand Down Expand Up @@ -344,14 +347,15 @@ private ByteBuf writeToBuf3(EncodeObject object) {
if (object.isEncrypt()) {
msgType |= FLAG_ALLOW_ENCRYPT;
}
totalLength = totalLength + body.length + msgAttrs.getBytes("utf8").length;
byte[] attrData = msgAttrs.getBytes(StandardCharsets.UTF_8);
totalLength = totalLength + body.length + attrData.length;
buf = ByteBufAllocator.DEFAULT.buffer(4 + totalLength);
buf.writeInt(totalLength);
buf.writeByte(msgType);
buf.writeInt(body.length);
buf.writeBytes(body);
buf.writeInt(msgAttrs.getBytes("utf8").length);
buf.writeBytes(msgAttrs.getBytes("utf8"));
buf.writeInt(attrData.length);
buf.writeBytes(attrData);
}
} catch (Throwable ex) {
if (exptCounter.shouldPrint()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.inlong.sdk.dataproxy.common;

public enum SendResult {
INVALID_ATTRIBUTES, // including DataProxyErrCode(100,101,102,112)
OK,
INVALID_ATTRIBUTES, // including DataProxyErrCode(100,101,102,112)
TIMEOUT,
CONNECTION_BREAK,
THREAD_INTERRUPT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import java.net.URLEncoder;
import java.security.interfaces.RSAPublicKey;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;

public class EncryptConfigEntry implements java.io.Serializable {
Expand Down Expand Up @@ -122,7 +123,7 @@ public EncryptInfo getRsaEncryptInfo() {

@Override
public boolean equals(Object other) {
if (other == null || !(other instanceof EncryptConfigEntry)) {
if (!(other instanceof EncryptConfigEntry)) {
return false;
}
if (other == this) {
Expand All @@ -131,7 +132,7 @@ public boolean equals(Object other) {
EncryptConfigEntry info = (EncryptConfigEntry) other;
return (this.userName.equals(info.getUserName()))
&& (this.version.equals(info.getVersion()))
&& (this.pubKey == info.getPubKey());
&& (Objects.equals(this.pubKey, info.getPubKey()));
}

public String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ public class HostInfo implements Comparable<HostInfo>, java.io.Serializable {
private final String hostName;
private final int portNumber;

public HostInfo(String referenceName, String hostName, int portNumber) {
this.referenceName = referenceName;
public HostInfo(String hostName, int portNumber) {
this.hostName = hostName;
this.portNumber = portNumber;
this.referenceName = hostName + ":" + portNumber;
}

public String getReferenceName() {
Expand Down
Loading

0 comments on commit dae7074

Please sign in to comment.