Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-11527][Agent] Save both row and byte position information when saving offset #11528

Merged
merged 1 commit into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
import org.apache.inlong.agent.plugin.task.file.FileDataUtils;
import org.apache.inlong.agent.utils.AgentUtils;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -52,7 +55,20 @@
*/
public class LogFileSource extends AbstractSource {

public static final int LEN_OF_FILE_OFFSET_ARRAY = 2;

@Data
@AllArgsConstructor
@NoArgsConstructor
protected class FileOffset {

private Long lineOffset;
private Long byteOffset;
private boolean hasByteOffset;
}

private static final Logger LOGGER = LoggerFactory.getLogger(LogFileSource.class);
public static final String OFFSET_SEP = ":";
private final Integer SIZE_OF_BUFFER_TO_READ_FILE = 64 * 1024;
private final Long INODE_UPDATE_INTERVAL_MS = 1000L;
private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // 设置格式
Expand Down Expand Up @@ -86,8 +102,7 @@ protected void initSource(InstanceProfile profile) {
file = new File(fileName);
inodeInfo = profile.get(TaskConstants.INODE_INFO);
lastInodeUpdateTime = AgentUtils.getCurrentTime();
linePosition = getInitLineOffset(isIncrement, taskId, instanceId, inodeInfo);
bytePosition = getBytePositionByLine(linePosition);
initOffset(isIncrement, taskId, instanceId, inodeInfo);
randomAccessFile = new RandomAccessFile(file, "r");
} catch (Exception ex) {
stopRunning();
Expand Down Expand Up @@ -137,14 +152,9 @@ protected String getThreadName() {
}

private List<SourceData> readFromPos(long pos) throws IOException {
List<byte[]> lines = new ArrayList<>();
List<SourceData> dataList = new ArrayList<>();
bytePosition = readLines(randomAccessFile, pos, lines, BATCH_READ_LINE_COUNT, BATCH_READ_LINE_TOTAL_LEN, false);
for (int i = 0; i < lines.size(); i++) {
linePosition++;
dataList.add(new SourceData(lines.get(i), Long.toString(linePosition)));
}
return dataList;
List<SourceData> lines = new ArrayList<>();
bytePosition = readLines(randomAccessFile, pos, lines, BATCH_READ_LINE_COUNT, BATCH_READ_LINE_TOTAL_LEN);
return lines;
}

private int getRealLineCount(String fileName) {
Expand All @@ -157,30 +167,39 @@ private int getRealLineCount(String fileName) {
}
}

private long getInitLineOffset(boolean isIncrement, String taskId, String instanceId, String inodeInfo) {
long offset = 0;
private void initOffset(boolean isIncrement, String taskId, String instanceId, String inodeInfo)
throws IOException {
long lineOffset;
long byteOffset;
if (offsetProfile != null && offsetProfile.getInodeInfo().compareTo(inodeInfo) == 0) {
offset = Long.parseLong(offsetProfile.getOffset());
int fileLineCount = getRealLineCount(instanceId);
if (fileLineCount < offset) {
LOGGER.info("getInitLineOffset inode no change taskId {} file rotate, offset set to 0, file {}", taskId,
fileName);
offset = 0;
FileOffset fileOffset = parseFIleOffset(offsetProfile.getOffset());
if (fileOffset.hasByteOffset) {
lineOffset = fileOffset.lineOffset;
byteOffset = fileOffset.byteOffset;
LOGGER.info("initOffset inode no change taskId {} restore lineOffset {} byteOffset {}, file {}", taskId,
lineOffset, byteOffset, fileName);
} else {
LOGGER.info("getInitLineOffset inode no change taskId {} from offset store {}, file {}", taskId, offset,
fileName);
lineOffset = fileOffset.lineOffset;
byteOffset = getBytePositionByLine(lineOffset);
LOGGER.info("initOffset inode no change taskId {} restore lineOffset {} count byteOffset {}, file {}",
taskId,
lineOffset, byteOffset, fileName);
}
} else {
if (isIncrement) {
offset = getRealLineCount(instanceId);
LOGGER.info("getInitLineOffset taskId {} for new increment read from {} file {}", taskId,
offset, fileName);
lineOffset = getRealLineCount(instanceId);
byteOffset = getBytePositionByLine(lineOffset);
LOGGER.info("initOffset taskId {} for new increment lineOffset {} byteOffset {}, file {}", taskId,
lineOffset, byteOffset, fileName);
} else {
offset = 0;
LOGGER.info("getInitLineOffset taskId {} for new all read from 0 file {}", taskId, fileName);
lineOffset = 0;
byteOffset = 0;
LOGGER.info("initOffset taskId {} for new all read lineOffset {} byteOffset {} file {}", taskId,
lineOffset, byteOffset, fileName);
}
}
return offset;
linePosition = lineOffset;
bytePosition = byteOffset;
}

public File getFile() {
Expand All @@ -202,9 +221,9 @@ private long getBytePositionByLine(long linePosition) throws IOException {
try {
input = new RandomAccessFile(file, "r");
while (readCount < linePosition) {
List<byte[]> lines = new ArrayList<>();
List<SourceData> lines = new ArrayList<>();
pos = readLines(input, pos, lines, Math.min((int) (linePosition - readCount), BATCH_READ_LINE_COUNT),
BATCH_READ_LINE_TOTAL_LEN, true);
BATCH_READ_LINE_TOTAL_LEN);
readCount += lines.size();
if (lines.size() == 0) {
LOGGER.error("getBytePositionByLine LineNum {} larger than the real file");
Expand All @@ -229,8 +248,8 @@ private long getBytePositionByLine(long linePosition) throws IOException {
* @return The new position after the lines have been read
* @throws IOException if an I/O error occurs.
*/
private long readLines(RandomAccessFile reader, long pos, List<byte[]> lines, int maxLineCount, int maxLineTotalLen,
boolean isCounting)
private long readLines(RandomAccessFile reader, long pos, List<SourceData> lines, int maxLineCount,
int maxLineTotalLen)
throws IOException {
if (maxLineCount == 0) {
return pos;
Expand All @@ -248,13 +267,10 @@ private long readLines(RandomAccessFile reader, long pos, List<byte[]> lines, in
byte ch = bufferToReadFile[i];
switch (ch) {
case '\n':
if (isCounting) {
lines.add(null);
} else {
lines.add(baos.toByteArray());
lineTotalLen += baos.size();
}
linePosition++;
rePos = pos + i + 1;
lines.add(new SourceData(baos.toByteArray(), getOffsetString(linePosition, rePos)));
lineTotalLen += baos.size();
if (overLen) {
LOGGER.warn("readLines over len finally string len {}",
new String(baos.toByteArray()).length());
Expand Down Expand Up @@ -297,6 +313,19 @@ private long readLines(RandomAccessFile reader, long pos, List<byte[]> lines, in
return rePos;
}

private String getOffsetString(Long lineOffset, Long byteOffset) {
return lineOffset + OFFSET_SEP + byteOffset;
}

private FileOffset parseFIleOffset(String offset) {
String[] offsetArray = offset.split(OFFSET_SEP);
if (offsetArray.length == LEN_OF_FILE_OFFSET_ARRAY) {
return new FileOffset(Long.parseLong(offsetArray[0]), Long.parseLong(offsetArray[1]), true);
} else {
return new FileOffset(Long.parseLong(offsetArray[0]), null, false);
}
}

private boolean isInodeChanged() {
if (AgentUtils.getCurrentTime() - lastInodeUpdateTime > INODE_UPDATE_INTERVAL_MS) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT;
Expand Down Expand Up @@ -74,13 +75,18 @@ public static void setup() {
OffsetManager.init(taskBasicStore, instanceBasicStore, offsetBasicStore);
}

private LogFileSource getSource(int taskId, long offset) {
private LogFileSource getSource(int taskId, long lineOffset, long byteOffset, String dataContentStyle,
boolean isOffSetNew) {
try {
String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern, "csv", false, "", "",
String pattern;
String fileName;
boolean retry;
fileName = LOADER.getResource("test/20230928_1.txt").getPath();
pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
retry = false;
TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern, dataContentStyle, retry, "", "",
TaskStateEnum.RUNNING, "D",
"GMT+8:00", null);
String fileName = LOADER.getResource("test/20230928_1.txt").getPath();
"GMT+8:00", Arrays.asList("ok"));
InstanceProfile instanceProfile = taskProfile.createInstanceProfile("",
fileName, taskProfile.getCycleUnit(), "20230928", AgentUtils.getCurrentTime());
instanceProfile.set(TaskConstants.INODE_INFO, FileDataUtils.getInodeInfo(instanceProfile.getInstanceId()));
Expand All @@ -91,17 +97,21 @@ private LogFileSource getSource(int taskId, long offset) {
Whitebox.setInternalState(source, "SIZE_OF_BUFFER_TO_READ_FILE", 2);
Whitebox.setInternalState(source, "EMPTY_CHECK_COUNT_AT_LEAST", 3);
Whitebox.setInternalState(source, "WAIT_TIMEOUT_MS", 10);
if (offset > 0) {
if (lineOffset > 0) {
String finalOffset = Long.toString(lineOffset);
if (isOffSetNew) {
finalOffset += LogFileSource.OFFSET_SEP + byteOffset;
}
OffsetProfile offsetProfile = new OffsetProfile(instanceProfile.getTaskId(),
instanceProfile.getInstanceId(),
Long.toString(offset), instanceProfile.get(INODE_INFO));
finalOffset, instanceProfile.get(INODE_INFO));
OffsetManager.getInstance().setOffset(offsetProfile);
}
source.init(instanceProfile);
source.start();
return source;
} catch (Exception e) {
LOGGER.error("source init error {}", e);
LOGGER.error("source init error", e);
Assert.assertTrue("source init error", false);
}
return null;
Expand All @@ -124,7 +134,7 @@ private void testFullRead() {
for (int i = 0; i < check.length; i++) {
srcLen += check[i].getBytes(StandardCharsets.UTF_8).length;
}
LogFileSource source = getSource(1, 0);
LogFileSource source = getSource(1, 0, 0, "csv", false);
Message msg = source.read();
int readLen = 0;
int cnt = 0;
Expand All @@ -149,7 +159,7 @@ private void testFullRead() {
}

private void testCleanQueue() {
LogFileSource source = getSource(2, 0);
LogFileSource source = getSource(2, 0, 0, "csv", false);
for (int i = 0; i < 2; i++) {
source.read();
}
Expand All @@ -160,16 +170,16 @@ private void testCleanQueue() {
}

private void testReadWithOffset() {
LogFileSource source = getSource(3, 1);
LogFileSource source = getSource(3, 1, 25, "csv", false);
for (int i = 0; i < 2; i++) {
Message msg = source.read();
Assert.assertTrue(msg != null);
Assert.assertEquals(new String(msg.getBody()), check[i + 1]);
}
Message msg = source.read();
Assert.assertTrue(msg == null);
source.destroy();

source = getSource(4, 3);
source = getSource(4, 3, 69, "csv", false);
msg = source.read();
Assert.assertTrue(msg == null);
source.destroy();
Expand Down
Loading