Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#8859] Fix duplicate mapped file in mutil commitlog store path mode. #8897

Open
wants to merge 7 commits into
base: develop
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
public class AllocateMappedFileService extends ServiceThread {
private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
private static int waitTimeOut = 1000 * 5;
private ConcurrentMap<String, AllocateRequest> requestTable =
private ConcurrentMap<String/*FileName*/, AllocateRequest> requestTable =
new ConcurrentHashMap<>();
private PriorityBlockingQueue<AllocateRequest> requestQueue =
new PriorityBlockingQueue<>();
Expand All @@ -50,6 +50,11 @@ public AllocateMappedFileService(DefaultMessageStore messageStore) {
this.messageStore = messageStore;
}

public String getCachedMappedFilePath(long createOffset) {
AllocateRequest req = requestTable.get(UtilAll.offset2FileName(createOffset));
return req == null ? null : req.getFilePath();
}

public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
int canSubmitRequests = 2;
if (this.messageStore.isTransientStorePoolEnable()) {
Expand All @@ -59,14 +64,15 @@ public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String next
}
}

String nextFileName = nextFilePath.substring(nextFilePath.lastIndexOf("/") + 1);
AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
boolean nextPutOK = this.requestTable.putIfAbsent(nextFileName, nextReq) == null;

if (nextPutOK) {
if (canSubmitRequests <= 0) {
log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +
"RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.remainTransientStoreBufferNumbs());
this.requestTable.remove(nextFilePath);
this.requestTable.remove(nextFileName);
return null;
}
boolean offerOK = this.requestQueue.offer(nextReq);
Expand All @@ -76,13 +82,14 @@ public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String next
canSubmitRequests--;
}

String nextNextFileName = nextNextFilePath.substring(nextNextFilePath.lastIndexOf("/") + 1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is necessary to use File.separator, as the file path separator on other OS may not be '/'.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, fixed now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The UTs also need to be modified accordingly.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, it was my oversight. I have modified it as required. Please review it again. Thank you.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, it was my oversight. I have modified it as required. Please review it again. Thank you.

By the way, is there any compatibility issue here? From the code, it seems that there isn't.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean file path separator in test case? I also have this doubts but the other use cases I refer to in this file are like this.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, there are indeed some compatibility issues in my UTs, I have located the problem in the workflows. I will provide a new patch after completing the test in my repository later.

Copy link
Author

@hexueyuan hexueyuan Nov 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a new patch after completing the test in my repository

Fixed compatibility issue.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like some errors blocked the workflow checks, but I don't have permission to stop or rerun it.

AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFileName, nextNextReq) == null;
if (nextNextPutOK) {
if (canSubmitRequests <= 0) {
log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +
"RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.remainTransientStoreBufferNumbs());
this.requestTable.remove(nextNextFilePath);
this.requestTable.remove(nextNextFileName);
} else {
boolean offerOK = this.requestQueue.offer(nextNextReq);
if (!offerOK) {
Expand All @@ -96,7 +103,7 @@ public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String next
return null;
}

AllocateRequest result = this.requestTable.get(nextFilePath);
AllocateRequest result = this.requestTable.get(nextFileName);
try {
if (result != null) {
messageStore.getPerfCounter().startTick("WAIT_MAPFILE_TIME_MS");
Expand All @@ -106,7 +113,7 @@ public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String next
log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());
return null;
} else {
this.requestTable.remove(nextFilePath);
this.requestTable.remove(nextFileName);
return result.getMappedFile();
}
} else {
Expand Down Expand Up @@ -156,7 +163,8 @@ private boolean mmapOperation() {
AllocateRequest req = null;
try {
req = this.requestQueue.take();
AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
String fileName = req.getFilePath().substring(req.getFilePath().lastIndexOf("/") + 1);
AllocateRequest expectedRequest = this.requestTable.get(fileName);
if (null == expectedRequest) {
log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " "
+ req.getFileSize());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,24 @@ public MappedFile tryCreateMappedFile(long createOffset) {

String[] paths = availableStorePath.toArray(new String[]{});
Arrays.sort(paths);
String nextFilePath = paths[(int) (fileIdx % paths.length)] + File.separator

String nextFilePath = null;
String nextNextFilePath = null;
if (allocateMappedFileService != null) {
nextFilePath = allocateMappedFileService.getCachedMappedFilePath(
createOffset);
nextNextFilePath = allocateMappedFileService.getCachedMappedFilePath(
createOffset + this.mappedFileSize);
}
if (nextFilePath == null) {
nextFilePath = paths[(int) (fileIdx % paths.length)] + File.separator
+ UtilAll.offset2FileName(createOffset);
String nextNextFilePath = paths[(int) ((fileIdx + 1) % paths.length)] + File.separator
}
if (nextNextFilePath == null) {
nextNextFilePath = paths[(int) ((fileIdx + 1) % paths.length)] + File.separator
+ UtilAll.offset2FileName(createOffset + this.mappedFileSize);
}

return doCreateMappedFile(nextFilePath, nextNextFilePath);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,19 @@
package org.apache.rocketmq.store;

import static org.assertj.core.api.Assertions.assertThat;

import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.logfile.MappedFile;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.Test;


Expand Down Expand Up @@ -153,4 +160,43 @@ public void testFullStorePath() {
mappedFileQueue.shutdown(1000);
mappedFileQueue.destroy();
}

@Test
public void testUniqueNextNextMappedFile() throws IOException {
Set<String> fullStorePath = new HashSet<>();

MessageStoreConfig config = new MessageStoreConfig();
config.setStorePathCommitLog("target/unit_test_store/a" + MixAll.MULTI_PATH_SPLITTER
+ "target/unit_test_store/b" + MixAll.MULTI_PATH_SPLITTER
+ "target/unit_test_store/c");

DefaultMessageStore messageStore = new DefaultMessageStore(config,
new BrokerStatsManager("CommitlogTest", true),
(topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties) -> {},
new BrokerConfig(),
new ConcurrentHashMap<>());

AllocateMappedFileService allocateMappedFileService = new AllocateMappedFileService(messageStore);
allocateMappedFileService.start();

MappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, allocateMappedFileService, () -> fullStorePath);
String[] storePaths = config.getStorePathCommitLog().trim().split(MixAll.MULTI_PATH_SPLITTER);
assertThat(storePaths.length).isEqualTo(3);

// the nextFilePath is "target/unit_test_store/a/00000000000000000000"
// the first invoke will insert nextNextFilePath "target/unit_test_store/b/00000000000000001024" into requestTable
mappedFileQueue.tryCreateMappedFile(0);

// mark target/unit_test_store/b/ as full
fullStorePath.add("target/unit_test_store/b");
// the nextFilePath is still "target/unit_test_store/b/00000000000000001024"
MappedFile mappedFile1 = mappedFileQueue.tryCreateMappedFile(1024);

assertThat(mappedFile1).isNotNull();
assertThat(mappedFile1.getFile().getPath()).isEqualTo("target/unit_test_store/b/00000000000000001024");

mappedFileQueue.shutdown(1000);
mappedFileQueue.destroy();
allocateMappedFileService.shutdown();
}
}
Loading