Skip to content

Commit

Permalink
[INLONG-11527][Agent] Save both row and byte position information whe…
Browse files Browse the repository at this point in the history
…n saving offset
  • Loading branch information
justinwwhuang committed Nov 21, 2024
1 parent c99a0eb commit 1ebca75
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
import org.apache.inlong.agent.plugin.task.file.FileDataUtils;
import org.apache.inlong.agent.utils.AgentUtils;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -52,7 +55,20 @@
*/
public class LogFileSource extends AbstractSource {

public static final int LEN_OF_FILE_OFFSET_ARRAY = 2;

@Data
@AllArgsConstructor
@NoArgsConstructor
protected class FileOffset {

private Long lineOffset;
private Long byteOffset;
private boolean hasByteOffset;
}

private static final Logger LOGGER = LoggerFactory.getLogger(LogFileSource.class);
public static final String OFFSET_SEP = ":";
private final Integer SIZE_OF_BUFFER_TO_READ_FILE = 64 * 1024;
private final Long INODE_UPDATE_INTERVAL_MS = 1000L;
private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // 设置格式
Expand Down Expand Up @@ -86,8 +102,7 @@ protected void initSource(InstanceProfile profile) {
file = new File(fileName);
inodeInfo = profile.get(TaskConstants.INODE_INFO);
lastInodeUpdateTime = AgentUtils.getCurrentTime();
linePosition = getInitLineOffset(isIncrement, taskId, instanceId, inodeInfo);
bytePosition = getBytePositionByLine(linePosition);
initOffset(isIncrement, taskId, instanceId, inodeInfo);
randomAccessFile = new RandomAccessFile(file, "r");
} catch (Exception ex) {
stopRunning();
Expand Down Expand Up @@ -137,14 +152,9 @@ protected String getThreadName() {
}

private List<SourceData> readFromPos(long pos) throws IOException {
List<byte[]> lines = new ArrayList<>();
List<SourceData> dataList = new ArrayList<>();
bytePosition = readLines(randomAccessFile, pos, lines, BATCH_READ_LINE_COUNT, BATCH_READ_LINE_TOTAL_LEN, false);
for (int i = 0; i < lines.size(); i++) {
linePosition++;
dataList.add(new SourceData(lines.get(i), Long.toString(linePosition)));
}
return dataList;
List<SourceData> lines = new ArrayList<>();
bytePosition = readLines(randomAccessFile, pos, lines, BATCH_READ_LINE_COUNT, BATCH_READ_LINE_TOTAL_LEN);
return lines;
}

private int getRealLineCount(String fileName) {
Expand All @@ -157,30 +167,39 @@ private int getRealLineCount(String fileName) {
}
}

private long getInitLineOffset(boolean isIncrement, String taskId, String instanceId, String inodeInfo) {
long offset = 0;
private void initOffset(boolean isIncrement, String taskId, String instanceId, String inodeInfo)
throws IOException {
long lineOffset;
long byteOffset;
if (offsetProfile != null && offsetProfile.getInodeInfo().compareTo(inodeInfo) == 0) {
offset = Long.parseLong(offsetProfile.getOffset());
int fileLineCount = getRealLineCount(instanceId);
if (fileLineCount < offset) {
LOGGER.info("getInitLineOffset inode no change taskId {} file rotate, offset set to 0, file {}", taskId,
fileName);
offset = 0;
FileOffset fileOffset = parseFIleOffset(offsetProfile.getOffset());
if (fileOffset.hasByteOffset) {
lineOffset = fileOffset.lineOffset;
byteOffset = fileOffset.byteOffset;
LOGGER.info("initOffset inode no change taskId {} restore lineOffset {} byteOffset {}, file {}", taskId,
lineOffset, byteOffset, fileName);
} else {
LOGGER.info("getInitLineOffset inode no change taskId {} from offset store {}, file {}", taskId, offset,
fileName);
lineOffset = fileOffset.lineOffset;
byteOffset = getBytePositionByLine(lineOffset);
LOGGER.info("initOffset inode no change taskId {} restore lineOffset {} count byteOffset {}, file {}",
taskId,
lineOffset, byteOffset, fileName);
}
} else {
if (isIncrement) {
offset = getRealLineCount(instanceId);
LOGGER.info("getInitLineOffset taskId {} for new increment read from {} file {}", taskId,
offset, fileName);
lineOffset = getRealLineCount(instanceId);
byteOffset = getBytePositionByLine(lineOffset);
LOGGER.info("initOffset taskId {} for new increment lineOffset {} byteOffset {}, file {}", taskId,
lineOffset, byteOffset, fileName);
} else {
offset = 0;
LOGGER.info("getInitLineOffset taskId {} for new all read from 0 file {}", taskId, fileName);
lineOffset = 0;
byteOffset = 0;
LOGGER.info("initOffset taskId {} for new all read lineOffset {} byteOffset {} file {}", taskId,
lineOffset, byteOffset, fileName);
}
}
return offset;
linePosition = lineOffset;
bytePosition = byteOffset;
}

public File getFile() {
Expand All @@ -202,9 +221,9 @@ private long getBytePositionByLine(long linePosition) throws IOException {
try {
input = new RandomAccessFile(file, "r");
while (readCount < linePosition) {
List<byte[]> lines = new ArrayList<>();
List<SourceData> lines = new ArrayList<>();
pos = readLines(input, pos, lines, Math.min((int) (linePosition - readCount), BATCH_READ_LINE_COUNT),
BATCH_READ_LINE_TOTAL_LEN, true);
BATCH_READ_LINE_TOTAL_LEN);
readCount += lines.size();
if (lines.size() == 0) {
LOGGER.error("getBytePositionByLine LineNum {} larger than the real file");
Expand All @@ -229,8 +248,8 @@ private long getBytePositionByLine(long linePosition) throws IOException {
* @return The new position after the lines have been read
* @throws IOException if an I/O error occurs.
*/
private long readLines(RandomAccessFile reader, long pos, List<byte[]> lines, int maxLineCount, int maxLineTotalLen,
boolean isCounting)
private long readLines(RandomAccessFile reader, long pos, List<SourceData> lines, int maxLineCount,
int maxLineTotalLen)
throws IOException {
if (maxLineCount == 0) {
return pos;
Expand All @@ -248,13 +267,10 @@ private long readLines(RandomAccessFile reader, long pos, List<byte[]> lines, in
byte ch = bufferToReadFile[i];
switch (ch) {
case '\n':
if (isCounting) {
lines.add(null);
} else {
lines.add(baos.toByteArray());
lineTotalLen += baos.size();
}
linePosition++;
rePos = pos + i + 1;
lines.add(new SourceData(baos.toByteArray(), getOffsetString(linePosition, rePos)));
lineTotalLen += baos.size();
if (overLen) {
LOGGER.warn("readLines over len finally string len {}",
new String(baos.toByteArray()).length());
Expand Down Expand Up @@ -297,6 +313,19 @@ private long readLines(RandomAccessFile reader, long pos, List<byte[]> lines, in
return rePos;
}

private String getOffsetString(Long lineOffset, Long byteOffset) {
return lineOffset + OFFSET_SEP + byteOffset;
}

private FileOffset parseFIleOffset(String offset) {
String[] offsetArray = offset.split(OFFSET_SEP);
if (offsetArray.length == LEN_OF_FILE_OFFSET_ARRAY) {
return new FileOffset(Long.parseLong(offsetArray[0]), Long.parseLong(offsetArray[1]), true);
} else {
return new FileOffset(Long.parseLong(offsetArray[0]), null, false);
}
}

private boolean isInodeChanged() {
if (AgentUtils.getCurrentTime() - lastInodeUpdateTime > INODE_UPDATE_INTERVAL_MS) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT;
Expand Down Expand Up @@ -74,13 +75,18 @@ public static void setup() {
OffsetManager.init(taskBasicStore, instanceBasicStore, offsetBasicStore);
}

private LogFileSource getSource(int taskId, long offset) {
private LogFileSource getSource(int taskId, long lineOffset, long byteOffset, String dataContentStyle,
boolean isOffSetNew) {
try {
String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern, "csv", false, "", "",
String pattern;
String fileName;
boolean retry;
fileName = LOADER.getResource("test/20230928_1.txt").getPath();
pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
retry = false;
TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern, dataContentStyle, retry, "", "",
TaskStateEnum.RUNNING, "D",
"GMT+8:00", null);
String fileName = LOADER.getResource("test/20230928_1.txt").getPath();
"GMT+8:00", Arrays.asList("ok"));
InstanceProfile instanceProfile = taskProfile.createInstanceProfile("",
fileName, taskProfile.getCycleUnit(), "20230928", AgentUtils.getCurrentTime());
instanceProfile.set(TaskConstants.INODE_INFO, FileDataUtils.getInodeInfo(instanceProfile.getInstanceId()));
Expand All @@ -91,17 +97,21 @@ private LogFileSource getSource(int taskId, long offset) {
Whitebox.setInternalState(source, "SIZE_OF_BUFFER_TO_READ_FILE", 2);
Whitebox.setInternalState(source, "EMPTY_CHECK_COUNT_AT_LEAST", 3);
Whitebox.setInternalState(source, "WAIT_TIMEOUT_MS", 10);
if (offset > 0) {
if (lineOffset > 0) {
String finalOffset = Long.toString(lineOffset);
if (isOffSetNew) {
finalOffset += LogFileSource.OFFSET_SEP + byteOffset;
}
OffsetProfile offsetProfile = new OffsetProfile(instanceProfile.getTaskId(),
instanceProfile.getInstanceId(),
Long.toString(offset), instanceProfile.get(INODE_INFO));
finalOffset, instanceProfile.get(INODE_INFO));
OffsetManager.getInstance().setOffset(offsetProfile);
}
source.init(instanceProfile);
source.start();
return source;
} catch (Exception e) {
LOGGER.error("source init error {}", e);
LOGGER.error("source init error", e);
Assert.assertTrue("source init error", false);
}
return null;
Expand All @@ -124,7 +134,7 @@ private void testFullRead() {
for (int i = 0; i < check.length; i++) {
srcLen += check[i].getBytes(StandardCharsets.UTF_8).length;
}
LogFileSource source = getSource(1, 0);
LogFileSource source = getSource(1, 0, 0, "csv", false);
Message msg = source.read();
int readLen = 0;
int cnt = 0;
Expand All @@ -149,7 +159,7 @@ private void testFullRead() {
}

private void testCleanQueue() {
LogFileSource source = getSource(2, 0);
LogFileSource source = getSource(2, 0, 0, "csv", false);
for (int i = 0; i < 2; i++) {
source.read();
}
Expand All @@ -160,16 +170,16 @@ private void testCleanQueue() {
}

private void testReadWithOffset() {
LogFileSource source = getSource(3, 1);
LogFileSource source = getSource(3, 1, 25, "csv", false);
for (int i = 0; i < 2; i++) {
Message msg = source.read();
Assert.assertTrue(msg != null);
Assert.assertEquals(new String(msg.getBody()), check[i + 1]);
}
Message msg = source.read();
Assert.assertTrue(msg == null);
source.destroy();

source = getSource(4, 3);
source = getSource(4, 3, 69, "csv", false);
msg = source.read();
Assert.assertTrue(msg == null);
source.destroy();
Expand Down

0 comments on commit 1ebca75

Please sign in to comment.