Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-11580][SDK] Remove the Manager address fetche logic #11581

Merged
merged 1 commit into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.inlong.sdk.dataproxy.network.Sender;
import org.apache.inlong.sdk.dataproxy.network.SequentialID;
import org.apache.inlong.sdk.dataproxy.threads.IndexCollectThread;
import org.apache.inlong.sdk.dataproxy.threads.ManagerFetcherThread;
import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;

import org.slf4j.Logger;
Expand All @@ -52,7 +51,6 @@ public class DefaultMessageSender implements MessageSender {
private static final ConcurrentHashMap<Integer, DefaultMessageSender> CACHE_SENDER =
new ConcurrentHashMap<>();
private static final AtomicBoolean MANAGER_FETCHER_THREAD_STARTED = new AtomicBoolean(false);
private static ManagerFetcherThread managerFetcherThread;
private static final SequentialID idGenerator = new SequentialID();
private final Sender sender;
private final IndexCollectThread indexCol;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ public class ProxyClientConfig {
private String managerIP = "";
private String managerAddress;

private String managerIpLocalPath = System.getProperty("user.dir") + "/.inlong/.managerIps";
private String managerUrl = "";
private int proxyUpdateIntervalMinutes;
private int proxyUpdateMaxRetry;
Expand All @@ -54,7 +53,6 @@ public class ProxyClientConfig {
private String authSecretKey;
private String protocolType;

private boolean enableSaveManagerVIps = false;
// metric configure
private MetricConfig metricConfig = new MetricConfig();

Expand Down Expand Up @@ -211,28 +209,6 @@ public String getManagerIP() {
return managerIP;
}

public String getManagerIpLocalPath() {
return managerIpLocalPath;
}

public void setManagerIpLocalPath(String managerIpLocalPath) throws ProxysdkException {
if (StringUtils.isEmpty(managerIpLocalPath)) {
throw new ProxysdkException("managerIpLocalPath is empty.");
}
if (managerIpLocalPath.charAt(managerIpLocalPath.length() - 1) == '/') {
managerIpLocalPath = managerIpLocalPath.substring(0, managerIpLocalPath.length() - 1);
}
this.managerIpLocalPath = managerIpLocalPath + "/.managerIps";
}

public boolean isEnableSaveManagerVIps() {
return enableSaveManagerVIps;
}

public void setEnableSaveManagerVIps(boolean enable) {
this.enableSaveManagerVIps = enable;
}

public String getConfStoreBasePath() {
return confStoreBasePath;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.inlong.sdk.dataproxy.codec;

import org.apache.inlong.sdk.dataproxy.utils.LogCounter;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
Expand All @@ -29,33 +31,37 @@

public class ProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {

private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolDecoder.class);

private static final Logger logger = LoggerFactory.getLogger(ProtocolDecoder.class);
private static final LogCounter decExptCounter = new LogCounter(10, 200000, 60 * 1000L);
@Override
protected void decode(ChannelHandlerContext ctx,
ByteBuf buffer, List<Object> out) throws Exception {
buffer.markReaderIndex();
// totallen
int totalLen = buffer.readInt();
LOGGER.debug("decode totalLen : {}", totalLen);
if (totalLen != buffer.readableBytes()) {
LOGGER.error("totalLen is not equal readableBytes.total:" + totalLen
+ ";readableBytes:" + buffer.readableBytes());
if (decExptCounter.shouldPrint()) {
logger.error("Length not equal, totalLen={},readableBytes={},from={}",
totalLen, buffer.readableBytes(), ctx.channel());
}
buffer.resetReaderIndex();
throw new Exception("totalLen is not equal readableBytes.total");
}
// msgtype
int msgType = buffer.readByte() & 0x1f;

if (msgType == 4) {
LOGGER.info("debug decode");
}
if (msgType == 3 | msgType == 5) {
if (logger.isDebugEnabled()) {
logger.debug("debug decode");
}
} else if (msgType == 3 | msgType == 5) {
// bodylen
int bodyLength = buffer.readInt();
if (bodyLength >= totalLen) {
LOGGER.error("bodyLen is greater than totalLen.totalLen:" + totalLen
+ ";bodyLen:" + bodyLength);
if (decExptCounter.shouldPrint()) {
logger.error("bodyLen greater than totalLen, totalLen={},bodyLen={},from={}",
totalLen, bodyLength, ctx.channel());
}
buffer.resetReaderIndex();
throw new Exception("bodyLen is greater than totalLen.totalLen");
}
Expand All @@ -64,33 +70,33 @@ protected void decode(ChannelHandlerContext ctx,
bodyBytes = new byte[bodyLength];
buffer.readBytes(bodyBytes);
}

// attrlen
String attrInfo = "";
int attrLength = buffer.readInt();
byte[] attrBytes = null;
if (attrLength > 0) {
attrBytes = new byte[attrLength];
byte[] attrBytes = new byte[attrLength];
buffer.readBytes(attrBytes);
attrInfo = new String(attrBytes, StandardCharsets.UTF_8);
}
EncodeObject object;
if (bodyBytes == null) {
object = new EncodeObject(new String(attrBytes, StandardCharsets.UTF_8));
object = new EncodeObject(attrInfo);
} else {
object = new EncodeObject(Collections.singletonList(bodyBytes),
new String(attrBytes, StandardCharsets.UTF_8));
object = new EncodeObject(Collections.singletonList(bodyBytes), attrInfo);
}
object.setMsgtype(5);
out.add(object);
} else if (msgType == 7) {

int seqId = buffer.readInt();
int attrLen = buffer.readShort();
byte[] attrBytes = null;
String attrInfo = "";
if (attrLen > 0) {
attrBytes = new byte[attrLen];
byte[] attrBytes = new byte[attrLen];
buffer.readBytes(attrBytes);
attrInfo = new String(attrBytes, StandardCharsets.UTF_8);
}
EncodeObject object = new EncodeObject(new String(attrBytes, StandardCharsets.UTF_8));
EncodeObject object = new EncodeObject(attrInfo);
object.setMessageId(String.valueOf(seqId));

buffer.readShort();
Expand All @@ -103,15 +109,14 @@ protected void decode(ChannelHandlerContext ctx,
buffer.skipBytes(4 + 1 + 4); // skip datatime, body_ver and body_len
final short load = buffer.readShort(); // read from body
int attrLen = buffer.readShort();
byte[] attrBytes = null;
String attrInfo = "";
if (attrLen > 0) {
attrBytes = new byte[attrLen];
byte[] attrBytes = new byte[attrLen];
buffer.readBytes(attrBytes);
attrInfo = new String(attrBytes, StandardCharsets.UTF_8);
}
buffer.skipBytes(2); // skip magic

String attrs = (attrBytes == null ? "" : new String(attrBytes, StandardCharsets.UTF_8));
EncodeObject object = new EncodeObject(attrs);
EncodeObject object = new EncodeObject(attrInfo);
object.setMsgtype(8);
object.setLoad(load);
out.add(object);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,16 +99,17 @@ private ByteBuf writeToBuf8(EncodeObject object) {
if (object.isAuth()) {
msgType |= FLAG_ALLOW_AUTH;
}
int totalLength = 1 + 4 + 1 + 4 + 2 + endAttr.getBytes("utf8").length + 2;
byte[] attrData = endAttr.getBytes(StandardCharsets.UTF_8);
int totalLength = 1 + 4 + 1 + 4 + 2 + attrData.length + 2;
buf = ByteBufAllocator.DEFAULT.buffer(4 + totalLength);
buf.writeInt(totalLength);
buf.writeByte(msgType);
buf.writeInt((int) object.getDt());
buf.writeByte(1);
buf.writeInt(0);
buf.writeShort(endAttr.getBytes("utf8").length);
if (endAttr.getBytes("utf8").length > 0) {
buf.writeBytes(endAttr.getBytes("utf8"));
buf.writeShort(attrData.length);
if (attrData.length > 0) {
buf.writeBytes(attrData);
}
buf.writeShort(0xee01);
} catch (Throwable ex) {
Expand Down Expand Up @@ -160,7 +161,8 @@ private ByteBuf constructBody(byte[] body, EncodeObject object,
if (object.isCompress()) {
msgType |= FLAG_ALLOW_COMPRESS;
}
totalLength = totalLength + body.length + endAttr.getBytes("utf8").length;
byte[] attrData = endAttr.getBytes(StandardCharsets.UTF_8);
totalLength = totalLength + body.length + attrData.length;
buf = ByteBufAllocator.DEFAULT.buffer(4 + totalLength);
buf.writeInt(totalLength);
buf.writeByte(msgType);
Expand All @@ -181,8 +183,8 @@ private ByteBuf constructBody(byte[] body, EncodeObject object,
buf.writeInt(body.length);
buf.writeBytes(body);

buf.writeShort(endAttr.getBytes("utf8").length);
buf.writeBytes(endAttr.getBytes("utf8"));
buf.writeShort(attrData.length);
buf.writeBytes(attrData);
buf.writeShort(0xee01);
}
return buf;
Expand All @@ -207,7 +209,7 @@ private ByteBuf writeToBuf7(EncodeObject object) {
ByteArrayOutputStream data = new ByteArrayOutputStream();
for (byte[] entry : object.getBodylist()) {
if (totalCnt++ > 0) {
data.write("\n".getBytes("utf8"));
data.write(AttributeConstants.LINE_FEED_SEP.getBytes(StandardCharsets.UTF_8));
}
data.write(entry);
}
Expand Down Expand Up @@ -280,14 +282,15 @@ private ByteBuf writeToBuf5(EncodeObject object) {
if (object.isEncrypt()) {
msgType |= FLAG_ALLOW_ENCRYPT;
}
totalLength = totalLength + body.length + msgAttrs.getBytes("utf8").length;
byte[] attrData = msgAttrs.getBytes(StandardCharsets.UTF_8);
totalLength = totalLength + body.length + attrData.length;
buf = ByteBufAllocator.DEFAULT.buffer(4 + totalLength);
buf.writeInt(totalLength);
buf.writeByte(msgType);
buf.writeInt(body.length);
buf.writeBytes(body);
buf.writeInt(msgAttrs.getBytes("utf8").length);
buf.writeBytes(msgAttrs.getBytes("utf8"));
buf.writeInt(attrData.length);
buf.writeBytes(attrData);
}
} catch (Throwable ex) {
if (exptCounter.shouldPrint()) {
Expand Down Expand Up @@ -344,14 +347,15 @@ private ByteBuf writeToBuf3(EncodeObject object) {
if (object.isEncrypt()) {
msgType |= FLAG_ALLOW_ENCRYPT;
}
totalLength = totalLength + body.length + msgAttrs.getBytes("utf8").length;
byte[] attrData = msgAttrs.getBytes(StandardCharsets.UTF_8);
totalLength = totalLength + body.length + attrData.length;
buf = ByteBufAllocator.DEFAULT.buffer(4 + totalLength);
buf.writeInt(totalLength);
buf.writeByte(msgType);
buf.writeInt(body.length);
buf.writeBytes(body);
buf.writeInt(msgAttrs.getBytes("utf8").length);
buf.writeBytes(msgAttrs.getBytes("utf8"));
buf.writeInt(attrData.length);
buf.writeBytes(attrData);
}
} catch (Throwable ex) {
if (exptCounter.shouldPrint()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.inlong.sdk.dataproxy.common;

public enum SendResult {
INVALID_ATTRIBUTES, // including DataProxyErrCode(100,101,102,112)
OK,
INVALID_ATTRIBUTES, // including DataProxyErrCode(100,101,102,112)
TIMEOUT,
CONNECTION_BREAK,
THREAD_INTERRUPT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import java.net.URLEncoder;
import java.security.interfaces.RSAPublicKey;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;

public class EncryptConfigEntry implements java.io.Serializable {
Expand Down Expand Up @@ -122,7 +123,7 @@ public EncryptInfo getRsaEncryptInfo() {

@Override
public boolean equals(Object other) {
if (other == null || !(other instanceof EncryptConfigEntry)) {
if (!(other instanceof EncryptConfigEntry)) {
return false;
}
if (other == this) {
Expand All @@ -131,7 +132,7 @@ public boolean equals(Object other) {
EncryptConfigEntry info = (EncryptConfigEntry) other;
return (this.userName.equals(info.getUserName()))
&& (this.version.equals(info.getVersion()))
&& (this.pubKey == info.getPubKey());
&& (Objects.equals(this.pubKey, info.getPubKey()));
}

public String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ public class HostInfo implements Comparable<HostInfo>, java.io.Serializable {
private final String hostName;
private final int portNumber;

public HostInfo(String referenceName, String hostName, int portNumber) {
this.referenceName = referenceName;
public HostInfo(String hostName, int portNumber) {
this.hostName = hostName;
this.portNumber = portNumber;
this.referenceName = hostName + ":" + portNumber;
}

public String getReferenceName() {
Expand Down
Loading
Loading