From c4df6b3b217d1ce7c78b378a19665279d48d46f4 Mon Sep 17 00:00:00 2001 From: funkye Date: Sat, 6 Apr 2024 14:41:44 +0800 Subject: [PATCH] feature: support get protocol (#3) --- .github/workflows/build.yml | 8 +- pom.xml | 52 +++++++++ .../factory/AeroSpikeClientFactory.java | 19 +++- .../protocol/RedisCommandDecoder.java | 3 + .../protocol/RedisCommandHandler.java | 57 +++++++++- .../protocol/request/GetRequest.java | 47 ++++++++ src/main/resources/logback.xml | 100 ++++++++++++++++++ src/test/java/org/redis2asp/ServerTest.java | 19 +++- 8 files changed, 290 insertions(+), 15 deletions(-) create mode 100644 src/main/java/org/redis2asp/protocol/request/GetRequest.java create mode 100644 src/main/resources/logback.xml diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index c65a0e0..72964d7 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -41,15 +41,11 @@ jobs: - name: "Test, Check style, Check PMD, Check license with Maven and Java8" if: matrix.java == '8' run: | - ./mvnw -T 4C clean test \ - -Dmaven.git-commit-id.skip=true \ - -e -B -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn; + ./mvnw -T 4C clean test - name: "Test with Maven and Java${{ matrix.java }}" if: matrix.java != '8' run: | - ./mvnw -T 4C clean test \ - -Dmaven.git-commit-id.skip=true \ - -e -B -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn; + ./mvnw -T 4C clean test - name: "Codecov" if: matrix.java == '8' uses: codecov/codecov-action@v3.1.4 diff --git a/pom.xml b/pom.xml index 9b7ea9d..f224096 100644 --- a/pom.xml +++ b/pom.xml @@ -16,8 +16,24 @@ 0.3.1 5.8.2 1.6.0 + 1.2.9 + + org.slf4j + slf4j-api + 1.7.6 + + + ch.qos.logback + logback-core + ${log.version} + + + ch.qos.logback + logback-classic + ${log.version} + com.aerospike aerospike-client @@ -86,6 +102,42 @@ ${project.build.sourceEncoding} + + com.mycila + license-maven-plugin + 3.0 + + + generate-sources + + remove + format + + + + + true +
${main.user.dir}/tools/codestyle/HEADER
+ + **/src/main/java/** + **/src/test/java/** + + + + true + + SLASHSTAR_STYLE + +
+
+ + org.apache.maven.plugins + maven-eclipse-plugin + 2.5.1 + + true + + \ No newline at end of file diff --git a/src/main/java/org/redis2asp/factory/AeroSpikeClientFactory.java b/src/main/java/org/redis2asp/factory/AeroSpikeClientFactory.java index e85d780..6bee1f8 100644 --- a/src/main/java/org/redis2asp/factory/AeroSpikeClientFactory.java +++ b/src/main/java/org/redis2asp/factory/AeroSpikeClientFactory.java @@ -21,7 +21,14 @@ import com.aerospike.client.AerospikeClient; import com.aerospike.client.Host; import com.aerospike.client.IAerospikeClient; +import com.aerospike.client.async.EventLoops; +import com.aerospike.client.async.EventPolicy; +import com.aerospike.client.async.NettyEventLoops; +import com.aerospike.client.async.NioEventLoops; import com.aerospike.client.policy.ClientPolicy; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.Epoll; +import io.netty.channel.epoll.EpollEventLoopGroup; public class AeroSpikeClientFactory { @@ -32,19 +39,29 @@ public class AeroSpikeClientFactory { public static volatile String namespace; public static volatile String set; + public static volatile EventLoops eventLoops; + static { + EventPolicy eventPolicy = new EventPolicy(); + if (Epoll.isAvailable()) { + EventLoopGroup group = new EpollEventLoopGroup(Runtime.getRuntime().availableProcessors()); + eventLoops = new NettyEventLoops(eventPolicy, group); + } else { + eventLoops = new NioEventLoops(eventPolicy, Runtime.getRuntime().availableProcessors()); + } + } /** * Return either a native Aerospike client or a proxy client, based on isProxy. * * @param clientPolicy client configuration parameters, pass in null for defaults * @param hosts array of server hosts that the client can connect - * @return IAerospikeClient */ public static void createInstance(ClientPolicy clientPolicy, Host... hosts) { if (client == null) { LOCK.lock(); try { if (client == null) { + clientPolicy.eventLoops = eventLoops; client = new AerospikeClient(clientPolicy, hosts); } } finally { diff --git a/src/main/java/org/redis2asp/protocol/RedisCommandDecoder.java b/src/main/java/org/redis2asp/protocol/RedisCommandDecoder.java index 852538a..6b612cf 100644 --- a/src/main/java/org/redis2asp/protocol/RedisCommandDecoder.java +++ b/src/main/java/org/redis2asp/protocol/RedisCommandDecoder.java @@ -24,6 +24,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.DecoderException; import org.redis2asp.protocol.request.CommandRequest; +import org.redis2asp.protocol.request.GetRequest; import org.redis2asp.protocol.request.SetRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,6 +54,8 @@ private RedisRequest convert2RedisRequest(List params) { String cmd = params.get(0); LOGGER.info("cmd: {}", cmd); switch (cmd.toLowerCase()) { + case "get": + return new GetRequest(params.get(1)); case "command": return new CommandRequest(); case "set": diff --git a/src/main/java/org/redis2asp/protocol/RedisCommandHandler.java b/src/main/java/org/redis2asp/protocol/RedisCommandHandler.java index d52b740..7f07ea8 100644 --- a/src/main/java/org/redis2asp/protocol/RedisCommandHandler.java +++ b/src/main/java/org/redis2asp/protocol/RedisCommandHandler.java @@ -18,36 +18,83 @@ import java.nio.charset.StandardCharsets; import java.util.concurrent.ExecutorService; +import com.aerospike.client.AerospikeException; import com.aerospike.client.Bin; import com.aerospike.client.IAerospikeClient; import com.aerospike.client.Key; +import com.aerospike.client.Record; +import com.aerospike.client.listener.RecordListener; +import com.aerospike.client.listener.WriteListener; import com.alipay.remoting.CommandCode; import com.alipay.remoting.CommandHandler; import com.alipay.remoting.RemotingContext; import com.alipay.remoting.RemotingProcessor; +import com.alipay.sofa.common.profile.StringUtil; import org.redis2asp.factory.AeroSpikeClientFactory; import org.redis2asp.protocol.request.CommandRequest; +import org.redis2asp.protocol.request.GetRequest; import org.redis2asp.protocol.request.SetRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class RedisCommandHandler implements CommandHandler { - IAerospikeClient client = AeroSpikeClientFactory.getClient(); + + private final Logger logger = LoggerFactory.getLogger(getClass()); + + IAerospikeClient client = AeroSpikeClientFactory.getClient(); @Override public void handleCommand(RemotingContext ctx, Object msg) { if (msg instanceof RedisRequest) { RedisRequest redisRequest = (RedisRequest) msg; + if (redisRequest instanceof GetRequest) { + GetRequest getRequest = (GetRequest) redisRequest; + Key key = new Key(AeroSpikeClientFactory.namespace, AeroSpikeClientFactory.set, getRequest.getKey()); + client.get(null, new RecordListener() { + @Override + public void onSuccess(Key key, Record record) { + if (record == null) { + ctx.writeAndFlush(redisRequest.getResponse()); + return; + } + String value = record.getString(getRequest.getKey()); + if (StringUtil.isNotBlank(value)) { + getRequest.setResponse(value.getBytes(StandardCharsets.UTF_8)); + } + ctx.writeAndFlush(redisRequest.getResponse()); + } + + @Override + public void onFailure(AerospikeException ae) { + logger.error(ae.getMessage(), ae); + getRequest.setResponse(ae.getMessage().getBytes(StandardCharsets.UTF_8)); + ctx.writeAndFlush(redisRequest.getResponse()); + } + }, client.getReadPolicyDefault(), key); + } if (redisRequest instanceof SetRequest) { - SetRequest setRequest = (SetRequest)redisRequest; + SetRequest setRequest = (SetRequest) redisRequest; Bin bin = new Bin(setRequest.getKey(), setRequest.getValue()); Key key = new Key(AeroSpikeClientFactory.namespace, AeroSpikeClientFactory.set, setRequest.getKey()); - client.put(client.getWritePolicyDefault(), key, bin); - setRequest.setResponse("OK".getBytes(StandardCharsets.UTF_8)); + client.put(null, new WriteListener() { + @Override + public void onSuccess(Key key) { + setRequest.setResponse("OK".getBytes(StandardCharsets.UTF_8)); + ctx.writeAndFlush(redisRequest.getResponse()); + } + + @Override + public void onFailure(AerospikeException ae) { + setRequest.setResponse(ae.getMessage().getBytes(StandardCharsets.UTF_8)); + ctx.writeAndFlush(redisRequest.getResponse()); + } + }, client.getWritePolicyDefault(), key, bin); } if (redisRequest instanceof CommandRequest) { CommandRequest commandRequest = (CommandRequest) redisRequest; commandRequest.setResponse("OK".getBytes(StandardCharsets.UTF_8)); + ctx.writeAndFlush(redisRequest.getResponse()); } - ctx.writeAndFlush(redisRequest.getResponse()); } } diff --git a/src/main/java/org/redis2asp/protocol/request/GetRequest.java b/src/main/java/org/redis2asp/protocol/request/GetRequest.java new file mode 100644 index 0000000..a916620 --- /dev/null +++ b/src/main/java/org/redis2asp/protocol/request/GetRequest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redis2asp.protocol.request; + +import org.redis2asp.protocol.RedisRequest; +import org.redis2asp.protocol.RedisResponse; +import org.redis2asp.protocol.response.BulkResponse; + +public class GetRequest implements RedisRequest { + + String key; + + BulkResponse response = new BulkResponse(); + + public GetRequest(String key) { + this.key = key; + } + + public String getKey() { + return key; + } + + @Override + public void setResponse(byte[] data) { + this.response.setData(data); + } + + @Override + public RedisResponse getResponse() { + return response; + } + +} diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml new file mode 100644 index 0000000..d341474 --- /dev/null +++ b/src/main/resources/logback.xml @@ -0,0 +1,100 @@ + + + + + + + + + + ${FILE_LOG_PATTERN} + UTF-8 + + + + + + ${LOG_FILE_PATH}/${APPLICATION_NAME:-redis2asp}redis2asp.all.log + true + + ${LOG_FILE_PATH}/history/${APPLICATION_NAME:-redis2asp}redis2asp.all.%d{yyyy-MM-dd}.%i.log.gz + 2GB + 7 + 7GB + true + + + ${FILE_LOG_PATTERN} + UTF-8 + + + + + + + WARN + ACCEPT + DENY + + ${LOG_FILE_PATH}/${APPLICATION_NAME:-redis2asp}redis2asp.warn.log + true + + ${LOG_FILE_PATH}/history/${APPLICATION_NAME:-redis2asp}redis2asp.warn.%d{yyyy-MM-dd}.%i.log.gz + 2GB + 7 + 7GB + true + + + ${FILE_LOG_PATTERN} + UTF-8 + + + + + + + ERROR + ACCEPT + DENY + + ${LOG_FILE_PATH}/${APPLICATION_NAME:-redis2asp}redis2asp.error.log + true + + ${LOG_FILE_PATH}/history/${APPLICATION_NAME:-redis2asp}redis2asp.error.%d{yyyy-MM-dd}.%i.log.gz + 2GB + 7 + 7GB + true + + + ${FILE_LOG_PATTERN} + UTF-8 + + + + + + + + + diff --git a/src/test/java/org/redis2asp/ServerTest.java b/src/test/java/org/redis2asp/ServerTest.java index 71686ab..5d8b855 100644 --- a/src/test/java/org/redis2asp/ServerTest.java +++ b/src/test/java/org/redis2asp/ServerTest.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ThreadLocalRandom; import com.aerospike.client.IAerospikeClient; import com.aerospike.client.Key; import com.aerospike.client.Record; @@ -36,7 +37,7 @@ public class ServerTest { static Server server; static IAerospikeClient aspClient; - static Logger logger = LoggerFactory.getLogger(ServerTest.class); + static Logger logger = LoggerFactory.getLogger(ServerTest.class); @BeforeAll public static void init() throws IOException, ParseException { @@ -57,7 +58,7 @@ public void testRedisSet() { } @Test - public void testSetAsp() { + public void testGetSetAsp() { try (Jedis jedis = new Jedis("127.0.0.1", 6789)) { String result = jedis.set("a", "b"); Assertions.assertEquals(result, "OK"); @@ -65,7 +66,19 @@ public void testSetAsp() { Key key = new Key(AeroSpikeClientFactory.namespace, AeroSpikeClientFactory.set, "a"); Record record = aspClient.get(aspClient.getReadPolicyDefault(), key); Map map = record.bins; - Assertions.assertTrue(map.containsKey("a")); + Assertions.assertEquals(map.get("a"), "b"); + try (Jedis jedis = new Jedis("127.0.0.1", 6789, 3000)) { + String result = jedis.get("a"); + Assertions.assertEquals(result, "b"); + } + } + + @Test + public void testGetNilAsp() { + try (Jedis jedis = new Jedis("127.0.0.1", 6789, 3000)) { + String result = jedis.get(String.valueOf(ThreadLocalRandom.current().nextInt(111))); + Assertions.assertNull(result); + } } @AfterAll