Skip to content

Commit aa41c04

Browse files
tigerLuHaihailu
andauthored
add list parse for rdb &pass gtid.lwm & add unknow command parse (ctripcorp#769)
* pass gtid.lwm & add unknow command parse * add unknown op lwm --------- Co-authored-by: hailu <[email protected]>
1 parent b903aab commit aa41c04

File tree

9 files changed

+387
-172
lines changed

9 files changed

+387
-172
lines changed

redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/redis/operation/RedisOpType.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public enum RedisOpType {
8383
MSETNX(true, -3),
8484

8585
// ctrip
86-
GTID_LWM(false, 3, true),
86+
GTID_LWM(false, 3, false),
8787
CTRIP_MERGE_START(false, -1, true),
8888
CTRIP_MERGE_END(false, -2, true),
8989

@@ -98,7 +98,7 @@ public enum RedisOpType {
9898
EXEC(false, 1),
9999
SCRIPT(false, -2),
100100
MOVE(false, 3),
101-
UNKNOWN(false, -1);
101+
UNKNOWN(false, -1, true);
102102

103103
// Support multi key or not
104104
private boolean supportMultiKey;

redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/redis/operation/parser/RedisOpNoneKeyEnum.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@ public enum RedisOpNoneKeyEnum {
1313
PING(RedisOpType.PING),
1414
MULT(RedisOpType.MULTI),
1515
EXEC(RedisOpType.EXEC),
16-
SCRIPT(RedisOpType.SCRIPT);
16+
SCRIPT(RedisOpType.SCRIPT),
17+
18+
UNKNOW(RedisOpType.UNKNOWN);
1719

1820
private RedisOpType redisOpType;
1921

redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/redis/rdb/RdbParseContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public interface RdbParseContext {
6060
enum RdbType {
6161

6262
STRING(RdbConstant.REDIS_RDB_TYPE_STRING, false, RdbStringParser::new),
63-
// LIST(RdbConstant.REDIS_RDB_TYPE_LIST),
63+
LIST(RdbConstant.REDIS_RDB_TYPE_LIST,false,RdbListParser::new),
6464
SET(RdbConstant.REDIS_RDB_TYPE_SET, false, RdbSetParser::new),
6565
// ZSET(RdbConstant.REDIS_RDB_TYPE_ZSET),
6666
HASH(RdbConstant.REDIS_RDB_TYPE_HASH, false, RdbHashParser::new),
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
package com.ctrip.xpipe.redis.core.redis.rdb.parser;
2+
3+
import com.ctrip.xpipe.redis.core.redis.exception.RdbParseEmptyKeyException;
4+
import com.ctrip.xpipe.redis.core.redis.operation.RedisOpType;
5+
import com.ctrip.xpipe.redis.core.redis.operation.op.RedisOpSingleKey;
6+
import com.ctrip.xpipe.redis.core.redis.rdb.RdbLength;
7+
import com.ctrip.xpipe.redis.core.redis.rdb.RdbParseContext;
8+
import com.ctrip.xpipe.redis.core.redis.rdb.RdbParser;
9+
import io.netty.buffer.ByteBuf;
10+
import org.slf4j.Logger;
11+
import org.slf4j.LoggerFactory;
12+
13+
/**
14+
* @author hailu
15+
* @date 2024/1/17 19:06
16+
*/
17+
public class RdbListParser extends AbstractRdbParser<Integer> implements RdbParser<Integer> {
18+
19+
private RdbParseContext context;
20+
21+
private RdbParser<byte[]> rdbStringParser;
22+
23+
private RdbLength len;
24+
25+
private int readCnt;
26+
27+
private STATE state = STATE.READ_INIT;
28+
29+
private static final Logger logger = LoggerFactory.getLogger(RdbListParser.class);
30+
31+
enum STATE {
32+
READ_INIT,
33+
READ_LEN,
34+
READ_VALUE,
35+
READ_END
36+
}
37+
38+
public RdbListParser(RdbParseContext parseContext) {
39+
this.context = parseContext;
40+
this.rdbStringParser = (RdbParser<byte[]>) context.getOrCreateParser(RdbParseContext.RdbType.STRING);
41+
}
42+
43+
@Override
44+
public Integer read(ByteBuf byteBuf) {
45+
46+
while (!isFinish() && byteBuf.readableBytes() > 0) {
47+
48+
switch (state) {
49+
50+
case READ_INIT:
51+
len = null;
52+
readCnt = 0;
53+
state = STATE.READ_LEN;
54+
break;
55+
56+
case READ_LEN:
57+
len = parseRdbLength(byteBuf);
58+
if (null != len) {
59+
if (len.getLenValue() > 0) {
60+
state = STATE.READ_VALUE;
61+
} else {
62+
throw new RdbParseEmptyKeyException("set key " + context.getKey());
63+
}
64+
}
65+
break;
66+
67+
case READ_VALUE:
68+
byte[] value = rdbStringParser.read(byteBuf);
69+
if (null != value) {
70+
rdbStringParser.reset();
71+
propagateCmdIfNeed(value);
72+
73+
readCnt++;
74+
if (readCnt >= len.getLenValue()) {
75+
state = STATE.READ_END;
76+
} else {
77+
state = STATE.READ_VALUE;
78+
}
79+
}
80+
break;
81+
82+
case READ_END:
83+
default:
84+
85+
}
86+
87+
if (isFinish()) {
88+
propagateExpireAtIfNeed(context.getKey(), context.getExpireMilli());
89+
}
90+
}
91+
92+
if (isFinish()) return len.getLenValue();
93+
else return null;
94+
}
95+
96+
private void propagateCmdIfNeed(byte[] value) {
97+
if (null == value || null == context.getKey()) {
98+
return;
99+
}
100+
101+
notifyRedisOp(new RedisOpSingleKey(
102+
RedisOpType.RPUSH,
103+
new byte[][] {RedisOpType.RPUSH.name().getBytes(), context.getKey().get(), value},
104+
context.getKey(), value));
105+
}
106+
107+
@Override
108+
public boolean isFinish() {
109+
return STATE.READ_END.equals(state);
110+
}
111+
112+
@Override
113+
public void reset() {
114+
super.reset();
115+
if (rdbStringParser != null) {
116+
rdbStringParser.reset();
117+
}
118+
this.state = STATE.READ_INIT;
119+
}
120+
121+
@Override
122+
protected Logger getLogger() {
123+
return logger;
124+
}
125+
}

redis/redis-core/src/test/java/com/ctrip/xpipe/redis/core/redis/parser/GeneralRedisOpParserTest.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,24 @@ public void testCtripGtidLwmParse() {
5959
RedisSingleKeyOp redisSingleKeyOp = (RedisSingleKeyOp) redisOp;
6060
Assert.assertArrayEquals("24d9e2513182d156cbd999df5ebedf24e7634140".getBytes(), redisSingleKeyOp.getKey().get());
6161
Assert.assertArrayEquals("1494763841".getBytes(), redisSingleKeyOp.getValue());
62+
Assert.assertFalse(redisOp.getOpType().isSwallow());
63+
}
64+
@Test
65+
public void testUnknowParse() {
66+
byte[][] rawOpArgs = {"unknow".getBytes(), "unknow_key".getBytes(), "unknow_value".getBytes()};
67+
RedisOp redisOp = parser.parse(rawOpArgs);
68+
Assert.assertEquals(RedisOpType.UNKNOWN, redisOp.getOpType());
69+
Assert.assertNull(redisOp.getOpGtid());
70+
Assert.assertArrayEquals(rawOpArgs, redisOp.buildRawOpArgs());
71+
byte[][] rawOpArgs1 = {"credis_flushall".getBytes()};
72+
redisOp = parser.parse(rawOpArgs1);
73+
Assert.assertEquals(RedisOpType.UNKNOWN, redisOp.getOpType());
74+
Assert.assertNull(redisOp.getOpGtid());
75+
Assert.assertArrayEquals(rawOpArgs1, redisOp.buildRawOpArgs());
76+
77+
RedisSingleKeyOp redisSingleKeyOp = (RedisSingleKeyOp) redisOp;
78+
Assert.assertNull(redisSingleKeyOp.getKey());
79+
Assert.assertNull(redisSingleKeyOp.getValue());
6280
Assert.assertTrue(redisOp.getOpType().isSwallow());
6381
}
6482

@@ -138,10 +156,11 @@ public void testPingParse() {
138156
Assert.assertFalse(redisOp.getOpType().isSwallow());
139157
}
140158

141-
@Test(expected = UnsupportedOperationException.class)
159+
@Test
142160
public void testNoneExistsCmdParse() {
143161
RedisOp redisOp = parser.parse(Arrays.asList("EMPTY", "0").toArray());
144162
Assert.assertEquals(RedisOpType.UNKNOWN, redisOp.getOpType());
163+
Assert.assertTrue(redisOp.getOpType().isSwallow());
145164
}
146165

147166
@Test(expected = IllegalArgumentException.class)

redis/redis-core/src/test/java/com/ctrip/xpipe/redis/core/redis/rdb/parser/DefaultRdbParserTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,20 @@ public void testParseSet() {
146146
Assert.assertEquals("SADD set 13927438904093012", redisOps.get(3).toString());
147147
Assert.assertEquals("SADD set v1", redisOps.get(4).toString());
148148
}
149+
@Test
150+
public void testParseList() {
151+
ByteBuf byteBuf = Unpooled.wrappedBuffer(listRdbBytes);
152+
while (!parser.isFinish()) {
153+
parser.read(byteBuf);
154+
}
155+
156+
Assert.assertEquals("SELECT 0", redisOps.get(0).toString());
157+
Assert.assertEquals("RPUSH test_0_7743 aaa0_0_530", redisOps.get(1).toString());
158+
Assert.assertEquals("RPUSH test_0_7743 aaa0_0_454", redisOps.get(2).toString());
159+
Assert.assertEquals("RPUSH test_0_7743 aaa1_1_39", redisOps.get(3).toString());
160+
Assert.assertEquals("RPUSH test_0_7743 aaa1_1_244", redisOps.get(4).toString());
161+
162+
}
149163

150164
@Test
151165
public void testParseZiplistZSet() {

0 commit comments

Comments
 (0)