Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: make code compile in Java 9+ #8893

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ maven_install(
"org.mockito:mockito-junit-jupiter:4.11.0",
"com.alibaba.fastjson2:fastjson2:2.0.43",
"org.junit.jupiter:junit-jupiter-api:5.9.1",
"com.github.oshi:oshi-core:6.6.5",
],
fetch_sources = True,
repositories = [
Expand Down
1 change: 1 addition & 0 deletions common/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ java_library(
"@maven//:io_github_aliyunmq_rocketmq_slf4j_api",
"@maven//:io_github_aliyunmq_rocketmq_logback_classic",
"@maven//:org_apache_rocketmq_rocketmq_rocksdb",
"@maven//:com_github_oshi_oshi_core",
],
)

Expand Down
4 changes: 4 additions & 0 deletions common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -116,5 +116,9 @@
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-rocksdb</artifactId>
</dependency>
<dependency>
<groupId>com.github.oshi</groupId>
<artifactId>oshi-core</artifactId>
</dependency>
</dependencies>
</project>
2 changes: 2 additions & 0 deletions common/src/main/java/org/apache/rocketmq/common/MixAll.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.rocketmq.common.utils.IOTinyUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import oshi.SystemInfo;

public class MixAll {
public static final String ROCKETMQ_HOME_ENV = "ROCKETMQ_HOME";
Expand Down Expand Up @@ -119,6 +120,7 @@ public class MixAll {
public static final String MULTI_PATH_SPLITTER = System.getProperty("rocketmq.broker.multiPathSplitter", ",");

private static final String OS = System.getProperty("os.name").toLowerCase();
public static final SystemInfo SYSTEM_INFO = new SystemInfo();

public static boolean isWindows() {
return OS.contains("win");
Expand Down
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@
<rocksdb.version>1.0.2</rocksdb.version>
<jackson-databind.version>2.13.4.2</jackson-databind.version>
<sofa-jraft.version>1.3.14</sofa-jraft.version>
<oshi.version>6.6.5</oshi.version>

<!-- Test dependencies -->
<junit.version>4.13.2</junit.version>
Expand Down Expand Up @@ -1053,6 +1054,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.github.oshi</groupId>
<artifactId>oshi-core</artifactId>
<version>${oshi.version}</version>
</dependency>
<dependency>
<groupId>com.adobe.testing</groupId>
<artifactId>s3mock-junit4</artifactId>
Expand Down
7 changes: 3 additions & 4 deletions store/src/main/java/org/apache/rocketmq/store/CommitLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.rocketmq.store;

import com.google.common.base.Strings;
import io.netty.util.internal.PlatformDependent;
import java.net.Inet6Address;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -67,8 +68,6 @@
import org.apache.rocketmq.store.util.LibC;
import org.rocksdb.RocksDBException;

import sun.nio.ch.DirectBuffer;

/**
* Store all metadata downtime for recovery, data protection reliability
*/
Expand Down Expand Up @@ -2384,7 +2383,7 @@ private byte[] sampling(byte[] pageCacheTable, int sampleStep) {

private byte[] checkFileInPageCache(MappedFile mappedFile) {
long fileSize = mappedFile.getFileSize();
final long address = ((DirectBuffer) mappedFile.getMappedByteBuffer()).address();
final long address = PlatformDependent.directBufferAddress(mappedFile.getMappedByteBuffer());
int pageNums = (int) (fileSize + this.pageSize - 1) / this.pageSize;
byte[] pageCacheRst = new byte[pageNums];
int mincore = LibC.INSTANCE.mincore(new Pointer(address), new NativeLong(fileSize), pageCacheRst);
Expand Down Expand Up @@ -2461,7 +2460,7 @@ private int setFileReadMode(MappedFile mappedFile, int mode) {
log.error("setFileReadMode mappedFile is null");
return -1;
}
final long address = ((DirectBuffer) mappedFile.getMappedByteBuffer()).address();
final long address = PlatformDependent.directBufferAddress(mappedFile.getMappedByteBuffer());
int madvise = LibC.INSTANCE.madvise(new Pointer(address), new NativeLong(mappedFile.getFileSize()), mode);
if (madvise != 0) {
log.error("setFileReadMode error fileName: {}, madvise: {}, mode:{}", mappedFile.getFileName(), madvise, mode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@

import com.sun.jna.NativeLong;
import com.sun.jna.Pointer;
import io.netty.util.internal.PlatformDependent;
import java.nio.ByteBuffer;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.store.util.LibC;
import sun.nio.ch.DirectBuffer;

public class TransientStorePool {
private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
Expand All @@ -47,8 +47,7 @@ public TransientStorePool(final int poolSize, final int fileSize) {
public void init() {
for (int i = 0; i < poolSize; i++) {
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);

final long address = ((DirectBuffer) byteBuffer).address();
final long address = PlatformDependent.directBufferAddress(byteBuffer);
Pointer pointer = new Pointer(address);
LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));

Expand All @@ -58,7 +57,7 @@ public void init() {

public void destroy() {
for (ByteBuffer byteBuffer : availableBuffers) {
final long address = ((DirectBuffer) byteBuffer).address();
final long address = PlatformDependent.directBufferAddress(byteBuffer);
Pointer pointer = new Pointer(address);
LibC.INSTANCE.munlock(pointer, new NativeLong(fileSize));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@

import com.sun.jna.NativeLong;
import com.sun.jna.Pointer;
import io.netty.util.internal.PlatformDependent;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
Expand All @@ -38,6 +38,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.commons.lang3.SystemUtils;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageExt;
Expand All @@ -55,14 +56,10 @@
import org.apache.rocketmq.store.TransientStorePool;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.util.LibC;
import sun.misc.Unsafe;
import sun.nio.ch.DirectBuffer;

public class DefaultMappedFile extends AbstractMappedFile {
public static final int OS_PAGE_SIZE = 1024 * 4;
public static final Unsafe UNSAFE = getUnsafe();
private static final Method IS_LOADED_METHOD;
public static final int UNSAFE_PAGE_SIZE = UNSAFE == null ? OS_PAGE_SIZE : UNSAFE.pageSize();
public static final long OS_PAGE_SIZE = MixAll.SYSTEM_INFO.getHardware().getMemory().getPageSize();

protected static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

Expand Down Expand Up @@ -754,7 +751,7 @@ public void setFirstCreateInQueue(boolean firstCreateInQueue) {
@Override
public void mlock() {
final long beginTime = System.currentTimeMillis();
final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
final long address = PlatformDependent.directBufferAddress(this.mappedByteBuffer);
Pointer pointer = new Pointer(address);
{
int ret = LibC.INSTANCE.mlock(pointer, new NativeLong(this.fileSize));
Expand All @@ -770,7 +767,7 @@ public void mlock() {
@Override
public void munlock() {
final long beginTime = System.currentTimeMillis();
final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
final long address = PlatformDependent.directBufferAddress(this.mappedByteBuffer);
Pointer pointer = new Pointer(address);
int ret = LibC.INSTANCE.munlock(pointer, new NativeLong(this.fileSize));
log.info("munlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
Expand Down Expand Up @@ -862,25 +859,14 @@ public Iterator<SelectMappedBufferResult> iterator(int startPos) {
return new Itr(startPos);
}

public static Unsafe getUnsafe() {
try {
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
return (Unsafe) f.get(null);
} catch (Exception ignore) {

}
return null;
}

public static long mappingAddr(long addr) {
long offset = addr % UNSAFE_PAGE_SIZE;
offset = (offset >= 0) ? offset : (UNSAFE_PAGE_SIZE + offset);
long offset = addr % OS_PAGE_SIZE;
offset = (offset >= 0) ? offset : (OS_PAGE_SIZE + offset);
return addr - offset;
}

public static int pageCount(long size) {
return (int) (size + (long) UNSAFE_PAGE_SIZE - 1L) / UNSAFE_PAGE_SIZE;
return (int) ((size + OS_PAGE_SIZE - 1L) / OS_PAGE_SIZE);
}

@Override
Expand All @@ -889,7 +875,7 @@ public boolean isLoaded(long position, int size) {
return true;
}
try {
long addr = ((DirectBuffer) mappedByteBuffer).address() + position;
long addr = PlatformDependent.directBufferAddress(mappedByteBuffer) + position;
return (boolean) IS_LOADED_METHOD.invoke(mappedByteBuffer, mappingAddr(addr), size, pageCount(size));
} catch (Exception e) {
log.info("invoke isLoaded0 of file {} error:", file.getAbsolutePath(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.test.dledger;

import java.io.File;
import java.util.UUID;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
Expand All @@ -38,8 +39,6 @@
import org.junit.Assert;
import org.junit.Test;

import static sun.util.locale.BaseLocale.SEP;

public class DLedgerProduceAndConsumeIT {

public BrokerConfig buildBrokerConfig(String cluster, String brokerName) {
Expand All @@ -55,7 +54,7 @@ public MessageStoreConfig buildStoreConfig(String brokerName, String peers, Stri
MessageStoreConfig storeConfig = new MessageStoreConfig();
String baseDir = IntegrationTestBase.createBaseDir();
storeConfig.setStorePathRootDir(baseDir);
storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog");
storeConfig.setStorePathCommitLog(baseDir + File.separator + "commitlog");
storeConfig.setHaListenPort(0);
storeConfig.setMappedFileSizeCommitLog(10 * 1024 * 1024);
storeConfig.setEnableDLegerCommitLog(true);
Expand Down
Loading