diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java index 5aebbc7d86..72b0cd704a 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java @@ -31,6 +31,9 @@ import org.apache.inlong.agent.plugin.task.file.FileDataUtils; import org.apache.inlong.agent.utils.AgentUtils; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,7 +55,20 @@ */ public class LogFileSource extends AbstractSource { + public static final int LEN_OF_FILE_OFFSET_ARRAY = 2; + + @Data + @AllArgsConstructor + @NoArgsConstructor + protected class FileOffset { + + private Long lineOffset; + private Long byteOffset; + private boolean hasByteOffset; + } + private static final Logger LOGGER = LoggerFactory.getLogger(LogFileSource.class); + public static final String OFFSET_SEP = ":"; private final Integer SIZE_OF_BUFFER_TO_READ_FILE = 64 * 1024; private final Long INODE_UPDATE_INTERVAL_MS = 1000L; private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // 设置格式 @@ -86,8 +102,7 @@ protected void initSource(InstanceProfile profile) { file = new File(fileName); inodeInfo = profile.get(TaskConstants.INODE_INFO); lastInodeUpdateTime = AgentUtils.getCurrentTime(); - linePosition = getInitLineOffset(isIncrement, taskId, instanceId, inodeInfo); - bytePosition = getBytePositionByLine(linePosition); + initOffset(isIncrement, taskId, instanceId, inodeInfo); randomAccessFile = new RandomAccessFile(file, "r"); } catch (Exception ex) { stopRunning(); @@ -137,14 +152,9 @@ protected String getThreadName() { } private List readFromPos(long pos) throws IOException { - List lines = new ArrayList<>(); - List dataList = new ArrayList<>(); - bytePosition = readLines(randomAccessFile, pos, lines, BATCH_READ_LINE_COUNT, BATCH_READ_LINE_TOTAL_LEN, false); - for (int i = 0; i < lines.size(); i++) { - linePosition++; - dataList.add(new SourceData(lines.get(i), Long.toString(linePosition))); - } - return dataList; + List lines = new ArrayList<>(); + bytePosition = readLines(randomAccessFile, pos, lines, BATCH_READ_LINE_COUNT, BATCH_READ_LINE_TOTAL_LEN); + return lines; } private int getRealLineCount(String fileName) { @@ -157,30 +167,39 @@ private int getRealLineCount(String fileName) { } } - private long getInitLineOffset(boolean isIncrement, String taskId, String instanceId, String inodeInfo) { - long offset = 0; + private void initOffset(boolean isIncrement, String taskId, String instanceId, String inodeInfo) + throws IOException { + long lineOffset; + long byteOffset; if (offsetProfile != null && offsetProfile.getInodeInfo().compareTo(inodeInfo) == 0) { - offset = Long.parseLong(offsetProfile.getOffset()); - int fileLineCount = getRealLineCount(instanceId); - if (fileLineCount < offset) { - LOGGER.info("getInitLineOffset inode no change taskId {} file rotate, offset set to 0, file {}", taskId, - fileName); - offset = 0; + FileOffset fileOffset = parseFIleOffset(offsetProfile.getOffset()); + if (fileOffset.hasByteOffset) { + lineOffset = fileOffset.lineOffset; + byteOffset = fileOffset.byteOffset; + LOGGER.info("initOffset inode no change taskId {} restore lineOffset {} byteOffset {}, file {}", taskId, + lineOffset, byteOffset, fileName); } else { - LOGGER.info("getInitLineOffset inode no change taskId {} from offset store {}, file {}", taskId, offset, - fileName); + lineOffset = fileOffset.lineOffset; + byteOffset = getBytePositionByLine(lineOffset); + LOGGER.info("initOffset inode no change taskId {} restore lineOffset {} count byteOffset {}, file {}", + taskId, + lineOffset, byteOffset, fileName); } } else { if (isIncrement) { - offset = getRealLineCount(instanceId); - LOGGER.info("getInitLineOffset taskId {} for new increment read from {} file {}", taskId, - offset, fileName); + lineOffset = getRealLineCount(instanceId); + byteOffset = getBytePositionByLine(lineOffset); + LOGGER.info("initOffset taskId {} for new increment lineOffset {} byteOffset {}, file {}", taskId, + lineOffset, byteOffset, fileName); } else { - offset = 0; - LOGGER.info("getInitLineOffset taskId {} for new all read from 0 file {}", taskId, fileName); + lineOffset = 0; + byteOffset = 0; + LOGGER.info("initOffset taskId {} for new all read lineOffset {} byteOffset {} file {}", taskId, + lineOffset, byteOffset, fileName); } } - return offset; + linePosition = lineOffset; + bytePosition = byteOffset; } public File getFile() { @@ -202,9 +221,9 @@ private long getBytePositionByLine(long linePosition) throws IOException { try { input = new RandomAccessFile(file, "r"); while (readCount < linePosition) { - List lines = new ArrayList<>(); + List lines = new ArrayList<>(); pos = readLines(input, pos, lines, Math.min((int) (linePosition - readCount), BATCH_READ_LINE_COUNT), - BATCH_READ_LINE_TOTAL_LEN, true); + BATCH_READ_LINE_TOTAL_LEN); readCount += lines.size(); if (lines.size() == 0) { LOGGER.error("getBytePositionByLine LineNum {} larger than the real file"); @@ -229,8 +248,8 @@ private long getBytePositionByLine(long linePosition) throws IOException { * @return The new position after the lines have been read * @throws IOException if an I/O error occurs. */ - private long readLines(RandomAccessFile reader, long pos, List lines, int maxLineCount, int maxLineTotalLen, - boolean isCounting) + private long readLines(RandomAccessFile reader, long pos, List lines, int maxLineCount, + int maxLineTotalLen) throws IOException { if (maxLineCount == 0) { return pos; @@ -248,13 +267,10 @@ private long readLines(RandomAccessFile reader, long pos, List lines, in byte ch = bufferToReadFile[i]; switch (ch) { case '\n': - if (isCounting) { - lines.add(null); - } else { - lines.add(baos.toByteArray()); - lineTotalLen += baos.size(); - } + linePosition++; rePos = pos + i + 1; + lines.add(new SourceData(baos.toByteArray(), getOffsetString(linePosition, rePos))); + lineTotalLen += baos.size(); if (overLen) { LOGGER.warn("readLines over len finally string len {}", new String(baos.toByteArray()).length()); @@ -297,6 +313,19 @@ private long readLines(RandomAccessFile reader, long pos, List lines, in return rePos; } + private String getOffsetString(Long lineOffset, Long byteOffset) { + return lineOffset + OFFSET_SEP + byteOffset; + } + + private FileOffset parseFIleOffset(String offset) { + String[] offsetArray = offset.split(OFFSET_SEP); + if (offsetArray.length == LEN_OF_FILE_OFFSET_ARRAY) { + return new FileOffset(Long.parseLong(offsetArray[0]), Long.parseLong(offsetArray[1]), true); + } else { + return new FileOffset(Long.parseLong(offsetArray[0]), null, false); + } + } + private boolean isInodeChanged() { if (AgentUtils.getCurrentTime() - lastInodeUpdateTime > INODE_UPDATE_INTERVAL_MS) { try { diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java index 5d6871fecb..408b9f1b70 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java @@ -42,6 +42,7 @@ import org.slf4j.LoggerFactory; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.concurrent.TimeUnit; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT; @@ -74,13 +75,18 @@ public static void setup() { OffsetManager.init(taskBasicStore, instanceBasicStore, offsetBasicStore); } - private LogFileSource getSource(int taskId, long offset) { + private LogFileSource getSource(int taskId, long lineOffset, long byteOffset, String dataContentStyle, + boolean isOffSetNew) { try { - String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+"; - TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern, "csv", false, "", "", + String pattern; + String fileName; + boolean retry; + fileName = LOADER.getResource("test/20230928_1.txt").getPath(); + pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+"; + retry = false; + TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern, dataContentStyle, retry, "", "", TaskStateEnum.RUNNING, "D", - "GMT+8:00", null); - String fileName = LOADER.getResource("test/20230928_1.txt").getPath(); + "GMT+8:00", Arrays.asList("ok")); InstanceProfile instanceProfile = taskProfile.createInstanceProfile("", fileName, taskProfile.getCycleUnit(), "20230928", AgentUtils.getCurrentTime()); instanceProfile.set(TaskConstants.INODE_INFO, FileDataUtils.getInodeInfo(instanceProfile.getInstanceId())); @@ -91,17 +97,21 @@ private LogFileSource getSource(int taskId, long offset) { Whitebox.setInternalState(source, "SIZE_OF_BUFFER_TO_READ_FILE", 2); Whitebox.setInternalState(source, "EMPTY_CHECK_COUNT_AT_LEAST", 3); Whitebox.setInternalState(source, "WAIT_TIMEOUT_MS", 10); - if (offset > 0) { + if (lineOffset > 0) { + String finalOffset = Long.toString(lineOffset); + if (isOffSetNew) { + finalOffset += LogFileSource.OFFSET_SEP + byteOffset; + } OffsetProfile offsetProfile = new OffsetProfile(instanceProfile.getTaskId(), instanceProfile.getInstanceId(), - Long.toString(offset), instanceProfile.get(INODE_INFO)); + finalOffset, instanceProfile.get(INODE_INFO)); OffsetManager.getInstance().setOffset(offsetProfile); } source.init(instanceProfile); source.start(); return source; } catch (Exception e) { - LOGGER.error("source init error {}", e); + LOGGER.error("source init error", e); Assert.assertTrue("source init error", false); } return null; @@ -124,7 +134,7 @@ private void testFullRead() { for (int i = 0; i < check.length; i++) { srcLen += check[i].getBytes(StandardCharsets.UTF_8).length; } - LogFileSource source = getSource(1, 0); + LogFileSource source = getSource(1, 0, 0, "csv", false); Message msg = source.read(); int readLen = 0; int cnt = 0; @@ -149,7 +159,7 @@ private void testFullRead() { } private void testCleanQueue() { - LogFileSource source = getSource(2, 0); + LogFileSource source = getSource(2, 0, 0, "csv", false); for (int i = 0; i < 2; i++) { source.read(); } @@ -160,16 +170,16 @@ private void testCleanQueue() { } private void testReadWithOffset() { - LogFileSource source = getSource(3, 1); + LogFileSource source = getSource(3, 1, 25, "csv", false); for (int i = 0; i < 2; i++) { Message msg = source.read(); - Assert.assertTrue(msg != null); + Assert.assertEquals(new String(msg.getBody()), check[i + 1]); } Message msg = source.read(); Assert.assertTrue(msg == null); source.destroy(); - source = getSource(4, 3); + source = getSource(4, 3, 69, "csv", false); msg = source.read(); Assert.assertTrue(msg == null); source.destroy();