Skip to content

Commit

Permalink
feature: support get protocol (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
funky-eyes authored Apr 6, 2024
1 parent a3ab842 commit c4df6b3
Show file tree
Hide file tree
Showing 8 changed files with 290 additions and 15 deletions.
8 changes: 2 additions & 6 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,11 @@ jobs:
- name: "Test, Check style, Check PMD, Check license with Maven and Java8"
if: matrix.java == '8'
run: |
./mvnw -T 4C clean test \
-Dmaven.git-commit-id.skip=true \
-e -B -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn;
./mvnw -T 4C clean test
- name: "Test with Maven and Java${{ matrix.java }}"
if: matrix.java != '8'
run: |
./mvnw -T 4C clean test \
-Dmaven.git-commit-id.skip=true \
-e -B -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn;
./mvnw -T 4C clean test
- name: "Codecov"
if: matrix.java == '8'
uses: codecov/[email protected]
52 changes: 52 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,24 @@
<mock-jedis.version>0.3.1</mock-jedis.version>
<junit-jupiter.version>5.8.2</junit-jupiter.version>
<commons-cli.version>1.6.0</commons-cli.version>
<log.version>1.2.9</log.version>
</properties>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.6</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>${log.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${log.version}</version>
</dependency>
<dependency>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-client</artifactId>
Expand Down Expand Up @@ -86,6 +102,42 @@
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<groupId>com.mycila</groupId>
<artifactId>license-maven-plugin</artifactId>
<version>3.0</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>remove</goal>
<goal>format</goal>
</goals>
</execution>
</executions>
<configuration>
<quiet>true</quiet>
<header>${main.user.dir}/tools/codestyle/HEADER</header>
<includes>
<include>**/src/main/java/**</include>
<include>**/src/test/java/**</include>
</includes>
<excludes>
</excludes>
<strictCheck>true</strictCheck>
<mapping>
<java>SLASHSTAR_STYLE</java>
</mapping>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-eclipse-plugin</artifactId>
<version>2.5.1</version>
<configuration>
<downloadSources>true</downloadSources>
</configuration>
</plugin>
</plugins>
</build>
</project>
19 changes: 18 additions & 1 deletion src/main/java/org/redis2asp/factory/AeroSpikeClientFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,14 @@
import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Host;
import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.async.EventLoops;
import com.aerospike.client.async.EventPolicy;
import com.aerospike.client.async.NettyEventLoops;
import com.aerospike.client.async.NioEventLoops;
import com.aerospike.client.policy.ClientPolicy;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;

public class AeroSpikeClientFactory {

Expand All @@ -32,19 +39,29 @@ public class AeroSpikeClientFactory {
public static volatile String namespace;

public static volatile String set;
public static volatile EventLoops eventLoops;
static {
EventPolicy eventPolicy = new EventPolicy();
if (Epoll.isAvailable()) {
EventLoopGroup group = new EpollEventLoopGroup(Runtime.getRuntime().availableProcessors());
eventLoops = new NettyEventLoops(eventPolicy, group);
} else {
eventLoops = new NioEventLoops(eventPolicy, Runtime.getRuntime().availableProcessors());
}
}

/**
* Return either a native Aerospike client or a proxy client, based on isProxy.
*
* @param clientPolicy client configuration parameters, pass in null for defaults
* @param hosts array of server hosts that the client can connect
* @return IAerospikeClient
*/
public static void createInstance(ClientPolicy clientPolicy, Host... hosts) {
if (client == null) {
LOCK.lock();
try {
if (client == null) {
clientPolicy.eventLoops = eventLoops;
client = new AerospikeClient(clientPolicy, hosts);
}
} finally {
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/org/redis2asp/protocol/RedisCommandDecoder.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.DecoderException;
import org.redis2asp.protocol.request.CommandRequest;
import org.redis2asp.protocol.request.GetRequest;
import org.redis2asp.protocol.request.SetRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -53,6 +54,8 @@ private RedisRequest<?> convert2RedisRequest(List<String> params) {
String cmd = params.get(0);
LOGGER.info("cmd: {}", cmd);
switch (cmd.toLowerCase()) {
case "get":
return new GetRequest(params.get(1));
case "command":
return new CommandRequest();
case "set":
Expand Down
57 changes: 52 additions & 5 deletions src/main/java/org/redis2asp/protocol/RedisCommandHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,36 +18,83 @@

import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.Bin;
import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.Key;
import com.aerospike.client.Record;
import com.aerospike.client.listener.RecordListener;
import com.aerospike.client.listener.WriteListener;
import com.alipay.remoting.CommandCode;
import com.alipay.remoting.CommandHandler;
import com.alipay.remoting.RemotingContext;
import com.alipay.remoting.RemotingProcessor;
import com.alipay.sofa.common.profile.StringUtil;
import org.redis2asp.factory.AeroSpikeClientFactory;
import org.redis2asp.protocol.request.CommandRequest;
import org.redis2asp.protocol.request.GetRequest;
import org.redis2asp.protocol.request.SetRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RedisCommandHandler implements CommandHandler {
IAerospikeClient client = AeroSpikeClientFactory.getClient();

private final Logger logger = LoggerFactory.getLogger(getClass());

IAerospikeClient client = AeroSpikeClientFactory.getClient();

@Override
public void handleCommand(RemotingContext ctx, Object msg) {
if (msg instanceof RedisRequest) {
RedisRequest<?> redisRequest = (RedisRequest) msg;
if (redisRequest instanceof GetRequest) {
GetRequest getRequest = (GetRequest) redisRequest;
Key key = new Key(AeroSpikeClientFactory.namespace, AeroSpikeClientFactory.set, getRequest.getKey());
client.get(null, new RecordListener() {
@Override
public void onSuccess(Key key, Record record) {
if (record == null) {
ctx.writeAndFlush(redisRequest.getResponse());
return;
}
String value = record.getString(getRequest.getKey());
if (StringUtil.isNotBlank(value)) {
getRequest.setResponse(value.getBytes(StandardCharsets.UTF_8));
}
ctx.writeAndFlush(redisRequest.getResponse());
}

@Override
public void onFailure(AerospikeException ae) {
logger.error(ae.getMessage(), ae);
getRequest.setResponse(ae.getMessage().getBytes(StandardCharsets.UTF_8));
ctx.writeAndFlush(redisRequest.getResponse());
}
}, client.getReadPolicyDefault(), key);
}
if (redisRequest instanceof SetRequest) {
SetRequest setRequest = (SetRequest)redisRequest;
SetRequest setRequest = (SetRequest) redisRequest;
Bin bin = new Bin(setRequest.getKey(), setRequest.getValue());
Key key = new Key(AeroSpikeClientFactory.namespace, AeroSpikeClientFactory.set, setRequest.getKey());
client.put(client.getWritePolicyDefault(), key, bin);
setRequest.setResponse("OK".getBytes(StandardCharsets.UTF_8));
client.put(null, new WriteListener() {
@Override
public void onSuccess(Key key) {
setRequest.setResponse("OK".getBytes(StandardCharsets.UTF_8));
ctx.writeAndFlush(redisRequest.getResponse());
}

@Override
public void onFailure(AerospikeException ae) {
setRequest.setResponse(ae.getMessage().getBytes(StandardCharsets.UTF_8));
ctx.writeAndFlush(redisRequest.getResponse());
}
}, client.getWritePolicyDefault(), key, bin);
}
if (redisRequest instanceof CommandRequest) {
CommandRequest commandRequest = (CommandRequest) redisRequest;
commandRequest.setResponse("OK".getBytes(StandardCharsets.UTF_8));
ctx.writeAndFlush(redisRequest.getResponse());
}
ctx.writeAndFlush(redisRequest.getResponse());
}
}

Expand Down
47 changes: 47 additions & 0 deletions src/main/java/org/redis2asp/protocol/request/GetRequest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redis2asp.protocol.request;

import org.redis2asp.protocol.RedisRequest;
import org.redis2asp.protocol.RedisResponse;
import org.redis2asp.protocol.response.BulkResponse;

public class GetRequest implements RedisRequest<byte[]> {

String key;

BulkResponse response = new BulkResponse();

public GetRequest(String key) {
this.key = key;
}

public String getKey() {
return key;
}

@Override
public void setResponse(byte[] data) {
this.response.setData(data);
}

@Override
public RedisResponse<byte[]> getResponse() {
return response;
}

}
100 changes: 100 additions & 0 deletions src/main/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration>
<!-- file-appender properties -->
<property name="LOG_FILE_PATH"
value="${user.home}/logs/redis2asp"/>
<property name="FILE_LOG_PATTERN"
value="%d{yyyy-MM-dd HH:mm:ss.SSS} %5p --- [%t] [%logger] [%M] [%X{X-TX-XID:-}]: %m%n%wEx2"/>

<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<Pattern>${FILE_LOG_PATTERN}</Pattern>
<charset>UTF-8</charset>
</encoder>
</appender>

<!--ALL-->
<appender name="FILE_ALL" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_FILE_PATH}/${APPLICATION_NAME:-redis2asp}redis2asp.all.log</file>
<append>true</append>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${LOG_FILE_PATH}/history/${APPLICATION_NAME:-redis2asp}redis2asp.all.%d{yyyy-MM-dd}.%i.log.gz</fileNamePattern>
<maxFileSize>2GB</maxFileSize>
<MaxHistory>7</MaxHistory>
<totalSizeCap>7GB</totalSizeCap>
<cleanHistoryOnStart>true</cleanHistoryOnStart>
</rollingPolicy>
<encoder>
<Pattern>${FILE_LOG_PATTERN}</Pattern>
<charset>UTF-8</charset>
</encoder>
</appender>

<!--WARN-->
<appender name="FILE_WARN" class="ch.qos.logback.core.rolling.RollingFileAppender">
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>WARN</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
<file>${LOG_FILE_PATH}/${APPLICATION_NAME:-redis2asp}redis2asp.warn.log</file>
<append>true</append>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${LOG_FILE_PATH}/history/${APPLICATION_NAME:-redis2asp}redis2asp.warn.%d{yyyy-MM-dd}.%i.log.gz</fileNamePattern>
<maxFileSize>2GB</maxFileSize>
<MaxHistory>7</MaxHistory>
<totalSizeCap>7GB</totalSizeCap>
<cleanHistoryOnStart>true</cleanHistoryOnStart>
</rollingPolicy>
<encoder>
<Pattern>${FILE_LOG_PATTERN}</Pattern>
<charset>UTF-8</charset>
</encoder>
</appender>

<!--ERROR-->
<appender name="FILE_ERROR" class="ch.qos.logback.core.rolling.RollingFileAppender">
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>ERROR</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
<file>${LOG_FILE_PATH}/${APPLICATION_NAME:-redis2asp}redis2asp.error.log</file>
<append>true</append>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${LOG_FILE_PATH}/history/${APPLICATION_NAME:-redis2asp}redis2asp.error.%d{yyyy-MM-dd}.%i.log.gz</fileNamePattern>
<maxFileSize>2GB</maxFileSize>
<MaxHistory>7</MaxHistory>
<totalSizeCap>7GB</totalSizeCap>
<cleanHistoryOnStart>true</cleanHistoryOnStart>
</rollingPolicy>
<encoder>
<Pattern>${FILE_LOG_PATTERN}</Pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="CONSOLE"/>
<appender-ref ref="FILE_ALL"/>
<appender-ref ref="FILE_WARN"/>
<appender-ref ref="FILE_ERROR"/>
</root>
</configuration>
Loading

0 comments on commit c4df6b3

Please sign in to comment.