diff --git a/conf/smart-default.xml b/conf/smart-default.xml index cdcec656b81..ed711e436c8 100644 --- a/conf/smart-default.xml +++ b/conf/smart-default.xml @@ -425,17 +425,6 @@ - - smart.sync.schedule.strategy - UNORDERED - - Strategy of copying files during 'sync' rule. Possible values: - FIFO - the files created/modified first will be scheduled for transfer first - LIFO - the files created/modified last will be scheduled for transfer first - UNORDERED - no guarantees of the file scheduling order - - - smart.sync.file.equality.strategy CHECKSUM diff --git a/smart-action/src/main/java/org/smartdata/action/SyncAction.java b/smart-action/src/main/java/org/smartdata/action/SyncAction.java index 71f688558c6..b7dfb1b9699 100644 --- a/smart-action/src/main/java/org/smartdata/action/SyncAction.java +++ b/smart-action/src/main/java/org/smartdata/action/SyncAction.java @@ -38,6 +38,7 @@ public class SyncAction extends SmartAction { // related to remote cluster and fileDiff.src public static final String DEST = "-dest"; public static final String PRESERVE = "-preserve"; + public static final String BASE_OPERATION = "-baseOperation"; @Override protected void execute() throws Exception { diff --git a/smart-common/src/main/java/org/smartdata/conf/SmartConfKeys.java b/smart-common/src/main/java/org/smartdata/conf/SmartConfKeys.java index 273ad44c068..40a7c873db2 100644 --- a/smart-common/src/main/java/org/smartdata/conf/SmartConfKeys.java +++ b/smart-common/src/main/java/org/smartdata/conf/SmartConfKeys.java @@ -114,10 +114,6 @@ public class SmartConfKeys { "smart.dispatch.cmdlets.extra.num"; public static final int SMART_DISPATCH_CMDLETS_EXTRA_NUM_DEFAULT = 10; - public static final String SMART_SYNC_SCHEDULE_STRATEGY_KEY = "smart.sync.schedule.strategy"; - public static final String SMART_SYNC_SCHEDULE_STRATEGY_DEFAULT = "UNORDERED"; - - public static final String SMART_SYNC_FILE_EQUALITY_STRATEGY = "smart.sync.file.equality.strategy"; public static final String SMART_SYNC_FILE_EQUALITY_STRATEGY_DEFAULT = "CHECKSUM"; diff --git a/smart-common/src/main/java/org/smartdata/model/FileDiff.java b/smart-common/src/main/java/org/smartdata/model/FileDiff.java index a0abf7e02c1..1c83520c7bf 100644 --- a/smart-common/src/main/java/org/smartdata/model/FileDiff.java +++ b/smart-common/src/main/java/org/smartdata/model/FileDiff.java @@ -17,14 +17,16 @@ */ package org.smartdata.model; -import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; import java.util.HashMap; -import java.util.Iterator; import java.util.Map; -import java.util.Objects; +@Data +@AllArgsConstructor +@Builder(toBuilder = true) public class FileDiff { private long diffId; private long ruleId; @@ -51,119 +53,7 @@ public FileDiff(FileDiffType diffType, FileDiffState state) { this.state = state; } - public long getDiffId() { - return diffId; - } - - public void setDiffId(long diffId) { - this.diffId = diffId; - } - - public long getRuleId() { - return ruleId; - } - - public void setRuleId(long ruleId) { - this.ruleId = ruleId; - } - - public FileDiffType getDiffType() { - return diffType; - } - - public void setDiffType(FileDiffType diffType) { - this.diffType = diffType; - } - - public String getSrc() { - return src; - } - - public void setSrc(String src) { - this.src = src; - } - - public Map getParameters() { - return parameters; - } - - public void setParameters(Map parameters) { - this.parameters = parameters; - } - public void setParameter(String key, String value) { parameters.put(key, value); } - - public String getParametersJsonString() { - Gson gson = new Gson(); - return gson.toJson(parameters); - } - - public void setParametersFromJsonString(String jsonParameters) { - Gson gson = new Gson(); - parameters = gson.fromJson(jsonParameters, - new TypeToken>() { - }.getType()); - } - - public String getParametersString() { - StringBuffer ret = new StringBuffer(); - if (parameters.containsKey("-dest")) { - ret.append(String.format(" -dest %s", parameters.get("-dest"))); - parameters.remove("-dest"); - } - for (Iterator> it = parameters.entrySet().iterator(); it.hasNext();) { - Map.Entry entry = it.next(); - ret.append(String.format(" %s %s", entry.getKey(), entry.getValue())); - } - return ret.toString(); - } - - - public FileDiffState getState() { - return state; - } - - public void setState(FileDiffState state) { - this.state = state; - } - - public long getCreateTime() { - return createTime; - } - - public void setCreateTime(long createTime) { - this.createTime = createTime; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - FileDiff fileDiff = (FileDiff) o; - return diffId == fileDiff.diffId - && ruleId == fileDiff.ruleId - && createTime == fileDiff.createTime - && diffType == fileDiff.diffType - && Objects.equals(src, fileDiff.src) - && Objects.equals(parameters, fileDiff.parameters) - && state == fileDiff.state; - } - - @Override - public int hashCode() { - return Objects.hash(diffId, ruleId, diffType, src, parameters, state, createTime); - } - - @Override - public String toString() { - return String.format( - "FileDiff{diffId=%s, parameters=%s, src=%s, diffType=%s, state=%s, createTime=%s}", - diffId, parameters, src, diffType, state.getValue(), createTime); - } } diff --git a/smart-common/src/main/java/org/smartdata/model/FileInfo.java b/smart-common/src/main/java/org/smartdata/model/FileInfo.java index 8f269960bb7..516b2c4ce3b 100644 --- a/smart-common/src/main/java/org/smartdata/model/FileInfo.java +++ b/smart-common/src/main/java/org/smartdata/model/FileInfo.java @@ -17,15 +17,20 @@ */ package org.smartdata.model; -import java.util.Objects; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +@Data +@AllArgsConstructor +@Builder(setterPrefix = "set") public class FileInfo { private String path; private long fileId; private long length; - private boolean isdir; + private boolean isDir; private short blockReplication; - private long blocksize; + private long blockSize; private long modificationTime; private long accessTime; private short permission; @@ -33,302 +38,4 @@ public class FileInfo { private String group; private byte storagePolicy; private byte erasureCodingPolicy; - - public FileInfo(String path, long fileId, long length, boolean isdir, - short blockReplication, long blocksize, long modificationTime, - long accessTime, short permission, String owner, String group, - byte storagePolicy, byte erasureCodingPolicy) { - this.path = path; - this.fileId = fileId; - this.length = length; - this.isdir = isdir; - this.blockReplication = blockReplication; - this.blocksize = blocksize; - this.modificationTime = modificationTime; - this.accessTime = accessTime; - this.permission = permission; - this.owner = owner; - this.group = group; - this.storagePolicy = storagePolicy; - this.erasureCodingPolicy = erasureCodingPolicy; - } - - public String getPath() { - return path; - } - - public void setPath(String path) { - this.path = path; - } - - public long getFileId() { - return fileId; - } - - public void setFileId(long fileId) { - this.fileId = fileId; - } - - public long getLength() { - return length; - } - - public void setLength(long length) { - this.length = length; - } - - public boolean isdir() { - return isdir; - } - - public void setIsdir(boolean isdir) { - this.isdir = isdir; - } - - public short getBlockReplication() { - return blockReplication; - } - - public void setBlockReplication(short blockReplication) { - this.blockReplication = blockReplication; - } - - public long getBlocksize() { - return blocksize; - } - - public void setBlocksize(long blocksize) { - this.blocksize = blocksize; - } - - public long getModificationTime() { - return modificationTime; - } - - public void setModificationTime(long modificationTime) { - this.modificationTime = modificationTime; - } - - public long getAccessTime() { - return accessTime; - } - - public void setAccessTime(long accessTime) { - this.accessTime = accessTime; - } - - public short getPermission() { - return permission; - } - - public void setPermission(short permission) { - this.permission = permission; - } - - public String getOwner() { - return owner; - } - - public void setOwner(String owner) { - this.owner = owner; - } - - public String getGroup() { - return group; - } - - public void setGroup(String group) { - this.group = group; - } - - public byte getStoragePolicy() { - return storagePolicy; - } - - public void setStoragePolicy(byte storagePolicy) { - this.storagePolicy = storagePolicy; - } - - public byte getErasureCodingPolicy() { - return erasureCodingPolicy; - } - - public void setErasureCodingPolicy(byte erasureCodingPolicy) { - this.erasureCodingPolicy = erasureCodingPolicy; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - FileInfo fileInfo = (FileInfo) o; - return fileId == fileInfo.fileId - && length == fileInfo.length - && isdir == fileInfo.isdir - && blockReplication == fileInfo.blockReplication - && blocksize == fileInfo.blocksize - && modificationTime == fileInfo.modificationTime - && accessTime == fileInfo.accessTime - && permission == fileInfo.permission - && storagePolicy == fileInfo.storagePolicy - && erasureCodingPolicy == fileInfo.erasureCodingPolicy - && Objects.equals(path, fileInfo.path) - && Objects.equals(owner, fileInfo.owner) - && Objects.equals(group, fileInfo.group); - } - - @Override - public int hashCode() { - return Objects.hash( - path, - fileId, - length, - isdir, - blockReplication, - blocksize, - modificationTime, - accessTime, - permission, - owner, - group, - storagePolicy, - erasureCodingPolicy); - } - - public static Builder newBuilder() { - return new Builder(); - } - - @Override - public String toString() { - return String.format( - "FileInfo{path=\'%s\', fileId=%s, length=%s, isdir=%s, blockReplication=%s, " - + "blocksize=%s, modificationTime=%s, accessTime=%s, permission=%s, owner=\'%s\', " - + "group=\'%s\', storagePolicy=%s, erasureCodingPolicy=%s}", - path, - fileId, - length, - isdir, - blockReplication, - blocksize, - modificationTime, - accessTime, - permission, - owner, - group, - storagePolicy, - erasureCodingPolicy); - } - - public static class Builder { - private String path; - private long fileId; - private long length; - private boolean isdir; - private short blockReplication; - private long blocksize; - private long modificationTime; - private long accessTime; - private short permission; - private String owner; - private String group; - private byte storagePolicy; - private byte erasureCodingPolicy; - - public Builder setPath(String path) { - this.path = path; - return this; - } - - public Builder setFileId(long fileId) { - this.fileId = fileId; - return this; - } - - public Builder setLength(long length) { - this.length = length; - return this; - } - - public Builder setIsdir(boolean isdir) { - this.isdir = isdir; - return this; - } - - public Builder setBlockReplication(short blockReplication) { - this.blockReplication = blockReplication; - return this; - } - - public Builder setBlocksize(long blocksize) { - this.blocksize = blocksize; - return this; - } - - public Builder setModificationTime(long modificationTime) { - this.modificationTime = modificationTime; - return this; - } - - public Builder setAccessTime(long accessTime) { - this.accessTime = accessTime; - return this; - } - - public Builder setPermission(short permission) { - this.permission = permission; - return this; - } - - public Builder setOwner(String owner) { - this.owner = owner; - return this; - } - - public Builder setGroup(String group) { - this.group = group; - return this; - } - - public Builder setStoragePolicy(byte storagePolicy) { - this.storagePolicy = storagePolicy; - return this; - } - - public Builder setErasureCodingPolicy(byte erasureCodingPolicy) { - this.erasureCodingPolicy = erasureCodingPolicy; - return this; - } - - public FileInfo build() { - return new FileInfo(path, fileId, length, isdir, blockReplication, - blocksize, modificationTime, accessTime, permission, owner, - group, storagePolicy, erasureCodingPolicy); - } - - @Override - public String toString() { - return String.format( - "Builder{path=\'%s\', fileId=%s, length=%s, isdir=%s, blockReplication=%s, " - + "blocksize=%s, modificationTime=%s, accessTime=%s, permission=%s, owner=\'%s\', " - + "group=\'%s\', storagePolicy=%s, erasureCodingPolicy=%s}", - path, - fileId, - length, - isdir, - blockReplication, - blocksize, - modificationTime, - accessTime, - permission, - owner, - group, - storagePolicy, - erasureCodingPolicy); - } - } } diff --git a/smart-common/src/main/java/org/smartdata/model/FileInfoDiff.java b/smart-common/src/main/java/org/smartdata/model/FileInfoDiff.java index f341bf5fb5e..ecb0c5ca055 100644 --- a/smart-common/src/main/java/org/smartdata/model/FileInfoDiff.java +++ b/smart-common/src/main/java/org/smartdata/model/FileInfoDiff.java @@ -17,8 +17,11 @@ */ package org.smartdata.model; -import java.util.Objects; +import lombok.Data; +import lombok.experimental.Accessors; +@Data +@Accessors(chain = true) public class FileInfoDiff { private String path; private Long length; @@ -29,126 +32,5 @@ public class FileInfoDiff { private String owner; private String group; private Byte erasureCodingPolicy; - - public String getPath() { - return path; - } - - public FileInfoDiff setPath(String path) { - this.path = path; - return this; - } - - public Long getLength() { - return length; - } - - public FileInfoDiff setLength(Long length) { - this.length = length; - return this; - } - - public Short getBlockReplication() { - return blockReplication; - } - - public FileInfoDiff setBlockReplication(Short blockReplication) { - this.blockReplication = blockReplication; - return this; - } - - public Long getModificationTime() { - return modificationTime; - } - - public FileInfoDiff setModificationTime(Long modificationTime) { - this.modificationTime = modificationTime; - return this; - } - - public Long getAccessTime() { - return accessTime; - } - - public FileInfoDiff setAccessTime(Long accessTime) { - this.accessTime = accessTime; - return this; - } - - public Short getPermission() { - return permission; - } - - public FileInfoDiff setPermission(Short permission) { - this.permission = permission; - return this; - } - - public String getOwner() { - return owner; - } - - public FileInfoDiff setOwner(String owner) { - this.owner = owner; - return this; - } - - public String getGroup() { - return group; - } - - public FileInfoDiff setGroup(String group) { - this.group = group; - return this; - } - - public Byte getErasureCodingPolicy() { - return erasureCodingPolicy; - } - - public FileInfoDiff setErasureCodingPolicy(Byte erasureCodingPolicy) { - this.erasureCodingPolicy = erasureCodingPolicy; - return this; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - FileInfoDiff that = (FileInfoDiff) o; - return Objects.equals(path, that.path) - && Objects.equals(length, that.length) - && Objects.equals(blockReplication, that.blockReplication) - && Objects.equals(modificationTime, that.modificationTime) - && Objects.equals(accessTime, that.accessTime) - && Objects.equals(permission, that.permission) - && Objects.equals(owner, that.owner) - && Objects.equals(group, that.group) - && Objects.equals(erasureCodingPolicy, that.erasureCodingPolicy); - } - - @Override - public int hashCode() { - return Objects.hash(path, length, blockReplication, - modificationTime, accessTime, permission, owner, group, erasureCodingPolicy); - } - - @Override - public String toString() { - return "FileInfoDiff{" - + "path='" + path + '\'' - + ", length=" + length - + ", blockReplication=" + blockReplication - + ", modificationTime=" + modificationTime - + ", accessTime=" + accessTime - + ", permission=" + permission - + ", owner='" + owner + '\'' - + ", group='" + group + '\'' - + ", erasureCodingPolicy=" + erasureCodingPolicy - + '}'; - } + private Byte storagePolicy; } diff --git a/smart-common/src/main/java/org/smartdata/model/FileInfoMapper.java b/smart-common/src/main/java/org/smartdata/model/FileInfoMapper.java deleted file mode 100644 index fc56b81ea78..00000000000 --- a/smart-common/src/main/java/org/smartdata/model/FileInfoMapper.java +++ /dev/null @@ -1,182 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.smartdata.model; - -import java.util.HashMap; -import java.util.Map; -import java.util.Set; - -public class FileInfoMapper { - public static final String PATH = "path"; - public static final String FID = "fid"; - public static final String LENGTH = "length"; - public static final String BLOCK_REPLICATION = "block_replication"; - public static final String BLOCK_SIZE = "block_size"; - public static final String MODIFICATION_TIME = "modification_time"; - public static final String ACCESS_TIME = "access_time"; - public static final String IS_DIR = "is_dir"; - public static final String STORAGE_POLICY = "storage_policy"; - public static final String OWNER = "owner"; - public static final String GROUP = "group"; - public static final String PERMISSION = "permission"; - - private Map attrMap; - - public FileInfoMapper(Map attrMap) { - this.attrMap = attrMap; - } - - public Set getAttributesSpecified() { - return attrMap.keySet(); - } - - public String getPath() { - return (String) attrMap.get(PATH); - } - - public Long getFileId() { - return (Long) attrMap.get(FID); - } - - public Long getLength() { - return (Long) attrMap.get(LENGTH); - } - - public Boolean getIsdir() { - return (Boolean) attrMap.get(IS_DIR); - } - - public Short getBlock_replication() { - return (Short) attrMap.get(BLOCK_REPLICATION); - } - - public Long getBlocksize() { - return (Long) attrMap.get(BLOCK_SIZE); - } - - public Long getModification_time() { - return (Long) attrMap.get(MODIFICATION_TIME); - } - - public Long getAccess_time() { - return (Long) attrMap.get(ACCESS_TIME); - } - - public Short getPermission() { - return (Short) attrMap.get(PERMISSION); - } - - public String getOwner() { - return (String) attrMap.get(OWNER); - } - - public String getGroup(String group) { - return (String) attrMap.get(GROUP); - } - - public Byte getStoragePolicy(byte storagePolicy) { - return (Byte) attrMap.get(STORAGE_POLICY); - } - - public FileInfo toFileInfo() { - FileInfo.Builder builder = FileInfo.newBuilder(); - return builder.build(); - } - - public static Builder newBuilder() { - return new Builder(); - } - - @Override - public String toString() { - return String.format("FileInfoMapper{attrMap=%s}", attrMap); - } - - public static class Builder { - private Map attrMap = new HashMap<>(); - - public Builder setPath(String path) { - attrMap.put(FileInfoMapper.PATH, path); - return this; - } - - public Builder setFileId(long fileId) { - attrMap.put(FileInfoMapper.FID, fileId); - return this; - } - - public Builder setLength(long length) { - attrMap.put(FileInfoMapper.LENGTH, length); - return this; - } - - public Builder setIsdir(boolean isdir) { - attrMap.put(FileInfoMapper.IS_DIR, isdir); - return this; - } - - public Builder setBlockReplication(short blockReplication) { - attrMap.put(FileInfoMapper.BLOCK_REPLICATION, blockReplication); - return this; - } - - public Builder setBlocksize(long blocksize) { - attrMap.put(FileInfoMapper.BLOCK_SIZE, blocksize); - return this; - } - - public Builder setModificationTime(long modificationTime) { - attrMap.put(FileInfoMapper.MODIFICATION_TIME, modificationTime); - return this; - } - - public Builder setAccessTime(long accessTime) { - attrMap.put(FileInfoMapper.ACCESS_TIME, accessTime); - return this; - } - - public Builder setPermission(short permission) { - attrMap.put(FileInfoMapper.PERMISSION, permission); - return this; - } - - public Builder setOwner(String owner) { - attrMap.put(FileInfoMapper.OWNER, owner); - return this; - } - - public Builder setGroup(String group) { - attrMap.put(FileInfoMapper.GROUP, group); - return this; - } - - public Builder setStoragePolicy(byte storagePolicy) { - attrMap.put(FileInfoMapper.STORAGE_POLICY, storagePolicy); - return this; - } - - public FileInfoMapper build() { - return new FileInfoMapper(attrMap); - } - - @Override - public String toString() { - return String.format("Builder{attrMap=%s}", attrMap); - } - } -} diff --git a/smart-common/src/main/java/org/smartdata/model/NamespaceUpdater.java b/smart-common/src/main/java/org/smartdata/model/NamespaceUpdater.java deleted file mode 100644 index 8f43e5f4248..00000000000 --- a/smart-common/src/main/java/org/smartdata/model/NamespaceUpdater.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.smartdata.model; - -public interface NamespaceUpdater { - - void insertFile(FileInfo file); - - void insertFiles(FileInfo[] files); - - void updateFile(FileInfoMapper fileInfoMapper); - - void updateFiles(FileInfoMapper[] fileInfoMappers); - - void deleteFile(long fid); - - void deleteDirectory(String dirPath); - - void deleteAllFiles(); -} diff --git a/smart-common/src/main/java/org/smartdata/utils/FileDiffUtils.java b/smart-common/src/main/java/org/smartdata/utils/FileDiffUtils.java index 5cb7db3d10d..d783b884421 100644 --- a/smart-common/src/main/java/org/smartdata/utils/FileDiffUtils.java +++ b/smart-common/src/main/java/org/smartdata/utils/FileDiffUtils.java @@ -18,18 +18,20 @@ package org.smartdata.utils; import org.smartdata.model.FileDiff; +import org.smartdata.model.FileDiffType; public class FileDiffUtils { public static final String LENGTH_ARG = "-length"; public static final String OFFSET_ARG = "-offset"; public static final String DEST_ARG = "-dest"; + public static final String BASE_OPERATION = "-baseOperation"; public static String getParameter(FileDiff fileDiff, String parameter) { return fileDiff.getParameters().get(parameter); } - public static String getOffset(FileDiff fileDiff) { - return getParameter(fileDiff, OFFSET_ARG); + public static Long getOffset(FileDiff fileDiff) { + return Long.parseLong(getParameter(fileDiff, OFFSET_ARG)); } public static String getLength(FileDiff fileDiff) { @@ -39,4 +41,13 @@ public static String getLength(FileDiff fileDiff) { public static String getDest(FileDiff fileDiff) { return getParameter(fileDiff, DEST_ARG); } + + public static boolean isBaseOperation(FileDiff fileDiff) { + return getParameter(fileDiff, BASE_OPERATION) != null; + } + + public static boolean isCreateFileDiff(FileDiff fileDiff) { + return fileDiff.getDiffType() == FileDiffType.APPEND + && getOffset(fileDiff) == 0L; + } } diff --git a/smart-engine/src/main/java/org/smartdata/server/engine/CmdletManager.java b/smart-engine/src/main/java/org/smartdata/server/engine/CmdletManager.java index d592acc34a9..59903687529 100644 --- a/smart-engine/src/main/java/org/smartdata/server/engine/CmdletManager.java +++ b/smart-engine/src/main/java/org/smartdata/server/engine/CmdletManager.java @@ -23,6 +23,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.smartdata.AbstractService; @@ -565,7 +566,7 @@ private ScheduleResult scheduleCmdletActions(CmdletInfo info, LaunchCmdlet launc try { scheduleResult = scheduler.onSchedule(info, actionInfo, launchCmdlet, launchAction); } catch (Exception exception) { - actionInfo.appendLogLine("OnSchedule exception: " + exception); + actionInfo.appendLogLine("OnSchedule exception: " + ExceptionUtils.getStackTrace(exception)); scheduleResult = ScheduleResult.FAIL; } diff --git a/smart-engine/src/main/java/org/smartdata/server/engine/RuleManager.java b/smart-engine/src/main/java/org/smartdata/server/engine/RuleManager.java index af36f2ef015..97e43e3028f 100644 --- a/smart-engine/src/main/java/org/smartdata/server/engine/RuleManager.java +++ b/smart-engine/src/main/java/org/smartdata/server/engine/RuleManager.java @@ -64,8 +64,6 @@ import java.util.List; import java.util.concurrent.ConcurrentHashMap; -import static org.smartdata.conf.SmartConfKeys.SMART_SYNC_SCHEDULE_STRATEGY_DEFAULT; -import static org.smartdata.conf.SmartConfKeys.SMART_SYNC_SCHEDULE_STRATEGY_KEY; import static org.smartdata.model.WhitelistHelper.validatePathsCovered; import static org.smartdata.model.audit.UserActivityObject.RULE; import static org.smartdata.model.audit.UserActivityOperation.CREATE; @@ -125,13 +123,8 @@ public RuleManager( this.ruleInfoHandler = new RuleInfoHandler(ruleDao); this.pathChecker = new PathChecker(context.getConf()); - FileCopyScheduleStrategy copyScheduleStrategy = FileCopyScheduleStrategy.of( - context.getConf().get( - SMART_SYNC_SCHEDULE_STRATEGY_KEY, - SMART_SYNC_SCHEDULE_STRATEGY_DEFAULT)); - RuleExecutorPluginManager.addPlugin(new FileCopyDrPlugin( - context.getMetaStore(), copyScheduleStrategy)); + context.getMetaStore(), FileCopyScheduleStrategy.ordered())); RuleExecutorPluginManager.addPlugin(new FileCopy2S3Plugin()); RuleExecutorPluginManager.addPlugin(new SmallFilePlugin(context, cmdletManager)); RuleExecutorPluginManager.addPlugin(new ErasureCodingPlugin(context)); diff --git a/smart-engine/src/main/java/org/smartdata/server/engine/rule/copy/DiffCreationTimeCopyScheduleStrategy.java b/smart-engine/src/main/java/org/smartdata/server/engine/rule/copy/DiffCreationTimeCopyScheduleStrategy.java index 34217014961..f6951fd3a28 100644 --- a/smart-engine/src/main/java/org/smartdata/server/engine/rule/copy/DiffCreationTimeCopyScheduleStrategy.java +++ b/smart-engine/src/main/java/org/smartdata/server/engine/rule/copy/DiffCreationTimeCopyScheduleStrategy.java @@ -20,20 +20,6 @@ import java.util.List; public class DiffCreationTimeCopyScheduleStrategy implements FileCopyScheduleStrategy { - public DiffCreationTimeCopyScheduleStrategy(DiffSelectionStrategy diffSelectionStrategy) { - this.diffSelectionStrategy = diffSelectionStrategy; - } - - /** Behaviour in case if there are several pending diff for the same file. */ - public enum DiffSelectionStrategy { - /** Select pending diff with the earliest creation time. */ - EARLIEST, - /** Select pending diff with the latest creation time. */ - LATEST - } - - private final DiffSelectionStrategy diffSelectionStrategy; - @Override public String wrapGetFilesToCopyQuery(String query, List pathTemplates) { return "SELECT file_diff.src " @@ -49,12 +35,6 @@ public String wrapGetFilesToCopyQuery(String query, List pathTemplates) + ")) " // choose only one pending file_diff per file based on the provided strategy + "GROUP BY file_diff.src " - + "ORDER BY " + orderClause() + ";"; - } - - private String orderClause() { - return diffSelectionStrategy == DiffSelectionStrategy.EARLIEST - ? "MIN(create_time)" - : "MAX(create_time) DESC"; + + "ORDER BY MIN(create_time);"; } } diff --git a/smart-engine/src/main/java/org/smartdata/server/engine/rule/copy/FileCopyScheduleStrategy.java b/smart-engine/src/main/java/org/smartdata/server/engine/rule/copy/FileCopyScheduleStrategy.java index ca2390cefd1..6de8b0cd139 100644 --- a/smart-engine/src/main/java/org/smartdata/server/engine/rule/copy/FileCopyScheduleStrategy.java +++ b/smart-engine/src/main/java/org/smartdata/server/engine/rule/copy/FileCopyScheduleStrategy.java @@ -19,43 +19,15 @@ import org.smartdata.utils.StringUtil; -import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; -import static org.smartdata.server.engine.rule.copy.DiffCreationTimeCopyScheduleStrategy.DiffSelectionStrategy.EARLIEST; -import static org.smartdata.server.engine.rule.copy.DiffCreationTimeCopyScheduleStrategy.DiffSelectionStrategy.LATEST; public interface FileCopyScheduleStrategy { - enum Strategy { - UNORDERED, - FIFO, - LIFO - } - String wrapGetFilesToCopyQuery(String query, List pathTemplates); - static FileCopyScheduleStrategy of(String rawStrategyName) { - try { - Strategy strategy = Strategy.valueOf(rawStrategyName.toUpperCase()); - return of(strategy); - } catch (IllegalArgumentException exception) { - throw new IllegalArgumentException( - "Wrong file copy schedule strategy " - + rawStrategyName + ". Should be one of: " - + Arrays.toString(Strategy.values())); - } - } - - static FileCopyScheduleStrategy of(Strategy strategyName) { - switch (strategyName) { - case FIFO: - return new DiffCreationTimeCopyScheduleStrategy(EARLIEST); - case LIFO: - return new DiffCreationTimeCopyScheduleStrategy(LATEST); - default: - return new OrderAgnosticCopyScheduleStrategy(); - } + static FileCopyScheduleStrategy ordered() { + return new DiffCreationTimeCopyScheduleStrategy(); } static String pathTemplatesToSqlCondition(List pathTemplates) { diff --git a/smart-engine/src/main/java/org/smartdata/server/engine/rule/copy/OrderAgnosticCopyScheduleStrategy.java b/smart-engine/src/main/java/org/smartdata/server/engine/rule/copy/OrderAgnosticCopyScheduleStrategy.java deleted file mode 100644 index 1940d088243..00000000000 --- a/smart-engine/src/main/java/org/smartdata/server/engine/rule/copy/OrderAgnosticCopyScheduleStrategy.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.smartdata.server.engine.rule.copy; - -import java.util.List; - -public class OrderAgnosticCopyScheduleStrategy implements FileCopyScheduleStrategy { - @Override - public String wrapGetFilesToCopyQuery(String query, List pathTemplates) { - return query - // files that were removed from HDFS also are removed from the file table, - // so we need to take them into account during copy - + " UNION SELECT src FROM file_diff " - // select only pending file diffs - + "WHERE state = 0 " - // with type REMOVE or DELETE - + "AND diff_type IN (1,2) AND (" - + FileCopyScheduleStrategy.pathTemplatesToSqlCondition(pathTemplates) - + ");"; - } -} diff --git a/smart-engine/src/test/java/org/smartdata/server/engine/rule/copy/TestFileCopyScheduleStrategy.java b/smart-engine/src/test/java/org/smartdata/server/engine/rule/copy/TestFileCopyScheduleStrategy.java index 4970d0a4a75..e7ad203b82a 100644 --- a/smart-engine/src/test/java/org/smartdata/server/engine/rule/copy/TestFileCopyScheduleStrategy.java +++ b/smart-engine/src/test/java/org/smartdata/server/engine/rule/copy/TestFileCopyScheduleStrategy.java @@ -23,30 +23,12 @@ import java.util.Arrays; public class TestFileCopyScheduleStrategy { - @Test - public void checkOrderAgnosticCopyScheduleStrategy() { - FileCopyScheduleStrategy orderAgnosticStrategy = - FileCopyScheduleStrategy.of("UNORDERED"); - - String wrappedQuery = orderAgnosticStrategy.wrapGetFilesToCopyQuery( - "select * from test", - Arrays.asList("/path1/*", "/path2/*")); - - String expectedQuery = "select * from test " - + "UNION SELECT src FROM file_diff " - + "WHERE state = 0 AND diff_type IN (1,2) AND (" - + "src LIKE '/path1/%' OR src LIKE '/path2/%'" - + ");"; - - Assert.assertEquals(expectedQuery, wrappedQuery); - } - @Test public void checkFifoCopyScheduleStrategy() { - FileCopyScheduleStrategy orderAgnosticStrategy = - FileCopyScheduleStrategy.of("FIFO"); + FileCopyScheduleStrategy orderedScheduleStrategy = + FileCopyScheduleStrategy.ordered(); - String wrappedQuery = orderAgnosticStrategy.wrapGetFilesToCopyQuery( + String wrappedQuery = orderedScheduleStrategy.wrapGetFilesToCopyQuery( "select * from test", Arrays.asList("/path1/*", "/path2/*")); @@ -62,36 +44,4 @@ public void checkFifoCopyScheduleStrategy() { Assert.assertEquals(expectedQuery, wrappedQuery); } - - @Test - public void checkLifoCopyScheduleStrategy() { - FileCopyScheduleStrategy orderAgnosticStrategy = - FileCopyScheduleStrategy.of("LIFO"); - - String wrappedQuery = orderAgnosticStrategy.wrapGetFilesToCopyQuery( - "select * from test", - Arrays.asList("/path1/*", "/path2/*")); - - String expectedQuery = "SELECT file_diff.src " - + "FROM file_diff " - + "LEFT JOIN (select * from test) as q " - + "ON file_diff.src = q.path " - + "WHERE q.path IS NOT NULL OR " - + "(state = 0 AND diff_type IN (1,2) AND (" - + "src LIKE '/path1/%' OR src LIKE '/path2/%'" - + ")) GROUP BY file_diff.src " - + "ORDER BY MAX(create_time) DESC;"; - - Assert.assertEquals(expectedQuery, wrappedQuery); - } - - @Test - public void checkThrowIfWrongStrategyName() { - IllegalArgumentException exception = Assert.assertThrows( - IllegalArgumentException.class, - () -> FileCopyScheduleStrategy.of("unknown")); - - Assert.assertTrue(exception.getMessage() - .contains("Wrong file copy schedule strategy")); - } } diff --git a/smart-hadoop-support/smart-hadoop-common/src/main/java/org/smartdata/hdfs/HadoopUtil.java b/smart-hadoop-support/smart-hadoop-common/src/main/java/org/smartdata/hdfs/HadoopUtil.java index 433cf44398f..ee80438f494 100644 --- a/smart-hadoop-support/smart-hadoop-common/src/main/java/org/smartdata/hdfs/HadoopUtil.java +++ b/smart-hadoop-support/smart-hadoop-common/src/main/java/org/smartdata/hdfs/HadoopUtil.java @@ -261,13 +261,13 @@ public static URI getNameNodeUri(Configuration conf) } public static FileInfo convertFileStatus(HdfsFileStatus status, String path) { - return FileInfo.newBuilder() + return FileInfo.builder() .setPath(path) .setFileId(status.getFileId()) .setLength(status.getLen()) - .setIsdir(status.isDir()) + .setIsDir(status.isDir()) .setBlockReplication(status.getReplication()) - .setBlocksize(status.getBlockSize()) + .setBlockSize(status.getBlockSize()) .setModificationTime(status.getModificationTime()) .setAccessTime(status.getAccessTime()) .setPermission(status.getPermission().toShort()) diff --git a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/DeleteFileAction.java b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/DeleteFileAction.java index ffd7078643c..e2315c488c4 100644 --- a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/DeleteFileAction.java +++ b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/DeleteFileAction.java @@ -52,12 +52,12 @@ protected void preExecute() { @Override protected void execute(FileSystem fileSystem) throws Exception { - if (!fileSystem.exists(filePath)) { - throw new ActionException( - "DeleteFile Action fails, file doesn't exist!"); + boolean successfullyDeleted = fileSystem.delete(filePath, true); + if (!successfullyDeleted && fileSystem.exists(filePath)) { + throw new ActionException("File was not deleted: " + filePath); } - fileSystem.delete(filePath, true); + appendLog("File successfully deleted: " + filePath); } } diff --git a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/FileDiffGenerator.java b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/FileDiffGenerator.java new file mode 100644 index 00000000000..34b029ed779 --- /dev/null +++ b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/FileDiffGenerator.java @@ -0,0 +1,265 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.smartdata.hdfs.metric.fetcher; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.hadoop.hdfs.inotify.Event; +import org.smartdata.action.SyncAction; +import org.smartdata.hdfs.action.CopyFileAction; +import org.smartdata.metastore.MetaStore; +import org.smartdata.metastore.MetaStoreException; +import org.smartdata.model.FileDiff; +import org.smartdata.model.FileDiffState; +import org.smartdata.model.FileDiffType; +import org.smartdata.model.FileInfo; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.BiFunction; +import java.util.function.Supplier; + +import static org.smartdata.action.SyncAction.BASE_OPERATION; +import static org.smartdata.hdfs.action.MetaDataAction.BLOCK_REPLICATION; +import static org.smartdata.hdfs.action.MetaDataAction.GROUP_NAME; +import static org.smartdata.hdfs.action.MetaDataAction.MTIME; +import static org.smartdata.hdfs.action.MetaDataAction.OWNER_NAME; +import static org.smartdata.hdfs.action.MetaDataAction.PERMISSION; +import static org.smartdata.utils.PathUtil.addPathSeparator; + +@Slf4j +@RequiredArgsConstructor +public class FileDiffGenerator { + + private final MetaStore metaStore; + private final Supplier currentTimeMsSupplier; + + public Optional onFileCreate(FileInfo file) throws MetaStoreException { + if (!inBackup(file.getPath())) { + return Optional.empty(); + } + + FileDiff createDiff = toCreateDiff(file, file.getPath()); + return Optional.of(createDiff); + } + + public Optional onFileClose(Event.CloseEvent closeEvent) throws MetaStoreException { + if (!inBackup(closeEvent.getPath())) { + return Optional.empty(); + } + + FileInfo fileInfo = metaStore.getFile(closeEvent.getPath()); + long currentLength = Optional.ofNullable(fileInfo) + .map(FileInfo::getLength) + .orElse(0L); + + if (currentLength == closeEvent.getFileSize()) { + return Optional.empty(); + } + + FileDiff fileDiff = toAppendDiff(closeEvent.getPath(), currentLength, closeEvent.getFileSize()); + return Optional.of(fileDiff); + } + + public Optional onMetadataUpdate(Event.MetadataUpdateEvent metadataUpdateEvent) + throws MetaStoreException { + if (!inBackup(metadataUpdateEvent.getPath())) { + return Optional.empty(); + } + + Map parameters = new HashMap<>(); + switch (metadataUpdateEvent.getMetadataType()) { + case TIMES: + if (metadataUpdateEvent.getMtime() > 0) { + parameters.put(MTIME, String.valueOf(metadataUpdateEvent.getMtime())); + } + break; + case OWNER: + Optional.ofNullable(metadataUpdateEvent.getOwnerName()) + .ifPresent(name -> parameters.put(OWNER_NAME, name)); + Optional.ofNullable(metadataUpdateEvent.getGroupName()) + .ifPresent(name -> parameters.put(GROUP_NAME, name)); + break; + case PERMS: + parameters.put(PERMISSION, String.valueOf(metadataUpdateEvent.getPerms().toShort())); + break; + case REPLICATION: + parameters.put(BLOCK_REPLICATION, String.valueOf(metadataUpdateEvent.getReplication())); + break; + default: + return Optional.empty(); + } + + if (parameters.isEmpty()) { + return Optional.empty(); + } + + FileDiff fileDiff = fileDiffBuilder(metadataUpdateEvent.getPath()) + .diffType(FileDiffType.METADATA) + .parameters(parameters) + .build(); + return Optional.of(fileDiff); + } + + public List onFileRename( + Event.RenameEvent renameEvent, FileInfo srcFileInfo) throws MetaStoreException { + boolean srcInBackup = inBackup(renameEvent.getSrcPath()); + boolean destInBackup = inBackup(renameEvent.getDstPath()); + + if (!srcInBackup && !destInBackup) { + return Collections.emptyList(); + } + + if (srcFileInfo == null) { + log.error( + "Inconsistency in metastore and HDFS namespace, file not found: {}", + renameEvent.getSrcPath()); + return Collections.emptyList(); + } + + List fileDiffs; + if (srcInBackup) { + if (destInBackup) { + // if both src and dest are in backup directory, + // then generate rename diffs for all content under src + fileDiffs = visitFileRecursively(srcFileInfo, renameEvent, this::buildRenameFileDiff); + } else { + // if src is in backup directory and dest isn't, + // then simply delete all files under src on remote cluster + fileDiffs = Collections.singletonList(getDeleteFileDiff(srcFileInfo.getPath())); + } + } else { + // if dest is in backup directory and src isn't, + // then simply copy files under dest to remote cluster + fileDiffs = visitFileRecursively(srcFileInfo, renameEvent, this::buildCreateFileDiff); + } + + if (fileDiffs.isEmpty()) { + log.error( + "Inconsistency in metastore and HDFS namespace, file not found: {}", + renameEvent.getSrcPath()); + return Collections.emptyList(); + } + // set first diff as base rename operation + fileDiffs.get(0).setParameter(BASE_OPERATION, ""); + return fileDiffs; + } + + public Optional onFileDelete(String path) + throws MetaStoreException { + if (!inBackup(path)) { + return Optional.empty(); + } + + FileDiff deleteFileDiff = getDeleteFileDiff(path); + return Optional.of(deleteFileDiff); + } + + private FileDiff toCreateDiff(FileInfo file, String path) { + return file.isDir() + ? toCreateDirectoryDiff(path) + : toCreateFileDiff(file, path); + } + + private FileDiff toCreateDirectoryDiff(String path) { + return fileDiffBuilder(path) + .diffType(FileDiffType.MKDIR) + .build(); + } + + private FileDiff toAppendDiff(String path, long currentLength, long newLength) { + Map parameters = new HashMap<>(); + parameters.put(CopyFileAction.OFFSET_INDEX, String.valueOf(currentLength)); + parameters.put(CopyFileAction.LENGTH, String.valueOf(newLength - currentLength)); + + return fileDiffBuilder(path) + .diffType(FileDiffType.APPEND) + .parameters(parameters) + .build(); + } + + private FileDiff toCreateFileDiff(FileInfo file, String path) { + Map parameters = new HashMap<>(); + parameters.put(CopyFileAction.OFFSET_INDEX, "0"); + parameters.put(CopyFileAction.LENGTH, String.valueOf(file.getLength())); + + return fileDiffBuilder(path) + .diffType(FileDiffType.APPEND) + .parameters(parameters) + .build(); + } + + private FileDiff buildRenameFileDiff(FileInfo fileInfo, Event.RenameEvent renameEvent) { + Map parameters = new HashMap<>(); + parameters.put( + SyncAction.DEST, + fileInfo.getPath().replaceFirst( + renameEvent.getSrcPath(), + renameEvent.getDstPath()) + ); + return fileDiffBuilder(fileInfo.getPath()) + .diffType(FileDiffType.RENAME) + .parameters(parameters) + .build(); + } + + private FileDiff getDeleteFileDiff(String path) { + return fileDiffBuilder(path) + .diffType(FileDiffType.DELETE) + .build(); + } + + private FileDiff buildCreateFileDiff(FileInfo fileInfo, Event.RenameEvent renameEvent) { + String newPath = fileInfo.getPath() + .replaceFirst(renameEvent.getSrcPath(), renameEvent.getDstPath()); + return toCreateDiff(fileInfo, newPath); + } + + private List visitFileRecursively( + FileInfo srcFileInfo, C context, + BiFunction diffProducer) + throws MetaStoreException { + List results = new ArrayList<>(); + results.add(diffProducer.apply(srcFileInfo, context)); + + if (srcFileInfo.isDir()) { + metaStore.getFilesByPrefixInOrder(addPathSeparator(srcFileInfo.getPath())) + .stream() + .map(fileInfo -> diffProducer.apply(fileInfo, context)) + .forEach(results::add); + } + + return results; + } + + private FileDiff.Builder fileDiffBuilder(String src) { + return FileDiff.builder() + .src(src) + .state(FileDiffState.PENDING) + .createTime(currentTimeMsSupplier.get()) + .parameters(new HashMap<>()); + } + + private boolean inBackup(String src) throws MetaStoreException { + return metaStore.srcInBackup(src); + } +} diff --git a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/InotifyEventApplier.java b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/InotifyEventApplier.java index 6411ca981ff..54304a2312a 100644 --- a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/InotifyEventApplier.java +++ b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/InotifyEventApplier.java @@ -29,381 +29,225 @@ import org.smartdata.hdfs.HadoopUtil; import org.smartdata.metastore.MetaStore; import org.smartdata.metastore.MetaStoreException; -import org.smartdata.model.BackUpInfo; import org.smartdata.model.FileDiff; -import org.smartdata.model.FileDiffType; import org.smartdata.model.FileInfo; import org.smartdata.model.FileInfoDiff; +import org.smartdata.model.PathChecker; import java.io.ByteArrayInputStream; import java.io.DataInputStream; -import java.util.ArrayList; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Arrays; import java.util.List; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.stream.Collectors; -import org.smartdata.model.PathChecker; +import java.util.Optional; /** * This is a very preliminary and buggy applier, can further enhance by referring to * {@link org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader} */ public class InotifyEventApplier { + private static final Logger LOG = + LoggerFactory.getLogger(InotifyEventApplier.class); + private static final String ROOT_DIRECTORY = "/"; + private static final String EC_POLICY_XATTR = "hdfs.erasurecoding.policy"; + + private static final byte DEFAULT_STORAGE_POLICY_ID = 7; + private static final byte DEFAULT_EC_POLICY_ID = 0; private final MetaStore metaStore; private final PathChecker pathChecker; - private DFSClient client; - private static final Logger LOG = - LoggerFactory.getLogger(InotifyEventFetcher.class); - private NamespaceFetcher namespaceFetcher; - - public InotifyEventApplier(MetaStore metaStore, DFSClient client) { - this(new SmartConf(), metaStore, client); - } + private final DFSClient client; + private final FileDiffGenerator fileDiffGenerator; - public InotifyEventApplier(SmartConf conf, MetaStore metaStore, DFSClient client, NamespaceFetcher namespaceFetcher) { - this(conf, metaStore, client); - this.namespaceFetcher = namespaceFetcher; + public InotifyEventApplier(SmartConf conf, MetaStore metaStore, DFSClient client) { + this(conf, metaStore, client, new FileDiffGenerator(metaStore, System::currentTimeMillis)); } - public InotifyEventApplier(SmartConf conf, MetaStore metaStore, DFSClient client) { + public InotifyEventApplier(SmartConf conf, MetaStore metaStore, + DFSClient client, FileDiffGenerator fileDiffGenerator) { this.metaStore = metaStore; this.client = client; this.pathChecker = new PathChecker(conf); + this.fileDiffGenerator = fileDiffGenerator; } - public void apply(List events) throws IOException, MetaStoreException, InterruptedException { + public void apply(List events) throws IOException, InterruptedException { for (Event event : events) { apply(event); } } - public void apply(Event[] events) throws IOException, MetaStoreException, InterruptedException { - this.apply(Arrays.asList(events)); + public void apply(Event[] events) throws IOException, InterruptedException { + apply(Arrays.asList(events)); } - private void apply(Event event) throws IOException, MetaStoreException, InterruptedException { - String path; - String srcPath, dstPath; - LOG.debug("Even Type = {}", event.getEventType()); + private void apply(Event event) throws IOException { + LOG.debug("Handle INotify event: {}", event); // we already filtered events in the fetch tasks, so we can skip // event's path check here switch (event.getEventType()) { case CREATE: - path = ((Event.CreateEvent) event).getPath(); - LOG.trace("event type: {}, path: {}", event.getEventType().name(), path); applyCreate((Event.CreateEvent) event); break; case CLOSE: - path = ((Event.CloseEvent) event).getPath(); - LOG.trace("event type: {}, path: {}", event.getEventType().name(), path); applyClose((Event.CloseEvent) event); break; case RENAME: - srcPath = ((Event.RenameEvent) event).getSrcPath(); - dstPath = ((Event.RenameEvent) event).getDstPath(); - LOG.trace("event type: {}, src path: {}, dest path: {}", - event.getEventType().name(), srcPath, dstPath); - applyRename((Event.RenameEvent)event); + applyRename((Event.RenameEvent) event); break; case METADATA: // The property dfs.namenode.accesstime.precision in HDFS's configuration controls // the precision of access time. Its default value is 1h. To avoid missing a // MetadataUpdateEvent for updating access time, a smaller value should be set. - path = ((Event.MetadataUpdateEvent)event).getPath(); - LOG.trace("event type: {}, path: {}", event.getEventType().name(), path); - applyMetadataUpdate((Event.MetadataUpdateEvent)event); + applyMetadataUpdate((Event.MetadataUpdateEvent) event); break; + case UNLINK: + applyUnlink((Event.UnlinkEvent) event); case APPEND: - path = ((Event.AppendEvent)event).getPath(); - LOG.trace("event type: {}, path: {}", event.getEventType().name(), path); - // do nothing break; - case UNLINK: - path = ((Event.UnlinkEvent)event).getPath(); - LOG.trace("event type: {}, path: {}", event.getEventType().name(), path); - applyUnlink((Event.UnlinkEvent)event); } } - //Todo: times and ec policy id, etc. - private void applyCreate(Event.CreateEvent createEvent) throws IOException, MetaStoreException { - FileInfo fileInfo = getFileInfo(createEvent.getPath()); - if (fileInfo == null) { - return; - } + private void applyCreate(Event.CreateEvent createEvent) throws IOException { + FileInfo fileInfo = fileBuilderWithPolicies(createEvent.getPath()) + .setPath(createEvent.getPath()) + .setIsDir(createEvent.getiNodeType() == Event.CreateEvent.INodeType.DIRECTORY) + .setBlockReplication((short) createEvent.getReplication()) + .setBlockSize(createEvent.getDefaultBlockSize()) + .setModificationTime(createEvent.getCtime()) + .setAccessTime(createEvent.getCtime()) + .setPermission(createEvent.getPerms().toShort()) + .setOwner(createEvent.getOwnerName()) + .setGroup(createEvent.getGroupName()) + .build(); + + fileDiffGenerator.onFileCreate(fileInfo) + .ifPresent(this::insertFileDiffUnchecked); - applyCreateFileDiff(fileInfo); metaStore.deleteFileByPath(fileInfo.getPath(), false); metaStore.deleteFileState(fileInfo.getPath()); - metaStore.insertFile(fileInfo); + metaStore.insertFile(fileInfo, true); + } + + private Optional getHdfsFileStatus(String path) throws IOException { + return Optional.ofNullable(client.getFileInfo(path)); + } + + private void insertFileDiffUnchecked(FileDiff fileDiff) { + try { + metaStore.insertFileDiff(fileDiff); + } catch (MetaStoreException e) { + throw new UncheckedIOException(e); + } } - private void applyRenameIgnoredFile(Event.RenameEvent renameEvent) throws IOException, MetaStoreException { + // TODO store ignored files to metastore as well + // to take info from metastore instead of fetching fileInfo from hdfs + private void applyRenameIgnoredFile(Event.RenameEvent renameEvent) throws IOException { FileInfo fileInfo = getFileInfo(renameEvent.getDstPath()); if (fileInfo == null) { + LOG.warn("Error getting info about file moved from ignored directory {}", + renameEvent.getDstPath()); return; } - applyCreateFileDiff(fileInfo); + fileDiffGenerator.onFileCreate(fileInfo) + .ifPresent(this::insertFileDiffUnchecked); + metaStore.deleteFileByPath(fileInfo.getPath(), false); - metaStore.insertFile(fileInfo); - metaStore.renameFile(renameEvent.getSrcPath(), renameEvent.getDstPath(), fileInfo.isdir()); + metaStore.insertFile(fileInfo, true); + metaStore.renameFile(renameEvent.getSrcPath(), renameEvent.getDstPath(), fileInfo.isDir()); } private FileInfo getFileInfo(String path) throws IOException { - HdfsFileStatus fileStatus = client.getFileInfo(path); - if (fileStatus == null) { - LOG.debug("Can not get HdfsFileStatus for file " + path); - return null; - } - - return HadoopUtil.convertFileStatus(fileStatus, path); - } - - private void applyCreateFileDiff(FileInfo fileInfo) throws MetaStoreException { - if (inBackup(fileInfo.getPath())) { - if (fileInfo.isdir()) { - FileDiff fileDiff = new FileDiff(FileDiffType.MKDIR); - fileDiff.setSrc(fileInfo.getPath()); - metaStore.insertFileDiff(fileDiff); - return; - } - FileDiff fileDiff = new FileDiff(FileDiffType.APPEND); - fileDiff.setSrc(fileInfo.getPath()); - fileDiff.getParameters().put("-offset", String.valueOf(0)); - // Note that "-length 0" means create an empty file - fileDiff.getParameters() - .put("-length", String.valueOf(fileInfo.getLength())); - // TODO add support in CopyFileAction or split into two file diffs - //add modification_time and access_time to filediff - fileDiff.getParameters().put("-mtime", "" + fileInfo.getModificationTime()); - // fileDiff.getParameters().put("-atime", "" + fileInfo.getAccessTime()); - //add owner to filediff - fileDiff.getParameters().put("-owner", "" + fileInfo.getOwner()); - fileDiff.getParameters().put("-group", "" + fileInfo.getGroup()); - //add Permission to filediff - fileDiff.getParameters().put("-permission", "" + fileInfo.getPermission()); - //add replication count to file diff - fileDiff.getParameters().put("-replication", "" + fileInfo.getBlockReplication()); - metaStore.insertFileDiff(fileDiff); - } - } - - private boolean inBackup(String src) throws MetaStoreException { - return metaStore.srcInBackup(src); + return getHdfsFileStatus(path) + .map(status -> HadoopUtil.convertFileStatus(status, path)) + .orElseGet(() -> { + LOG.warn("Error getting file status for file {}", path); + return null; + }); } //Todo: should update mtime? atime? private void applyClose(Event.CloseEvent closeEvent) throws MetaStoreException { - FileDiff fileDiff = new FileDiff(FileDiffType.APPEND); - fileDiff.setSrc(closeEvent.getPath()); - long newLen = closeEvent.getFileSize(); - long currLen; - // TODO make sure offset is correct - if (inBackup(closeEvent.getPath())) { - FileInfo fileInfo = metaStore.getFile(closeEvent.getPath()); - if (fileInfo == null) { - // TODO add metadata - currLen = 0; - } else { - currLen = fileInfo.getLength(); - } - if (currLen != newLen) { - fileDiff.getParameters().put("-offset", String.valueOf(currLen)); - fileDiff.getParameters() - .put("-length", String.valueOf(newLen - currLen)); - metaStore.insertFileDiff(fileDiff); - } - } + fileDiffGenerator.onFileClose(closeEvent) + .ifPresent(this::insertFileDiffUnchecked); + FileInfoDiff fileInfoDiff = new FileInfoDiff() .setLength(closeEvent.getFileSize()) .setModificationTime(closeEvent.getTimestamp()); metaStore.updateFileByPath(closeEvent.getPath(), fileInfoDiff); } - //Todo: should update mtime? atime? -// private String getTruncateSql(Event.TruncateEvent truncateEvent) { -// return String.format( -// "UPDATE file SET length = %s, modification_time = %s WHERE path = '%s';", -// truncateEvent.getFileSize(), truncateEvent.getTimestamp(), truncateEvent.getPath()); -// } - - private void applyRename(Event.RenameEvent renameEvent) - throws IOException, MetaStoreException, InterruptedException { + private void applyRename(Event.RenameEvent renameEvent) throws IOException { String src = renameEvent.getSrcPath(); String dest = renameEvent.getDstPath(); + // if the src is ignored, create new file with dest path if (pathChecker.isIgnored(src)) { applyRenameIgnoredFile(renameEvent); return; } - HdfsFileStatus status = client.getFileInfo(dest); - FileInfo info = metaStore.getFile(src); - - // For backup data to use. - generateFileDiff(renameEvent); + // if the dest is ignored, delete src info from file table + if (pathChecker.isIgnored(dest)) { + deleteFile(src); + return; + } - if (status == null) { - LOG.debug("Get rename dest status failed, {} -> {}", src, dest); + FileInfo srcFile = metaStore.getFile(src); + if (srcFile == null) { + LOG.error( + "Inconsistency in metastore and HDFS namespace, file not found: {}", + renameEvent.getSrcPath()); + return; } + + List fileDiffs = fileDiffGenerator.onFileRename(renameEvent, srcFile); + metaStore.insertFileDiffs(fileDiffs); + // The dest path which the src is renamed to should be checked in file table // to avoid duplicated record for one same path. FileInfo destInfo = metaStore.getFile(dest); if (destInfo != null) { metaStore.deleteFileByPath(dest, false); } - // src is not in file table because it is not fetched or other reason - if (info == null) { - if (status != null) { - //info = HadoopUtil.convertFileStatus(status, dest); - //metaStore.insertFile(info); - namespaceFetcher.startFetch(dest); - while(!namespaceFetcher.fetchFinished()) { - LOG.info("Fetching the files under " + dest); - Thread.sleep(100); - } - namespaceFetcher.stop(); - } - return; - } - - // if the dest is ignored, delete src info from file table - // TODO: tackle with file_state and small_state - if (pathChecker.isIgnored(dest)) { - // fuzzy matching is used to delete content under the dir - metaStore.deleteFileByPath(src, true); - return; - } - metaStore.renameFile(src, dest, info.isdir()); + metaStore.renameFile(src, dest, srcFile.isDir()); + maybeSetPolicies(srcFile, dest); } - private void generateFileDiff(Event.RenameEvent renameEvent) - throws MetaStoreException { - String src = renameEvent.getSrcPath(); - String dest = renameEvent.getDstPath(); - FileInfo info = metaStore.getFile(src); - // TODO: consider src or dest is ignored by SSM - if (inBackup(src)) { - // rename the file if the renamed file is still under the backup src dir - // if not, insert a delete file diff - if (inBackup(dest)) { - FileDiff fileDiff = new FileDiff(FileDiffType.RENAME); - fileDiff.setSrc(src); - fileDiff.getParameters().put("-dest", dest); - metaStore.insertFileDiff(fileDiff); - } else { - insertDeleteDiff(src, info.isdir()); - } - } else if (inBackup(dest)) { - // tackle such case: rename file from outside into backup dir - if (!info.isdir()) { - FileDiff fileDiff = new FileDiff(FileDiffType.APPEND); - fileDiff.setSrc(dest); - fileDiff.getParameters().put("-offset", String.valueOf(0)); - fileDiff.getParameters() - .put("-length", String.valueOf(info.getLength())); - metaStore.insertFileDiff(fileDiff); - } else { - List fileInfos = metaStore.getFilesByPrefix(src.endsWith("/") ? src : src + "/"); - for (FileInfo fileInfo : fileInfos) { - // TODO: cover subdir with no file case - if (fileInfo.isdir()) { - continue; - } - FileDiff fileDiff = new FileDiff(FileDiffType.APPEND); - fileDiff.setSrc(fileInfo.getPath().replaceFirst(src, dest)); - fileDiff.getParameters().put("-offset", String.valueOf(0)); - fileDiff.getParameters() - .put("-length", String.valueOf(fileInfo.getLength())); - metaStore.insertFileDiff(fileDiff); - } - } - } - } - - private void applyMetadataUpdate(Event.MetadataUpdateEvent metadataUpdateEvent) throws MetaStoreException { + private void applyMetadataUpdate( + Event.MetadataUpdateEvent metadataUpdateEvent) throws MetaStoreException { + fileDiffGenerator.onMetadataUpdate(metadataUpdateEvent) + .ifPresent(this::insertFileDiffUnchecked); - FileDiff fileDiff = null; - if (inBackup(metadataUpdateEvent.getPath())) { - fileDiff = new FileDiff(FileDiffType.METADATA); - fileDiff.setSrc(metadataUpdateEvent.getPath()); - } FileInfoDiff fileInfoUpdate = new FileInfoDiff(); switch (metadataUpdateEvent.getMetadataType()) { case TIMES: if (metadataUpdateEvent.getMtime() > 0) { - if (fileDiff != null) { - fileDiff.getParameters().put("-mtime", String.valueOf(metadataUpdateEvent.getMtime())); - // fileDiff.getParameters().put("-access_time", "" + metadataUpdateEvent.getAtime()); - metaStore.insertFileDiff(fileDiff); - } fileInfoUpdate.setModificationTime(metadataUpdateEvent.getMtime()); } if (metadataUpdateEvent.getAtime() > 0) { - // if (fileDiff != null) { - // fileDiff.getParameters().put("-access_time", "" + metadataUpdateEvent.getAtime()); - // metaStore.insertFileDiff(fileDiff); - // } fileInfoUpdate.setAccessTime(metadataUpdateEvent.getAtime()); } break; case OWNER: - if (fileDiff != null) { - fileDiff.getParameters().put("-owner", metadataUpdateEvent.getOwnerName()); - metaStore.insertFileDiff(fileDiff); - } fileInfoUpdate.setOwner(metadataUpdateEvent.getOwnerName()) .setGroup(metadataUpdateEvent.getGroupName()); break; case PERMS: - if (fileDiff != null) { - fileDiff.getParameters().put("-permission", "" + metadataUpdateEvent.getPerms().toShort()); - metaStore.insertFileDiff(fileDiff); - } fileInfoUpdate.setPermission(metadataUpdateEvent.getPerms().toShort()); break; case REPLICATION: - if (fileDiff != null) { - fileDiff.getParameters().put("-replication", "" + metadataUpdateEvent.getReplication()); - metaStore.insertFileDiff(fileDiff); - } fileInfoUpdate.setBlockReplication((short) metadataUpdateEvent.getReplication()); break; case XATTRS: - final String EC_POLICY = "hdfs.erasurecoding.policy"; - //Todo - if (LOG.isDebugEnabled()) { - String message = metadataUpdateEvent.getxAttrs() - .stream() - .map(XAttr::toString) - .collect(Collectors.joining("\n")); - LOG.debug(message); - } - // The following code should be executed merely on HDFS3.x. - for (XAttr xAttr : metadataUpdateEvent.getxAttrs()) { - if (xAttr.getName().equals(EC_POLICY)) { - try { - String ecPolicyName = WritableUtils.readString( - new DataInputStream(new ByteArrayInputStream(xAttr.getValue()))); - byte ecPolicyId = CompatibilityHelperLoader.getHelper(). - getErasureCodingPolicyByName(client, ecPolicyName); - if (ecPolicyId == (byte) -1) { - LOG.error("Unrecognized EC policy for updating!"); - } - fileInfoUpdate.setErasureCodingPolicy(ecPolicyId); - break; - } catch (IOException ex) { - LOG.error("Error occurred for updating ecPolicy!", ex); - } - } - } + getErasureCodingPolicyId(metadataUpdateEvent.getxAttrs()) + .ifPresent(fileInfoUpdate::setErasureCodingPolicy); break; case ACLS: return; @@ -412,108 +256,108 @@ private void applyMetadataUpdate(Event.MetadataUpdateEvent metadataUpdateEvent) } private void applyUnlink(Event.UnlinkEvent unlinkEvent) throws MetaStoreException { + deleteFile(unlinkEvent.getPath()); + } + + private void deleteFile(String path) throws MetaStoreException { + fileDiffGenerator.onFileDelete(path) + .ifPresent(this::insertFileDiffUnchecked); + // delete root, i.e., / - if (ROOT_DIRECTORY.equals(unlinkEvent.getPath())) { + if (ROOT_DIRECTORY.equals(path)) { LOG.warn("Deleting root directory!!!"); - insertDeleteDiff(ROOT_DIRECTORY, true); metaStore.unlinkRootDirectory(); return; } - String path = unlinkEvent.getPath(); // file has no "/" appended in the metaStore FileInfo fileInfo = metaStore.getFile(path.endsWith("/") ? path.substring(0, path.length() - 1) : path); if (fileInfo != null) { - insertDeleteDiff(unlinkEvent.getPath(), fileInfo.isdir()); - metaStore.unlinkFile(unlinkEvent.getPath(), fileInfo.isdir()); + metaStore.unlinkFile(path, fileInfo.isDir()); } } - // TODO: just insert a fileDiff for this kind of path. - // It seems that there is no need to see if path matches with one dir in FileInfo. - private void insertDeleteDiff(String path, boolean isDir) throws MetaStoreException { - if (isDir) { - path = path.endsWith("/") ? path.substring(0, path.length() - 1) : path; - List fileInfos = metaStore.getFilesByPrefix(path); - for (FileInfo fileInfo : fileInfos) { - if (fileInfo.isdir()) { - if (path.equals(fileInfo.getPath())) { - insertDeleteDiff(fileInfo.getPath()); - break; - } - } + private Optional getErasureCodingPolicyId(List xAttrs) { + for (XAttr xAttr : xAttrs) { + if (!EC_POLICY_XATTR.equals(xAttr.getName())) { + continue; } - } else { - insertDeleteDiff(path); - } - } - private void insertDeleteDiff(String path) throws MetaStoreException { - // TODO: remove "/" appended in src or dest in backup_file table - String pathWithSlash = path.endsWith("/") ? path : path + "/"; - if (inBackup(path)) { - List backUpInfos = metaStore.getBackUpInfoBySrc(pathWithSlash); - for (BackUpInfo backUpInfo : backUpInfos) { - String destPath = pathWithSlash.replaceFirst(backUpInfo.getSrc(), backUpInfo.getDest()); - try { - // tackle root path case - URI namenodeUri = new URI(destPath); - String root = "hdfs://" + namenodeUri.getHost() + ":" - + String.valueOf(namenodeUri.getPort()); - if (destPath.equals(root) || destPath.equals(root + "/") || destPath.equals("/")) { - for (String srcFilePath : getFilesUnderDir(pathWithSlash)) { - FileDiff fileDiff = new FileDiff(FileDiffType.DELETE); - fileDiff.setSrc(srcFilePath); - String destFilePath = srcFilePath.replaceFirst(backUpInfo.getSrc(), backUpInfo.getDest()); - fileDiff.getParameters().put("-dest", destFilePath); - metaStore.insertFileDiff(fileDiff); - } - } else { - FileDiff fileDiff = new FileDiff(FileDiffType.DELETE); - // use the path getting from event with no slash appended - fileDiff.setSrc(path); - // put sync's dest path in parameter for delete use - fileDiff.getParameters().put("-dest", destPath); - metaStore.insertFileDiff(fileDiff); - } - } catch (URISyntaxException e) { - LOG.error("Error occurs!", e); + try { + String ecPolicyName = WritableUtils.readString( + new DataInputStream(new ByteArrayInputStream(xAttr.getValue()))); + byte ecPolicyId = CompatibilityHelperLoader.getHelper(). + getErasureCodingPolicyByName(client, ecPolicyName); + if (ecPolicyId == (byte) -1) { + LOG.error("Unrecognized EC policy for updating: {}", ecPolicyId); } + return Optional.of(ecPolicyId); + } catch (IOException ex) { + LOG.error("Error occurred for updating ecPolicy!", ex); } } + + return Optional.empty(); } - private List getFilesUnderDir(String dir) throws MetaStoreException { - dir = dir.endsWith("/") ? dir : dir + "/"; - List fileList = new ArrayList<>(); - List subdirList = new ArrayList<>(); - // get fileInfo in asc order of path to guarantee that - // the subdir is tackled prior to files or dirs under it - List fileInfos = metaStore.getFilesByPrefixInOrder(dir); - for (FileInfo fileInfo : fileInfos) { - // just delete subdir instead of deleting all files under it - if (isUnderDir(fileInfo.getPath(), subdirList)) { - continue; - } - fileList.add(fileInfo.getPath()); - if (fileInfo.isdir()) { - subdirList.add(fileInfo.getPath()); - } + + /** @see InotifyEventApplier#fileBuilderWithPolicies */ + private void maybeSetPolicies(FileInfo oldFile, String newPath) throws IOException { + if (oldFile.getErasureCodingPolicy() != DEFAULT_EC_POLICY_ID + || oldFile.getStoragePolicy() != DEFAULT_STORAGE_POLICY_ID) { + // we don't need to update anything in case if policies were + // already applied during create event handling + return; } - return fileList; - } - private boolean isUnderDir(String path, List dirs) { - if (dirs.isEmpty()) { - return false; + Optional fileStatus = getHdfsFileStatus(newPath); + if (!fileStatus.isPresent()) { + LOG.warn("Error getting status for {} after rename", newPath); + return; } - for (String subdir : dirs) { - if (path.startsWith(subdir)) { - return true; - } + + FileInfoDiff fileInfoDiff = new FileInfoDiff() + .setStoragePolicy(fileStatus.get().getStoragePolicy()) + .setErasureCodingPolicy(CompatibilityHelperLoader.getHelper() + .getErasureCodingPolicy(fileStatus.get())); + metaStore.updateFileByPath(newPath, fileInfoDiff); + } + + /** + * Try to enrich FileInfo builder with EC and storage policies + * information from HDFS. In case if no information is found in HDFS for + * specified file, fallback to default policies. + * + * HDFS client will return null information about file in 2 cases: + * it was either deleted or renamed. In first case, there will be no issue, + * if we use fallback policies, because SSM will eventually remove file from its + * namespace copy, when the delete event arrive. + * In case of rename we will try to fetch policies info from HDFS again + * for the new name in the {@link InotifyEventApplier#maybeSetPolicies}. + * In case of failure in this method too, it only means, that file was deleted after rename. + */ + private FileInfo.Builder fileBuilderWithPolicies(String path) { + try { + return getHdfsFileStatus(path) + .map(status -> FileInfo.builder() + .setStoragePolicy(status.getStoragePolicy()) + .setErasureCodingPolicy( + CompatibilityHelperLoader.getHelper().getErasureCodingPolicy(status))) + .orElseGet(() -> { + LOG.warn("Can't enrich info about new file: {} not found", path); + return defaultFileBuilder(); + }); + } catch (IOException e) { + LOG.warn("Can't enrich info about new file: {}", path, e); + return defaultFileBuilder(); } - return false; + } + + private FileInfo.Builder defaultFileBuilder() { + return FileInfo.builder() + .setStoragePolicy(DEFAULT_STORAGE_POLICY_ID) + .setErasureCodingPolicy(DEFAULT_EC_POLICY_ID); } } diff --git a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/InotifyEventFetcher.java b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/InotifyEventFetcher.java index c272929fd73..6d187bdcb02 100644 --- a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/InotifyEventFetcher.java +++ b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/InotifyEventFetcher.java @@ -24,6 +24,7 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.squareup.tape.QueueFile; +import org.apache.hadoop.fs.impl.WrappedIOException; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSInotifyEventInputStream; import org.apache.hadoop.hdfs.inotify.Event; @@ -34,7 +35,6 @@ import org.smartdata.SmartConstants; import org.smartdata.conf.SmartConf; import org.smartdata.conf.SmartConfKeys; - import org.smartdata.metastore.MetaStore; import org.smartdata.metastore.MetaStoreException; import org.smartdata.model.SystemInfo; @@ -42,6 +42,7 @@ import java.io.File; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -84,7 +85,7 @@ public InotifyEventFetcher(DFSClient client, MetaStore metaStore, this.conf = conf; this.nameSpaceFetcher = new NamespaceFetcher(client, metaStore, null, conf); this.applier = applier == null - ? new InotifyEventApplier(conf, metaStore, client, nameSpaceFetcher) + ? new InotifyEventApplier(conf, metaStore, client) : applier; } @@ -96,7 +97,7 @@ public void start() throws IOException { Long lastTxid = getLastTxid(); //If whitelist is changed, the whole namespace will be fetched when servers restart if (lastTxid != null && lastTxid != -1 && canContinueFromLastTxid(client, lastTxid) - && !isWhitelistChanged(conf, metaStore)) { + && !isWhitelistChanged(conf, metaStore)) { startFromLastTxid(lastTxid); } else { startWithFetchingNameSpace(); @@ -149,7 +150,7 @@ private void startWithFetchingNameSpace() throws IOException { LOG.info("Start fetching namespace with current edit log txid = " + startId); nameSpaceFetcher.startFetch(); inotifyFetchFuture = scheduledExecutorService.scheduleAtFixedRate( - new InotifyFetchTask(queueFile, client, startId), 0, 100, TimeUnit.MILLISECONDS); + new InotifyFetchTask(queueFile, client, startId), 0, 100, TimeUnit.MILLISECONDS); eventApplyTask = new EventApplyTask(nameSpaceFetcher, applier, queueFile, startId, conf); ListenableFuture future = listeningExecutorService.submit(eventApplyTask); Futures.addCallback(future, new NameSpaceFetcherCallBack(), scheduledExecutorService); @@ -204,7 +205,7 @@ public void stop() { if (inotifyFetchFuture != null) { inotifyFetchFuture.cancel(true); } - if (fetchAndApplyFuture != null){ + if (fetchAndApplyFuture != null) { fetchAndApplyFuture.cancel(true); } nameSpaceFetcher.stop(); @@ -294,7 +295,7 @@ public void run() { break; } } - } catch (InterruptedException | IOException e) { + } catch (InterruptedException | IOException | UncheckedIOException e) { LOG.error("Inotify dequeue error", e); } } diff --git a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/InotifyFetchAndApplyTask.java b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/InotifyFetchAndApplyTask.java index 018ef4d485f..7d02e9419c8 100644 --- a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/InotifyFetchAndApplyTask.java +++ b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/InotifyFetchAndApplyTask.java @@ -29,7 +29,7 @@ import org.smartdata.model.SystemInfo; import java.io.IOException; -import java.util.Date; +import java.time.LocalDateTime; import java.util.concurrent.atomic.AtomicLong; public class InotifyFetchAndApplyTask implements Runnable { @@ -42,8 +42,7 @@ public class InotifyFetchAndApplyTask implements Runnable { private final INotifyEventFilter eventFilter; public InotifyFetchAndApplyTask(DFSClient client, MetaStore metaStore, - InotifyEventApplier applier, long startId, SmartConf conf) - throws IOException { + InotifyEventApplier applier, long startId, SmartConf conf) throws IOException { this.applier = applier; this.metaStore = metaStore; this.lastId = new AtomicLong(startId); @@ -53,7 +52,7 @@ public InotifyFetchAndApplyTask(DFSClient client, MetaStore metaStore, @Override public void run() { - LOG.trace("InotifyFetchAndApplyTask run at " + new Date()); + LOG.debug("InotifyFetchAndApplyTask run at {}", LocalDateTime.now()); try { EventBatch eventBatch = inotifyEventInputStream.poll(); while (eventBatch != null) { @@ -72,8 +71,4 @@ public void run() { LOG.error("Inotify Apply Events error", t); } } - - public long getLastId() { - return this.lastId.get(); - } } diff --git a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/NamespaceFetcher.java b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/NamespaceFetcher.java index 158eb08dfbc..f661478e223 100644 --- a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/NamespaceFetcher.java +++ b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/NamespaceFetcher.java @@ -138,27 +138,6 @@ public static void init(String dir) { IngestionTask.init(dir); } - /* - startFetch(dir) is used to restart fetcher to fetch one specific dir. - In rename event, when src is not in file table because it is not fetched or other reason, - dest should be fetched by using startFetch(dest). - */ - public void startFetch(String dir) { - init(dir); - this.fetchTaskFutures = new ScheduledFuture[ingestionTasks.length]; - for (int i = 0; i < ingestionTasks.length; i++) { - fetchTaskFutures[i] = this.scheduledExecutorService.scheduleAtFixedRate( - ingestionTasks[i], 0, fetchInterval, TimeUnit.MILLISECONDS); - } - - this.consumerFutures = new ScheduledFuture[consumers.length]; - for (int i = 0; i < consumers.length; i++) { - consumerFutures[i] = this.scheduledExecutorService.scheduleAtFixedRate( - consumers[i], 0, fetchInterval, TimeUnit.MILLISECONDS); - } - LOG.info("Start fetch the given dir."); - } - public boolean fetchFinished() { return IngestionTask.finished(); } diff --git a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/CopyScheduler.java b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/CopyScheduler.java index ff3c59434db..e95afe31192 100644 --- a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/CopyScheduler.java +++ b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/CopyScheduler.java @@ -36,7 +36,6 @@ import org.smartdata.metastore.MetaStore; import org.smartdata.metastore.MetaStoreException; import org.smartdata.model.ActionInfo; -import org.smartdata.model.BackUpInfo; import org.smartdata.model.CmdletInfo; import org.smartdata.model.CompressionFileState; import org.smartdata.model.FileDiff; @@ -50,6 +49,7 @@ import java.io.IOException; import java.net.URI; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -85,11 +85,17 @@ import static org.smartdata.conf.SmartConfKeys.SMART_COPY_SCHEDULER_DIFF_CACHE_SYNC_THRESHOLD_KEY; import static org.smartdata.conf.SmartConfKeys.SMART_COPY_SCHEDULER_FILE_DIFF_ARCHIVE_SIZE_DEFAULT; import static org.smartdata.conf.SmartConfKeys.SMART_COPY_SCHEDULER_FILE_DIFF_ARCHIVE_SIZE_KEY; +import static org.smartdata.model.FileDiffState.APPLIED; +import static org.smartdata.model.FileDiffState.MERGED; +import static org.smartdata.model.FileDiffState.PENDING; +import static org.smartdata.model.FileDiffState.isTerminalState; import static org.smartdata.model.FileDiffType.DELETE; import static org.smartdata.utils.ConfigUtil.toRemoteClusterConfig; import static org.smartdata.utils.FileDiffUtils.getDest; import static org.smartdata.utils.FileDiffUtils.getLength; import static org.smartdata.utils.FileDiffUtils.getOffset; +import static org.smartdata.utils.FileDiffUtils.isBaseOperation; +import static org.smartdata.utils.FileDiffUtils.isCreateFileDiff; import static org.smartdata.utils.PathUtil.pathStartsWith; public class CopyScheduler extends ActionSchedulerService { @@ -332,7 +338,7 @@ private boolean requireWait(FileDiff fileDiff) { if (fileDiff.getDiffId() == archiveDiff.getDiffId()) { break; } - if (!FileDiffState.isTerminalState(archiveDiff.getState())) { + if (!isTerminalState(archiveDiff.getState())) { if (pathStartsWith(fileDiff.getSrc(), archiveDiff.getSrc()) || pathStartsWith(archiveDiff.getSrc(), fileDiff.getSrc())) { return true; @@ -564,7 +570,7 @@ private FileDiff runFileInitialSync(String src, String dest) throws MetaStoreExc mergePendingDiffs(src); - if (srcFileInfo.isdir()) { + if (srcFileInfo.isDir()) { FileDiff fileDiff = new FileDiff(FileDiffType.MKDIR, FileDiffState.PENDING); fileDiff.setSrc(src); return fileDiff; @@ -674,7 +680,7 @@ private synchronized void pushCacheToDB() throws MetaStoreException { .map(fileDiffCache::get) .filter(Objects::nonNull) .peek(updatedFileDiffs::add) - .filter(diff -> FileDiffState.isTerminalState(diff.getState())) + .filter(diff -> isTerminalState(diff.getState())) .map(FileDiff::getDiffId) .collect(Collectors.toList()); @@ -792,7 +798,7 @@ private void addToFileDiffArchive(FileDiff newFileDiff) { fileDiffArchive.add(newFileDiff); int index = 0; while (fileDiffArchive.size() > fileDiffArchiveSize && index < fileDiffArchiveSize) { - if (FileDiffState.isTerminalState(fileDiffArchive.get(index).getState())) { + if (isTerminalState(fileDiffArchive.get(index).getState())) { fileDiffArchive.remove(index); continue; } @@ -817,13 +823,13 @@ private class FileChain { // Current file path/name private final String filePath; // file diff id - private final List diffChain; + private final Queue diffChain; // append file diff id - private final List appendChain; + private final Queue appendChain; FileChain(String filePath) { - this.diffChain = new ArrayList<>(); - this.appendChain = new ArrayList<>(); + this.diffChain = new ArrayDeque<>(); + this.appendChain = new ArrayDeque<>(); this.currAppendLength = 0; this.filePath = filePath; } @@ -839,10 +845,9 @@ void addToChain(FileDiff fileDiff) throws MetaStoreException { switch (fileDiff.getDiffType()) { case APPEND: - String offset = getOffset(fileDiff); // check if it's actually a create event and we have previous // events connected with this path - if (offset != null && offset.equals("0") && !diffChain.isEmpty()) { + if (isCreateFileDiff(fileDiff) && !diffChain.isEmpty()) { // mark previous events connected with this path as merged mergeAllDiffs(); } @@ -859,15 +864,7 @@ void addToChain(FileDiff fileDiff) throws MetaStoreException { diffChain.add(diffId); break; case RENAME: - if (isRenameSyncedFile(fileDiff)) { - // Add New Name to Name Chain - mergeRename(fileDiff); - break; - } - fileDiffsToTerminate.add(fileDiff); - // discard rename file diff due to not synced - updateFileDiffInCache(fileDiff.getDiffId(), FileDiffState.FAILED); - discardDirtyData(fileDiff); + mergeRename(fileDiff); break; case DELETE: mergeDelete(fileDiff); @@ -878,19 +875,55 @@ void addToChain(FileDiff fileDiff) throws MetaStoreException { } } - void discardDirtyData(FileDiff fileDiff) throws MetaStoreException { - // Clean dirty data - List backUpInfos = metaStore.getBackUpInfoBySrc(fileDiff.getSrc()); - for (BackUpInfo backUpInfo : backUpInfos) { - FileDiff deleteFileDiff = new FileDiff(DELETE, FileDiffState.PENDING); - // use the rename file diff's src as delete file diff src - deleteFileDiff.setSrc(fileDiff.getSrc()); - String destPath = deleteFileDiff.getSrc().replaceFirst(backUpInfo.getSrc(), backUpInfo.getDest()); - //put sync's dest path in parameter for delete use - deleteFileDiff.getParameters().put("-dest", destPath); - long did = metaStore.insertFileDiff(deleteFileDiff); - deleteFileDiff.setDiffId(did); - fileDiffArchive.add(deleteFileDiff); + void mergeRename(FileDiff renameDiff) throws MetaStoreException { + if (fileLocks.contains(filePath)) { + return; + } + + LOG.debug("Rename chain merge triggered for path {}", filePath); + boolean hasCreateDiff = false; + + fileLocks.add(filePath); + try { + while (!diffChain.isEmpty()) { + long diffId = removeHead(); + + FileDiff diff = fileDiffCache.get(diffId); + if (diff == null + // we don't want to delete renamed file in case + // if there are appends after delete diff + || diff.getDiffType() == DELETE + || diff.getState() == APPLIED + || diff.getState() == MERGED) { + continue; + } + + fileDiffsToTerminate.add(diff); + updateFileDiffInCache(diffId, FileDiffState.FAILED); + + FileDiff newDiff = diff.toBuilder() + .src(getDest(renameDiff)) + .state(PENDING) + .createTime(System.currentTimeMillis()) + .build(); + + long newDiffId = metaStore.insertFileDiff(newDiff); + newDiff.setDiffId(newDiffId); + + hasCreateDiff |= isCreateFileDiff(diff); + } + + // if chain had create file diff, then the file is not yet present, + // so we can't add rename event. Simple create diff path replacement is sufficient + if (!hasCreateDiff && isBaseOperation(renameDiff)) { + diffChain.add(renameDiff.getDiffId()); + } else { + // mark rename event as applied, because we've already + // changed create (append) event path + updateFileDiffInCache(renameDiff.getDiffId(), FileDiffState.APPLIED); + } + } finally { + fileLocks.remove(filePath); } } @@ -912,7 +945,7 @@ void mergeAppend() throws MetaStoreException { continue; } - long currOffset = Long.parseLong(getOffset(fileDiff)); + long currOffset = getOffset(fileDiff); offset = Math.min(offset, currOffset); @@ -948,7 +981,7 @@ void mergeDelete(FileDiff fileDiff) throws MetaStoreException { if (archiveDiff.getDiffId() == fileDiff.getDiffId()) { break; } - if (FileDiffState.isTerminalState(archiveDiff.getState())) { + if (isTerminalState(archiveDiff.getState())) { continue; } @@ -960,100 +993,20 @@ void mergeDelete(FileDiff fileDiff) throws MetaStoreException { diffChain.add(fileDiff.getDiffId()); } - void mergeRename(FileDiff fileDiff) throws MetaStoreException { - // Rename action will affect all append actions - if (fileLocks.contains(filePath)) { - return; - } - LOG.debug("Rename chain merge triggered for path {}", filePath); - // Lock file to avoid File Chain being processed - fileLocks.add(filePath); - try { - String newName = getDest(fileDiff); - boolean isCreate = false; - for (long diffId : appendChain) { - FileDiff appendFileDiff = fileDiffCache.get(diffId); - if (appendFileDiff != null && - appendFileDiff.getState() != FileDiffState.APPLIED) { - // update append diff path with renamed one - appendFileDiff.setSrc(newName); - changedFileInCacheDiffIds.add(appendFileDiff.getDiffId()); - } - if (Objects.equals(getOffset(fileDiff), "0")) { - isCreate = true; - } - } - if (isCreate) { - // mark rename event as applied, because we've already - // changed create (append) event path - updateFileDiffInCache(fileDiff.getDiffId(), FileDiffState.APPLIED); - } else { - // Insert rename fileDiff to head - diffChain.add(0, fileDiff.getDiffId()); - } - } finally { - // Unlock file - fileLocks.remove(filePath); - } - } - - boolean isRenameSyncedFile(FileDiff renameFileDiff) throws MetaStoreException { - String path = renameFileDiff.getSrc(); - // get unfinished append file diff - List unfinishedAppendFileDiff = new ArrayList<>(); - FileDiff renameDiffInArchive = null; - for (FileDiff fileDiff : fileDiffArchive) { - if (fileDiff.getDiffId() == renameFileDiff.getDiffId()) { - renameDiffInArchive = fileDiff; - break; - } - - if (fileDiff.getDiffType() == FileDiffType.APPEND - && fileDiff.getState() == FileDiffState.PENDING - && pathStartsWith(path, fileDiff.getSrc())) { - unfinishedAppendFileDiff.add(fileDiff); - } - } - - if (unfinishedAppendFileDiff.isEmpty()) { - return true; - } - - for (FileDiff unfinished : unfinishedAppendFileDiff) { - FileDiff fileDiff = fileDiffCache.get(unfinished.getDiffId()); - if (fileDiff == null) { - fileDiff = unfinished; - } - fileDiffsToTerminate.add(fileDiff); - updateFileDiffInCache(fileDiff.getDiffId(), FileDiffState.FAILED); - // add a new append file diff with new name - FileDiff newFileDiff = new FileDiff(FileDiffType.APPEND, FileDiffState.PENDING); - newFileDiff.getParameters().putAll(fileDiff.getParameters()); - newFileDiff.setSrc(fileDiff.getSrc().replaceFirst( - renameFileDiff.getSrc(), getDest(fileDiff))); - long did = metaStore.insertFileDiff(newFileDiff); - newFileDiff.setDiffId(did); - fileDiffArchive.add(fileDiffArchive.indexOf(renameDiffInArchive), newFileDiff); - } - return false; - } - long getHead() { - if (diffChain.isEmpty()) { - return -1; - } - return diffChain.get(0); + return Optional.ofNullable(diffChain.peek()) + .orElse(-1L); } - void removeHead() { + long removeHead() { if (diffChain.isEmpty()) { - return; + return -1; } - long fid = diffChain.get(0); - if (!appendChain.isEmpty() && fid == appendChain.get(0)) { - appendChain.remove(0); + long diffId = diffChain.poll(); + if (!appendChain.isEmpty() && diffId == appendChain.peek()) { + appendChain.poll(); } - diffChain.remove(0); + return diffId; } void removeFromChain(long diffId) { diff --git a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/ErasureCodingScheduler.java b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/ErasureCodingScheduler.java index 0ead82d37fb..be5c9675216 100644 --- a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/ErasureCodingScheduler.java +++ b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/ErasureCodingScheduler.java @@ -176,7 +176,7 @@ public ScheduleResult onSchedule(CmdletInfo cmdletInfo, ActionInfo actionInfo, } FileInfo fileinfo = metaStore.getFile(srcPath); - if (fileinfo != null && fileinfo.isdir()) { + if (fileinfo != null && fileinfo.isDir()) { return ScheduleResult.SUCCESS; } diff --git a/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/file/equality/TestFileEqualityStrategy.java b/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/file/equality/TestFileEqualityStrategy.java index 09bf2779d94..d1f21c1e3ab 100644 --- a/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/file/equality/TestFileEqualityStrategy.java +++ b/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/file/equality/TestFileEqualityStrategy.java @@ -111,7 +111,7 @@ public void testCompareNotEqualFilesWithEqualLength() throws IOException { private boolean checkFilesEquality(Path srcPath, Path destPath) throws IOException { FileStatus srcFileStatus = dfs.getFileStatus(srcPath); - FileInfo srcFileInfo = FileInfo.newBuilder() + FileInfo srcFileInfo = FileInfo.builder() .setPath(srcPath.toUri().getPath()) .setLength(srcFileStatus.getLen()) .build(); diff --git a/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/metric/fetcher/TestCachedListFetcher.java b/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/metric/fetcher/TestCachedListFetcher.java index 7af852a722e..1bae2bc4d72 100644 --- a/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/metric/fetcher/TestCachedListFetcher.java +++ b/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/metric/fetcher/TestCachedListFetcher.java @@ -140,7 +140,7 @@ public void testFetcher() throws Exception { // System.out.println(cacheAction.isCached(path)); } metaStore.insertFiles(fileInfos - .toArray(new FileInfo[fileInfos.size()])); + .toArray(new FileInfo[fileInfos.size()]), false); List ret = metaStore.getFile(); Assert.assertTrue(ret.size() == fids.length); cachedListFetcher.start(); diff --git a/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/metric/fetcher/TestFileDiffGenerator.java b/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/metric/fetcher/TestFileDiffGenerator.java new file mode 100644 index 00000000000..6f615cc9a99 --- /dev/null +++ b/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/metric/fetcher/TestFileDiffGenerator.java @@ -0,0 +1,483 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.smartdata.hdfs.metric.fetcher; + +import com.google.common.collect.ImmutableMap; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.inotify.Event; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.smartdata.hdfs.action.CopyFileAction; +import org.smartdata.metastore.MetaStoreException; +import org.smartdata.metastore.TestDaoBase; +import org.smartdata.model.BackUpInfo; +import org.smartdata.model.FileDiff; +import org.smartdata.model.FileDiffState; +import org.smartdata.model.FileDiffType; +import org.smartdata.model.FileInfo; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Stream; + +import static org.apache.hadoop.hdfs.inotify.Event.MetadataUpdateEvent.MetadataType.OWNER; +import static org.apache.hadoop.hdfs.inotify.Event.MetadataUpdateEvent.MetadataType.PERMS; +import static org.apache.hadoop.hdfs.inotify.Event.MetadataUpdateEvent.MetadataType.REPLICATION; +import static org.apache.hadoop.hdfs.inotify.Event.MetadataUpdateEvent.MetadataType.TIMES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.smartdata.action.SyncAction.BASE_OPERATION; +import static org.smartdata.action.SyncAction.DEST; +import static org.smartdata.hdfs.action.MetaDataAction.BLOCK_REPLICATION; +import static org.smartdata.hdfs.action.MetaDataAction.GROUP_NAME; +import static org.smartdata.hdfs.action.MetaDataAction.MTIME; +import static org.smartdata.hdfs.action.MetaDataAction.OWNER_NAME; +import static org.smartdata.hdfs.action.MetaDataAction.PERMISSION; + +public class TestFileDiffGenerator extends TestDaoBase { + + private FileDiffGenerator fileDiffGenerator; + + @Before + public void init() throws Exception { + fileDiffGenerator = new FileDiffGenerator(metaStore, () -> 0L); + + BackUpInfo backUpInfo = new BackUpInfo(-1, "/backup/src", "/dest", 1000L); + metaStore.insertBackUpInfo(backUpInfo); + } + + @Test + public void testCreateFile() throws Exception { + FileInfo file = FileInfo.builder() + .setPath("/backup/src/file") + .setLength(42) + .build(); + + Optional fileDiff = fileDiffGenerator.onFileCreate(file); + + Map expectedParameters = ImmutableMap.of( + CopyFileAction.OFFSET_INDEX, "0", + CopyFileAction.LENGTH, "42" + ); + + FileDiff expectedFileDiff = fileDiffBuilder("/backup/src/file") + .diffType(FileDiffType.APPEND) + .parameters(expectedParameters) + .build(); + + assertTrue(fileDiff.isPresent()); + assertEquals(expectedFileDiff, fileDiff.get()); + } + + @Test + public void testCreateDirectory() throws Exception { + FileInfo file = FileInfo.builder() + .setPath("/backup/src/1/dir") + .setIsDir(true) + .build(); + + Optional fileDiff = fileDiffGenerator.onFileCreate(file); + FileDiff expectedFileDiff = fileDiffBuilder("/backup/src/1/dir") + .diffType(FileDiffType.MKDIR) + .build(); + + assertTrue(fileDiff.isPresent()); + assertEquals(expectedFileDiff, fileDiff.get()); + } + + @Test + public void testSkipCreateEventIfNotInBackupDir() throws Exception { + FileInfo file = FileInfo.builder() + .setPath("/another/src/1") + .build(); + + Optional fileDiff = fileDiffGenerator.onFileCreate(file); + assertFalse(fileDiff.isPresent()); + } + + @Test + public void testCloseNewFile() throws Exception { + Event.CloseEvent closeEvent = + new Event.CloseEvent("/backup/src/file", 100, 1L); + + Optional fileDiff = fileDiffGenerator.onFileClose(closeEvent); + + Map expectedParameters = ImmutableMap.of( + CopyFileAction.OFFSET_INDEX, "0", + CopyFileAction.LENGTH, "100" + ); + FileDiff expectedFileDiff = fileDiffBuilder("/backup/src/file") + .diffType(FileDiffType.APPEND) + .parameters(expectedParameters) + .build(); + + assertTrue(fileDiff.isPresent()); + assertEquals(expectedFileDiff, fileDiff.get()); + } + + @Test + public void testCloseExistingFile() throws Exception { + Event.CloseEvent closeEvent = + new Event.CloseEvent("/backup/src/file", 100, 1L); + + FileInfo fileInfo = FileInfo.builder() + .setPath("/backup/src/file") + .setLength(20) + .build(); + metaStore.insertFile(fileInfo, true); + + Optional fileDiff = fileDiffGenerator.onFileClose(closeEvent); + + Map expectedParameters = ImmutableMap.of( + CopyFileAction.OFFSET_INDEX, "20", + CopyFileAction.LENGTH, "80" + ); + FileDiff expectedFileDiff = fileDiffBuilder("/backup/src/file") + .diffType(FileDiffType.APPEND) + .parameters(expectedParameters) + .build(); + + assertTrue(fileDiff.isPresent()); + assertEquals(expectedFileDiff, fileDiff.get()); + } + + @Test + public void testSkipCloseEventIfEqualFileLength() throws Exception { + Event.CloseEvent closeEvent = + new Event.CloseEvent("/backup/src/file", 20, 1L); + + FileInfo fileInfo = FileInfo.builder() + .setPath("/backup/src/file") + .setLength(20) + .build(); + metaStore.insertFile(fileInfo, true); + + Optional fileDiff = fileDiffGenerator.onFileClose(closeEvent); + assertFalse(fileDiff.isPresent()); + } + + @Test + public void testSkipCloseEventIfNotInBackupDir() throws Exception { + Event.CloseEvent closeEvent = + new Event.CloseEvent("/another/src/file", 20, 1L); + + Optional fileDiff = fileDiffGenerator.onFileClose(closeEvent); + assertFalse(fileDiff.isPresent()); + } + + @Test + public void testDeleteFile() throws Exception { + Optional fileDiff = fileDiffGenerator.onFileDelete("/backup/src/file"); + + FileDiff expectedFileDiff = fileDiffBuilder("/backup/src/file") + .diffType(FileDiffType.DELETE) + .build(); + + assertTrue(fileDiff.isPresent()); + assertEquals(expectedFileDiff, fileDiff.get()); + } + + @Test + public void testSkipDeleteEventIfNotInBackupDir() throws Exception { + Optional fileDiff = fileDiffGenerator.onFileDelete("/dir/file"); + assertFalse(fileDiff.isPresent()); + } + + @Test + public void testChangeFileTimes() throws Exception { + Event.MetadataUpdateEvent metadataUpdateEvent = updateMetadataEvent(TIMES); + + Map expectedParameters = ImmutableMap.of( + MTIME, "1" + ); + + testChangeFileMetadata(metadataUpdateEvent, expectedParameters); + } + + @Test + public void testChangeFileReplication() throws Exception { + Event.MetadataUpdateEvent metadataUpdateEvent = updateMetadataEvent(REPLICATION); + + Map expectedParameters = ImmutableMap.of( + BLOCK_REPLICATION, "42" + ); + + testChangeFileMetadata(metadataUpdateEvent, expectedParameters); + } + + @Test + public void testChangeFileOwnerGroup() throws Exception { + Event.MetadataUpdateEvent metadataUpdateEvent = updateMetadataEvent(OWNER); + + Map expectedParameters = ImmutableMap.of( + OWNER_NAME, "newOwner", + GROUP_NAME, "newGroup" + ); + + testChangeFileMetadata(metadataUpdateEvent, expectedParameters); + } + + @Test + public void testChangeFilePermissions() throws Exception { + Event.MetadataUpdateEvent metadataUpdateEvent = updateMetadataEvent(PERMS); + + Map expectedParameters = ImmutableMap.of( + PERMISSION, "777" + ); + + testChangeFileMetadata(metadataUpdateEvent, expectedParameters); + } + + @Test + public void testSkipUpdateMetadataEventIfNotInBackupDir() throws Exception { + Event.MetadataUpdateEvent event = new Event.MetadataUpdateEvent.Builder() + .metadataType(OWNER) + .path("/test") + .build(); + + Optional fileDiff = fileDiffGenerator.onMetadataUpdate(event); + assertFalse(fileDiff.isPresent()); + } + + @Test + public void testRenameDir() throws Exception { + createFilesForRename(); + + Event.RenameEvent renameEvent = new Event.RenameEvent.Builder() + .srcPath("/backup/src/dir") + .dstPath("/backup/src/second_dir") + .build(); + + List fileDiffs = fileDiffGenerator.onFileRename( + renameEvent, metaStore.getFile(renameEvent.getSrcPath())); + + List expectedFileDiffs = Arrays.asList( + fileDiffBuilder("/backup/src/dir") + .diffType(FileDiffType.RENAME) + .parameters(ImmutableMap.of( + DEST, "/backup/src/second_dir", + BASE_OPERATION, "" + )) + .build(), + fileDiffBuilder("/backup/src/dir/1") + .diffType(FileDiffType.RENAME) + .parameters(ImmutableMap.of(DEST, "/backup/src/second_dir/1")) + .build(), + fileDiffBuilder("/backup/src/dir/2") + .diffType(FileDiffType.RENAME) + .parameters(ImmutableMap.of(DEST, "/backup/src/second_dir/2")) + .build() + ); + + assertEquals(expectedFileDiffs, fileDiffs); + } + + @Test + public void testRenameFile() throws Exception { + createFilesForRename(); + + Event.RenameEvent renameEvent = new Event.RenameEvent.Builder() + .srcPath("/backup/src/3") + .dstPath("/backup/src/4") + .build(); + + List fileDiffs = fileDiffGenerator.onFileRename( + renameEvent, metaStore.getFile(renameEvent.getSrcPath())); + + List expectedFileDiffs = Collections.singletonList( + fileDiffBuilder("/backup/src/3") + .diffType(FileDiffType.RENAME) + .parameters(ImmutableMap.of( + DEST, "/backup/src/4", + BASE_OPERATION, "" + )) + .build() + ); + + assertEquals(expectedFileDiffs, fileDiffs); + } + + @Test + public void testRenameFileDestNotInBackup() throws Exception { + createFilesForRename(); + + Event.RenameEvent renameEvent = new Event.RenameEvent.Builder() + .srcPath("/backup/src/dir") + .dstPath("/somewhere") + .build(); + + List fileDiffs = fileDiffGenerator.onFileRename( + renameEvent, metaStore.getFile(renameEvent.getSrcPath())); + + List expectedFileDiffs = Collections.singletonList( + fileDiffBuilder("/backup/src/dir") + .diffType(FileDiffType.DELETE) + .parameters(ImmutableMap.of(BASE_OPERATION, "")) + .build() + ); + + assertEquals(expectedFileDiffs, fileDiffs); + } + + @Test + public void testRenameSrcDirNotInBackup() throws Exception { + createFilesForRename(); + + Event.RenameEvent renameEvent = new Event.RenameEvent.Builder() + .srcPath("/another_dir/dir") + .dstPath("/backup/src/renamed") + .build(); + + List fileDiffs = fileDiffGenerator.onFileRename( + renameEvent, metaStore.getFile(renameEvent.getSrcPath())); + + Map appendParameters = ImmutableMap.of( + CopyFileAction.OFFSET_INDEX, "0", + CopyFileAction.LENGTH, "128" + ); + + List expectedFileDiffs = Arrays.asList( + fileDiffBuilder("/backup/src/renamed") + .diffType(FileDiffType.MKDIR) + .parameters(ImmutableMap.of(BASE_OPERATION, "")) + .build(), + fileDiffBuilder("/backup/src/renamed/4") + .diffType(FileDiffType.APPEND) + .parameters(appendParameters) + .build(), + fileDiffBuilder("/backup/src/renamed/5") + .diffType(FileDiffType.APPEND) + .parameters(appendParameters) + .build() + ); + + assertEquals(expectedFileDiffs, fileDiffs); + } + + @Test + public void testRenameSrcFileNotInBackup() throws Exception { + createFilesForRename(); + + Event.RenameEvent renameEvent = new Event.RenameEvent.Builder() + .srcPath("/another_dir/file") + .dstPath("/backup/src/file") + .build(); + + List fileDiffs = fileDiffGenerator.onFileRename( + renameEvent, metaStore.getFile(renameEvent.getSrcPath())); + + List expectedFileDiffs = Collections.singletonList( + fileDiffBuilder("/backup/src/file") + .diffType(FileDiffType.APPEND) + .parameters(ImmutableMap.of( + BASE_OPERATION, "", + CopyFileAction.OFFSET_INDEX, "0", + CopyFileAction.LENGTH, "128" + )) + .build() + ); + + assertEquals(expectedFileDiffs, fileDiffs); + } + + + @Test + public void testSkipRenameEventIfNotInBackupDir() throws Exception { + Event.RenameEvent renameEvent = new Event.RenameEvent.Builder() + .srcPath("/another_dir/file") + .dstPath("/another_dir/another_file") + .build(); + + List fileDiffs = fileDiffGenerator.onFileRename( + renameEvent, metaStore.getFile(renameEvent.getSrcPath())); + assertTrue(fileDiffs.isEmpty()); + } + + private void createFilesForRename() { + Stream.of( + "/backup", + "/backup/src", + "/backup/src/dir", + "/another_dir", + "/another_dir/dir" + ).forEach(path -> createFile(path, true)); + + Stream.of( + "/backup/src/dir/1", + "/backup/src/dir/2", + "/backup/src/3", + "/another_dir/dir/4", + "/another_dir/dir/5", + "/another_dir/file" + ).forEach(path -> createFile(path, false)); + } + + private void createFile(String path, boolean isDir) { + FileInfo fileInfo = FileInfo.builder() + .setPath(path) + .setIsDir(isDir) + .setLength(128) + .build(); + try { + metaStore.insertFile(fileInfo, true); + } catch (MetaStoreException e) { + Assert.fail(e.getMessage()); + } + } + + private void testChangeFileMetadata( + Event.MetadataUpdateEvent event, Map expectedParameters) throws MetaStoreException { + FileDiff expectedFileDiff = fileDiffBuilder("/backup/src/2/file") + .diffType(FileDiffType.METADATA) + .parameters(expectedParameters) + .build(); + + Optional fileDiff = fileDiffGenerator.onMetadataUpdate(event); + + assertTrue(fileDiff.isPresent()); + assertEquals(expectedFileDiff, fileDiff.get()); + } + + private Event.MetadataUpdateEvent updateMetadataEvent( + Event.MetadataUpdateEvent.MetadataType metadataType) { + return new Event.MetadataUpdateEvent.Builder() + .metadataType(metadataType) + .path("/backup/src/2/file") + .ownerName("newOwner") + .groupName("newGroup") + .mtime(1) + .atime(2) + .replication(42) + .perms(new FsPermission(777)) + .build(); + } + + private FileDiff.Builder fileDiffBuilder(String src) { + return FileDiff.builder() + .src(src) + .state(FileDiffState.PENDING) + .createTime(0L) + .parameters(new HashMap<>()); + } +} diff --git a/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/metric/fetcher/TestInotifyEventApplier.java b/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/metric/fetcher/TestInotifyEventApplier.java index 4565666a7a8..7505772d639 100644 --- a/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/metric/fetcher/TestInotifyEventApplier.java +++ b/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/metric/fetcher/TestInotifyEventApplier.java @@ -24,15 +24,13 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.junit.Assert; import org.junit.Test; -import org.mockito.Matchers; +import org.mockito.ArgumentMatchers; import org.mockito.Mockito; import org.smartdata.conf.SmartConf; -import org.smartdata.hdfs.CompatibilityHelperLoader; import org.smartdata.hdfs.HadoopUtil; import org.smartdata.metastore.TestDaoBase; import org.smartdata.model.BackUpInfo; import org.smartdata.model.FileDiff; -import org.smartdata.model.FileDiffType; import org.smartdata.model.FileInfo; import java.util.ArrayList; @@ -46,10 +44,10 @@ public void testApplier() throws Exception { DFSClient client = Mockito.mock(DFSClient.class); FileInfo root = HadoopUtil.convertFileStatus(getDummyDirStatus("/", 1000), "/"); - metaStore.insertFile(root); + metaStore.insertFile(root, false); BackUpInfo backUpInfo = new BackUpInfo(1L, "/file", "remote/dest/", 10); metaStore.insertBackUpInfo(backUpInfo); - InotifyEventApplier applier = new InotifyEventApplier(metaStore, client); + InotifyEventApplier applier = new InotifyEventApplier(new SmartConf(), metaStore, client); Event.CreateEvent createEvent = new Event.CreateEvent.Builder() @@ -80,27 +78,21 @@ public void testApplier() throws Exception { .feInfo(null) .storagePolicy((byte) 0) .build(); - Mockito.when(client.getFileInfo(Matchers.startsWith("/file"))).thenReturn(status1); - Mockito.when(client.getFileInfo(Matchers.startsWith("/dir"))) + Mockito.when(client.getFileInfo(ArgumentMatchers.startsWith("/file"))).thenReturn(status1); + Mockito.when(client.getFileInfo(ArgumentMatchers.startsWith("/dir"))) .thenReturn(getDummyDirStatus("", 1010)); applier.apply(new Event[]{createEvent}); FileInfo result1 = metaStore.getFile().get(1); - Assert.assertEquals(result1.getPath(), "/file"); - Assert.assertEquals(result1.getFileId(), 1010L); - Assert.assertEquals(result1.getPermission(), 511); + Assert.assertEquals("/file", result1.getPath()); + Assert.assertEquals(1L, result1.getFileId()); + Assert.assertEquals(511, result1.getPermission()); Event close = new Event.CloseEvent("/file", 1024, 0); applier.apply(new Event[]{close}); FileInfo result2 = metaStore.getFile().get(1); - Assert.assertEquals(result2.getLength(), 1024); - Assert.assertEquals(result2.getModificationTime(), 0L); - -// Event truncate = new Event.TruncateEvent("/file", 512, 16); -// applier.apply(new Event[] {truncate}); -// ResultSet result3 = metaStore.executeQuery("SELECT * FROM files"); -// Assert.assertEquals(result3.getLong("length"), 512); -// Assert.assertEquals(result3.getLong("modification_time"), 16L); + Assert.assertEquals(1024, result2.getLength()); + Assert.assertEquals(0L, result2.getModificationTime()); Event meta = new Event.MetadataUpdateEvent.Builder() @@ -129,8 +121,8 @@ public void testApplier() throws Exception { Assert.assertEquals(result4.getOwner(), "user1"); Assert.assertEquals(result4.getGroup(), "cg1"); // check metadata event didn't flush other FileInfo fields - Assert.assertEquals(result4.getFileId(), 1010L); - Assert.assertEquals(result4.getPermission(), new FsPermission("777").toShort()); + Assert.assertEquals(1L, result4.getFileId()); + Assert.assertEquals(new FsPermission("777").toShort(), result4.getPermission()); Event.CreateEvent createEvent2 = new Event.CreateEvent.Builder() @@ -165,7 +157,7 @@ public void testApplier() throws Exception { actualPaths.add(s.getPath()); } Collections.sort(actualPaths); - Assert.assertTrue(actualPaths.size() == 4); + Assert.assertEquals(4, actualPaths.size()); Assert.assertTrue(actualPaths.containsAll(expectedPaths)); Event unlink = new Event.UnlinkEvent.Builder().path("/").timestamp(6).build(); @@ -174,13 +166,13 @@ public void testApplier() throws Exception { Assert.assertEquals(metaStore.getFile().size(), 0); System.out.println("Files in table " + metaStore.getFile().size()); List fileDiffList = metaStore.getPendingDiff(); - Assert.assertTrue(fileDiffList.size() == 4); + Assert.assertEquals(4, fileDiffList.size()); } @Test public void testApplierCreateEvent() throws Exception { DFSClient client = Mockito.mock(DFSClient.class); - InotifyEventApplier applier = new InotifyEventApplier(metaStore, client); + InotifyEventApplier applier = new InotifyEventApplier(new SmartConf(), metaStore, client); BackUpInfo backUpInfo = new BackUpInfo(1L, "/file1", "remote/dest/", 10); metaStore.insertBackUpInfo(backUpInfo); @@ -213,25 +205,18 @@ public void testApplierCreateEvent() throws Exception { Mockito.when(client.getFileInfo("/file1")).thenReturn(status1); applier.apply(events); - Assert.assertTrue(metaStore.getFile("/file1").getOwner().equals("test")); + Assert.assertEquals("test", metaStore.getFile("/file1").getOwner()); //judge file diff List fileDiffs = metaStore.getFileDiffsByFileName("/file1"); - Assert.assertTrue(fileDiffs.size() > 0); - for (FileDiff fileDiff : fileDiffs) { - if (fileDiff.getDiffType().equals(FileDiffType.APPEND)) { - //find create diff and compare - Assert.assertTrue(fileDiff.getParameters().get("-owner").equals("test")); - } - } + Assert.assertFalse(fileDiffs.isEmpty()); } @Test public void testApplierRenameEvent() throws Exception { DFSClient client = Mockito.mock(DFSClient.class); SmartConf conf = new SmartConf(); - NamespaceFetcher namespaceFetcher = new NamespaceFetcher(client, metaStore, null, conf); - InotifyEventApplier applier = new InotifyEventApplier(conf, metaStore, client, namespaceFetcher); + InotifyEventApplier applier = new InotifyEventApplier(conf, metaStore, client); FileInfo[] fileInfos = new FileInfo[]{ HadoopUtil.convertFileStatus(getDummyFileStatus("/dirfile", 7000), "/dirfile"), @@ -245,22 +230,22 @@ public void testApplierRenameEvent() throws Exception { HadoopUtil.convertFileStatus(getDummyFileStatus("/dir/dir/f1", 8201), "/dir/dir/f1"), HadoopUtil.convertFileStatus(getDummyFileStatus("/file", 2000), "/file"), }; - metaStore.insertFiles(fileInfos); + metaStore.insertFiles(fileInfos, false); Mockito.when(client.getFileInfo("/dir1")).thenReturn(getDummyDirStatus("/dir1", 8000)); Event.RenameEvent dirRenameEvent = new Event.RenameEvent.Builder() .srcPath("/dir") .dstPath("/dir1") .build(); applier.apply(new Event[]{dirRenameEvent}); - Assert.assertTrue(metaStore.getFile("/dir") == null); - Assert.assertTrue(metaStore.getFile("/dir/file1") == null); - Assert.assertTrue(metaStore.getFile("/dirfile") != null); - Assert.assertTrue(metaStore.getFile("/dir1") != null); - Assert.assertTrue(metaStore.getFile("/dir1/file1") != null); - Assert.assertTrue(metaStore.getFile("/dir1/dir/f1") != null); - Assert.assertTrue(metaStore.getFile("/dir2") != null); - Assert.assertTrue(metaStore.getFile("/dir2/file1") != null); - Assert.assertTrue(metaStore.getFile("/file") != null); + Assert.assertNull(metaStore.getFile("/dir")); + Assert.assertNull(metaStore.getFile("/dir/file1")); + Assert.assertNotNull(metaStore.getFile("/dirfile")); + Assert.assertNotNull(metaStore.getFile("/dir1")); + Assert.assertNotNull(metaStore.getFile("/dir1/file1")); + Assert.assertNotNull(metaStore.getFile("/dir1/dir/f1")); + Assert.assertNotNull(metaStore.getFile("/dir2")); + Assert.assertNotNull(metaStore.getFile("/dir2/file1")); + Assert.assertNotNull(metaStore.getFile("/file")); List events = new ArrayList<>(); Event.RenameEvent renameEvent = new Event.RenameEvent.Builder() @@ -269,14 +254,7 @@ public void testApplierRenameEvent() throws Exception { .build(); events.add(renameEvent); applier.apply(events); - Assert.assertTrue(metaStore.getFile("/file2") == null); - - /* - Mockito.when(client.getFileInfo("/file2")).thenReturn(getDummyFileStatus("/file2", 2000)); - applier.apply(events); - FileInfo info = metaStore.getFile("/file2"); - Assert.assertTrue(info != null && info.getFileId() == 2000); - */ + Assert.assertNull(metaStore.getFile("/file2")); events.clear(); renameEvent = new Event.RenameEvent.Builder() @@ -286,9 +264,9 @@ public void testApplierRenameEvent() throws Exception { events.add(renameEvent); applier.apply(events); FileInfo info2 = metaStore.getFile("/file"); - Assert.assertTrue(info2 == null); + Assert.assertNull(info2); FileInfo info3 = metaStore.getFile("/file1"); - Assert.assertTrue(info3 != null); + Assert.assertNotNull(info3); renameEvent = new Event.RenameEvent.Builder() .srcPath("/file1") diff --git a/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/metric/fetcher/TestInotifyFetcher.java b/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/metric/fetcher/TestInotifyFetcher.java index b5cec71f3c4..5278ac7ec9f 100644 --- a/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/metric/fetcher/TestInotifyFetcher.java +++ b/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/metric/fetcher/TestInotifyFetcher.java @@ -56,7 +56,7 @@ private static class EventApplierForTest extends InotifyEventApplier { private final List events = new ArrayList<>(); public EventApplierForTest(MetaStore metaStore, DFSClient client) { - super(metaStore, client); + super(new SmartConf(), metaStore, client); } @Override diff --git a/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/metric/fetcher/TestNamespaceFetcher.java b/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/metric/fetcher/TestNamespaceFetcher.java index 36644bb69f5..67db78e9b4b 100644 --- a/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/metric/fetcher/TestNamespaceFetcher.java +++ b/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/metric/fetcher/TestNamespaceFetcher.java @@ -74,7 +74,7 @@ public Void answer(InvocationOnMock invocationOnMock) { } return null; } - }).when(adapter).insertFiles(any(FileInfo[].class)); + }).when(adapter).insertFiles(any(FileInfo[].class), anyBoolean()); SmartConf nonNullConfig = Optional.ofNullable(conf) .orElseGet(SmartConf::new); @@ -104,28 +104,6 @@ public void testFetchingFromRoot() throws IOException, InterruptedException, } } - @Test - public void testFetchingFromGivenDir() throws IOException, InterruptedException, - MetaStoreException { - pathesInDB.clear(); - final Configuration conf = new SmartConf(); - final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) - .numDataNodes(2).build(); - String fetchDir = "/user"; - try { - NamespaceFetcher fetcher = init(cluster, null); - fetcher.startFetch(fetchDir); - Set expected = Sets.newHashSet("/user", "/user/user1", "/user/user2"); - while (!fetcher.fetchFinished()) { - Thread.sleep(100); - } - Assert.assertEquals(expected, pathesInDB); - fetcher.stop(); - } finally { - cluster.shutdown(); - } - } - @Test public void testIgnore() throws IOException, InterruptedException, MetaStoreException { diff --git a/smart-metastore/src/main/java/org/smartdata/metastore/MetaStore.java b/smart-metastore/src/main/java/org/smartdata/metastore/MetaStore.java index 913833a3717..679cc894891 100644 --- a/smart-metastore/src/main/java/org/smartdata/metastore/MetaStore.java +++ b/smart-metastore/src/main/java/org/smartdata/metastore/MetaStore.java @@ -216,20 +216,20 @@ public Long queryForLong(String sql) throws MetaStoreException { /** * Store a single file info into database. */ - public void insertFile(FileInfo file) + public void insertFile(FileInfo file, boolean generateId) throws MetaStoreException { updateCache(); - fileInfoDao.insert(file); + fileInfoDao.insert(file, generateId); } /** * Store files info into database. */ - public void insertFiles(FileInfo[] files) + public void insertFiles(FileInfo[] files, boolean generateIds) throws MetaStoreException { updateCache(); - fileInfoDao.insert(files); + fileInfoDao.insert(files, generateIds); } public void updateFileByPath(String path, FileInfoDiff fileUpdate) { diff --git a/smart-metastore/src/main/java/org/smartdata/metastore/dao/FileInfoDao.java b/smart-metastore/src/main/java/org/smartdata/metastore/dao/FileInfoDao.java index 40f17ad16be..d72f34b06e9 100644 --- a/smart-metastore/src/main/java/org/smartdata/metastore/dao/FileInfoDao.java +++ b/smart-metastore/src/main/java/org/smartdata/metastore/dao/FileInfoDao.java @@ -43,9 +43,9 @@ public interface FileInfoDao { Map getPathFids(Collection paths) throws SQLException; - void insert(FileInfo fileInfo); + void insert(FileInfo fileInfo, boolean generateId); - void insert(FileInfo[] fileInfos); + void insert(FileInfo[] fileInfos, boolean generateIds); int update(String path, int storagePolicy); diff --git a/smart-metastore/src/main/java/org/smartdata/metastore/dao/impl/DefaultFileDiffDao.java b/smart-metastore/src/main/java/org/smartdata/metastore/dao/impl/DefaultFileDiffDao.java index 3ab296e46e5..09dc5ce3db4 100644 --- a/smart-metastore/src/main/java/org/smartdata/metastore/dao/impl/DefaultFileDiffDao.java +++ b/smart-metastore/src/main/java/org/smartdata/metastore/dao/impl/DefaultFileDiffDao.java @@ -17,6 +17,8 @@ */ package org.smartdata.metastore.dao.impl; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; import org.apache.commons.lang3.StringUtils; import org.smartdata.metastore.dao.AbstractDao; import org.smartdata.metastore.dao.FileDiffDao; @@ -29,6 +31,7 @@ import javax.sql.DataSource; +import java.lang.reflect.Type; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; @@ -40,6 +43,11 @@ public class DefaultFileDiffDao extends AbstractDao implements FileDiffDao { private static final String TABLE_NAME = "file_diff"; + + private static final Gson DIFF_PARAMS_SERIALIZER = new Gson(); + private static final Type DIFF_PARAMS_TYPE = + new TypeToken>() {}.getType(); + public String uselessFileDiffStates; public DefaultFileDiffDao(DataSource dataSource) { @@ -225,7 +233,7 @@ public int update(long did, String src) { @Override public int update(long did, FileDiffState state, - String parameters) { + String parameters) { String sql = "UPDATE " + TABLE_NAME + " SET state = ?, " + "parameters = ? WHERE did = ?"; return jdbcTemplate.update(sql, state.getValue(), parameters, did); @@ -245,12 +253,13 @@ public void update(final List fileDiffs) { new BatchPreparedStatementSetter() { @Override public void setValues(PreparedStatement ps, - int i) throws SQLException { + int i) throws SQLException { FileDiff fileDiff = fileDiffs.get(i); ps.setLong(1, fileDiff.getRuleId()); ps.setInt(2, fileDiff.getDiffType().getValue()); ps.setString(3, fileDiff.getSrc()); - ps.setString(4, fileDiff.getParametersJsonString()); + ps.setString(4, + DIFF_PARAMS_SERIALIZER.toJson(fileDiff.getParameters())); ps.setInt(5, fileDiff.getState().getValue()); ps.setLong(6, fileDiff.getCreateTime()); ps.setLong(7, fileDiff.getDiffId()); @@ -275,7 +284,8 @@ public int update(final FileDiff fileDiff) { + "WHERE did = ?"; return jdbcTemplate.update(sql, fileDiff.getRuleId(), fileDiff.getDiffType().getValue(), fileDiff.getSrc(), - fileDiff.getParametersJsonString(), fileDiff.getState().getValue(), + DIFF_PARAMS_SERIALIZER.toJson(fileDiff.getParameters()), + fileDiff.getState().getValue(), fileDiff.getCreateTime(), fileDiff.getDiffId()); } @@ -287,13 +297,13 @@ public void deleteAll() { } private Map toMap(FileDiff fileDiff) { - // System.out.println(fileDiff.getDiffType()); Map parameters = new HashMap<>(); parameters.put("did", fileDiff.getDiffId()); parameters.put("rid", fileDiff.getRuleId()); parameters.put("diff_type", fileDiff.getDiffType().getValue()); parameters.put("src", fileDiff.getSrc()); - parameters.put("parameters", fileDiff.getParametersJsonString()); + parameters.put("parameters", + DIFF_PARAMS_SERIALIZER.toJson(fileDiff.getParameters())); parameters.put("state", fileDiff.getState().getValue()); parameters.put("create_time", fileDiff.getCreateTime()); return parameters; @@ -305,10 +315,11 @@ public FileDiff mapRow(ResultSet resultSet, int i) throws SQLException { FileDiff fileDiff = new FileDiff(); fileDiff.setDiffId(resultSet.getLong("did")); fileDiff.setRuleId(resultSet.getLong("rid")); - fileDiff.setDiffType(FileDiffType.fromValue((int) resultSet.getByte("diff_type"))); + fileDiff.setDiffType(FileDiffType.fromValue(resultSet.getByte("diff_type"))); fileDiff.setSrc(resultSet.getString("src")); - fileDiff.setParametersFromJsonString(resultSet.getString("parameters")); - fileDiff.setState(FileDiffState.fromValue((int) resultSet.getByte("state"))); + fileDiff.setParameters(DIFF_PARAMS_SERIALIZER.fromJson( + resultSet.getString("parameters"), DIFF_PARAMS_TYPE)); + fileDiff.setState(FileDiffState.fromValue(resultSet.getByte("state"))); fileDiff.setCreateTime(resultSet.getLong("create_time")); return fileDiff; } diff --git a/smart-metastore/src/main/java/org/smartdata/metastore/dao/impl/DefaultFileInfoDao.java b/smart-metastore/src/main/java/org/smartdata/metastore/dao/impl/DefaultFileInfoDao.java index c685d8769f4..ece73f34e7b 100644 --- a/smart-metastore/src/main/java/org/smartdata/metastore/dao/impl/DefaultFileInfoDao.java +++ b/smart-metastore/src/main/java/org/smartdata/metastore/dao/impl/DefaultFileInfoDao.java @@ -24,11 +24,13 @@ import org.springframework.jdbc.core.RowMapper; import org.springframework.jdbc.core.namedparam.MapSqlParameterSource; import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; +import org.springframework.jdbc.core.simple.SimpleJdbcInsert; import javax.sql.DataSource; import java.sql.ResultSet; import java.sql.SQLException; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -101,13 +103,21 @@ public Map getPathFids(Collection paths) } @Override - public void insert(FileInfo fileInfo) { - insert(fileInfo, this::toMap); + public void insert(FileInfo fileInfo, boolean generateId) { + SimpleJdbcInsert simpleJdbcInsert = simpleJdbcInsert(); + if (generateId) { + simpleJdbcInsert.usingGeneratedKeyColumns("fid"); + } + simpleJdbcInsert.execute(toMap(fileInfo)); } @Override - public void insert(FileInfo[] fileInfos) { - insert(fileInfos, this::toMap); + public void insert(FileInfo[] fileInfos, boolean generateId) { + SimpleJdbcInsert simpleJdbcInsert = simpleJdbcInsert(); + if (generateId) { + simpleJdbcInsert.usingGeneratedKeyColumns("fid"); + } + insert(simpleJdbcInsert, Arrays.asList(fileInfos), this::toMap); } @Override @@ -128,7 +138,7 @@ public void deleteById(long fid) { } @Override - public void deleteByPath(String path, boolean recursive) { + public void deleteByPath(String path, boolean recursive) { String sql = "DELETE FROM file WHERE path = ?"; jdbcTemplate.update(sql, path); if (recursive) { @@ -171,6 +181,7 @@ private Map updateToMap(FileInfoDiff fileInfo) { .put("owner_group", fileInfo.getGroup()); parameters.put("permission", fileInfo.getPermission()); parameters.put("ec_policy_id", fileInfo.getErasureCodingPolicy()); + parameters.put("sid", fileInfo.getStoragePolicy()); return parameters; } @@ -180,10 +191,10 @@ private Map toMap(FileInfo fileInfo) { parameters.put("fid", fileInfo.getFileId()); parameters.put("length", fileInfo.getLength()); parameters.put("block_replication", fileInfo.getBlockReplication()); - parameters.put("block_size", fileInfo.getBlocksize()); + parameters.put("block_size", fileInfo.getBlockSize()); parameters.put("modification_time", fileInfo.getModificationTime()); parameters.put("access_time", fileInfo.getAccessTime()); - parameters.put("is_dir", fileInfo.isdir()); + parameters.put("is_dir", fileInfo.isDir()); parameters.put("sid", fileInfo.getStoragePolicy()); parameters .put("owner", fileInfo.getOwner()); diff --git a/smart-metastore/src/main/java/org/smartdata/metastore/ingestion/FileStatusIngester.java b/smart-metastore/src/main/java/org/smartdata/metastore/ingestion/FileStatusIngester.java index bfee567fed6..e6467d357e4 100644 --- a/smart-metastore/src/main/java/org/smartdata/metastore/ingestion/FileStatusIngester.java +++ b/smart-metastore/src/main/java/org/smartdata/metastore/ingestion/FileStatusIngester.java @@ -45,12 +45,12 @@ public void run() { if (batch != null) { FileInfo[] statuses = batch.getFileInfos(); if (statuses.length == batch.actualSize()) { - this.dbAdapter.insertFiles(batch.getFileInfos()); + this.dbAdapter.insertFiles(batch.getFileInfos(), true); IngestionTask.numPersisted.addAndGet(statuses.length); } else { FileInfo[] actual = new FileInfo[batch.actualSize()]; System.arraycopy(statuses, 0, actual, 0, batch.actualSize()); - this.dbAdapter.insertFiles(actual); + this.dbAdapter.insertFiles(actual, true); IngestionTask.numPersisted.addAndGet(actual.length); } diff --git a/smart-metastore/src/main/resources/db/changelog/changelog-6.make-fileId-autoincrement.xml b/smart-metastore/src/main/resources/db/changelog/changelog-6.make-fileId-autoincrement.xml new file mode 100644 index 00000000000..783062b32fb --- /dev/null +++ b/smart-metastore/src/main/resources/db/changelog/changelog-6.make-fileId-autoincrement.xml @@ -0,0 +1,24 @@ + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/smart-metastore/src/main/resources/db/changelog/changelog-root.xml b/smart-metastore/src/main/resources/db/changelog/changelog-root.xml index 55021b0e158..09947ec5902 100644 --- a/smart-metastore/src/main/resources/db/changelog/changelog-root.xml +++ b/smart-metastore/src/main/resources/db/changelog/changelog-root.xml @@ -13,4 +13,5 @@ + \ No newline at end of file diff --git a/smart-metastore/src/main/resources/db/changelog/sql/set_file_id_sequence_start_value.sql b/smart-metastore/src/main/resources/db/changelog/sql/set_file_id_sequence_start_value.sql new file mode 100644 index 00000000000..dd8c2329da5 --- /dev/null +++ b/smart-metastore/src/main/resources/db/changelog/sql/set_file_id_sequence_start_value.sql @@ -0,0 +1,3 @@ +SELECT setval(pg_get_serial_sequence('file', 'fid'), + COALESCE(MAX(fid), 0) + 1, false) +FROM file; \ No newline at end of file diff --git a/smart-metastore/src/test/java/org/smartdata/metastore/TestMetaStore.java b/smart-metastore/src/test/java/org/smartdata/metastore/TestMetaStore.java index 9fc13519579..016bcbb8a10 100644 --- a/smart-metastore/src/test/java/org/smartdata/metastore/TestMetaStore.java +++ b/smart-metastore/src/test/java/org/smartdata/metastore/TestMetaStore.java @@ -173,7 +173,7 @@ public void testGetFiles() throws Exception { group, storagePolicy, erasureCodingPolicy); - metaStore.insertFile(fileInfo); + metaStore.insertFile(fileInfo, false); FileInfo dbFileInfo = metaStore.getFile(56); Assert.assertEquals(fileInfo, dbFileInfo); dbFileInfo = metaStore.getFile("/tmp/des"); @@ -331,7 +331,7 @@ public void testInsetFiles() throws Exception { storagePolicy, erasureCodingPolicy) }; - metaStore.insertFiles(files); + metaStore.insertFiles(files, false); FileInfo dbFileInfo = metaStore.getFile("/tmp/testFile"); Assert.assertEquals(files[0], dbFileInfo); } diff --git a/smart-metastore/src/test/java/org/smartdata/metastore/dao/TestFileDiffDao.java b/smart-metastore/src/test/java/org/smartdata/metastore/dao/TestFileDiffDao.java index 105a8d1d43c..b70da97b5b9 100644 --- a/smart-metastore/src/test/java/org/smartdata/metastore/dao/TestFileDiffDao.java +++ b/smart-metastore/src/test/java/org/smartdata/metastore/dao/TestFileDiffDao.java @@ -17,6 +17,7 @@ */ package org.smartdata.metastore.dao; +import com.google.gson.Gson; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -79,13 +80,14 @@ public void testBatchUpdateAndQuery() { List fileInfoList = fileDiffDao.getAll(); assertEquals(fileDiffs, fileInfoList); + Gson diffParametersSerializer = new Gson(); //update List dids = new ArrayList<>(); dids.add(1L); dids.add(2L); List parameters = new ArrayList<>(); - parameters.add(fileDiffs.get(0).getParametersJsonString()); - parameters.add(fileDiffs.get(1).getParametersJsonString()); + parameters.add(diffParametersSerializer.toJson(fileDiffs.get(0).getParameters())); + parameters.add(diffParametersSerializer.toJson(fileDiffs.get(1).getParameters())); List fileDiffStates = new ArrayList<>(); fileDiffStates.add(FileDiffState.APPLIED); fileDiffStates.add(fileDiffs.get(1).getState()); diff --git a/smart-metastore/src/test/java/org/smartdata/metastore/dao/TestFileInfoDao.java b/smart-metastore/src/test/java/org/smartdata/metastore/dao/TestFileInfoDao.java index a099c09b1c8..4142f7731cf 100644 --- a/smart-metastore/src/test/java/org/smartdata/metastore/dao/TestFileInfoDao.java +++ b/smart-metastore/src/test/java/org/smartdata/metastore/dao/TestFileInfoDao.java @@ -23,9 +23,7 @@ import org.smartdata.metastore.TestDaoBase; import org.smartdata.model.FileInfo; -import java.util.HashMap; import java.util.List; -import java.util.Map; public class TestFileInfoDao extends TestDaoBase { private FileInfoDao fileInfoDao; @@ -36,7 +34,7 @@ public void initFileDao() { } @Test - public void testInsetGetDeleteFiles() throws Exception { + public void testInsetGetDeleteFiles() { String path = "/testFile"; long length = 123L; boolean isDir = false; @@ -47,33 +45,39 @@ public void testInsetGetDeleteFiles() throws Exception { short permission = 1; String owner = "root"; String group = "admin"; - long fileId = 312321L; + long fileId = 1L; byte storagePolicy = 0; byte erasureCodingPolicy = 0; FileInfo fileInfo = new FileInfo(path, fileId, length, isDir, blockReplication, blockSize, modTime, accessTime, permission, owner, group, storagePolicy, erasureCodingPolicy); - fileInfoDao.insert(fileInfo); + fileInfoDao.insert(fileInfo, true); + FileInfo file1 = fileInfoDao.getByPath("/testFile"); - Assert.assertTrue(fileInfo.equals(file1)); + Assert.assertEquals(fileInfo, file1); + FileInfo file2 = fileInfoDao.getById(fileId); - Assert.assertTrue(fileInfo.equals(file2)); + Assert.assertEquals(fileInfo, file2); + FileInfo fileInfo1 = new FileInfo(path, fileId + 1, length, isDir, blockReplication, blockSize, modTime, accessTime, permission, owner, group, storagePolicy, erasureCodingPolicy); - fileInfoDao.insert(fileInfo1); + fileInfoDao.insert(fileInfo1, true); List fileInfos = fileInfoDao.getFilesByPrefix("/testaaFile"); - Assert.assertTrue(fileInfos.size() == 0); + Assert.assertEquals(0, fileInfos.size()); + fileInfos = fileInfoDao.getFilesByPrefix("/testFile"); - Assert.assertTrue(fileInfos.size() == 2); + Assert.assertEquals(2, fileInfos.size()); + fileInfoDao.deleteById(fileId); fileInfos = fileInfoDao.getAll(); - Assert.assertTrue(fileInfos.size() == 1); + Assert.assertEquals(1, fileInfos.size()); + fileInfoDao.deleteAll(); fileInfos = fileInfoDao.getAll(); - Assert.assertTrue(fileInfos.size() == 0); + Assert.assertTrue(fileInfos.isEmpty()); } @Test - public void testInsertUpdateFiles() throws Exception { + public void testInsertUpdateFiles() { String path = "/testFile"; long length = 123L; boolean isDir = false; @@ -84,19 +88,17 @@ public void testInsertUpdateFiles() throws Exception { short permission = 1; String owner = "root"; String group = "admin"; - long fileId = 312321L; + long fileId = 1L; byte storagePolicy = 0; byte erasureCodingPolicy = 0; - Map mapOwnerIdName = new HashMap<>(); - mapOwnerIdName.put(1, "root"); - Map mapGroupIdName = new HashMap<>(); - mapGroupIdName.put(1, "admin"); + FileInfo fileInfo = new FileInfo(path, fileId, length, isDir, blockReplication, blockSize, modTime, accessTime, permission, owner, group, storagePolicy, erasureCodingPolicy); - fileInfoDao.insert(fileInfo); + fileInfoDao.insert(fileInfo, true); fileInfoDao.update(path, 10); FileInfo file = fileInfoDao.getById(fileId); fileInfo.setStoragePolicy((byte) 10); - Assert.assertTrue(file.equals(fileInfo)); + + Assert.assertEquals(file, fileInfo); } } diff --git a/smart-metastore/src/test/java/org/smartdata/metastore/dao/accesscount/TestAccessEventAggregator.java b/smart-metastore/src/test/java/org/smartdata/metastore/dao/accesscount/TestAccessEventAggregator.java index 77a7e5719f2..58b59a1aa2f 100644 --- a/smart-metastore/src/test/java/org/smartdata/metastore/dao/accesscount/TestAccessEventAggregator.java +++ b/smart-metastore/src/test/java/org/smartdata/metastore/dao/accesscount/TestAccessEventAggregator.java @@ -61,7 +61,7 @@ public void setup() { aggregator = new DbAccessEventAggregator(metaStore.fileInfoDao(), dbTableManager, new Failover(){}); - metaStore.fileInfoDao().insert(testFileInfos()); + metaStore.fileInfoDao().insert(testFileInfos(), false); } @Test @@ -106,7 +106,7 @@ private FileInfo[] testFileInfos() { } private FileInfo dummyFileInfo(String path, long fileId) { - return FileInfo.newBuilder() + return FileInfo.builder() .setPath(path) .setFileId(fileId) .build(); diff --git a/smart-metastore/src/test/java/org/smartdata/metastore/dao/accesscount/TestFileAccessManager.java b/smart-metastore/src/test/java/org/smartdata/metastore/dao/accesscount/TestFileAccessManager.java index d589f501fe1..e9fcf490a37 100644 --- a/smart-metastore/src/test/java/org/smartdata/metastore/dao/accesscount/TestFileAccessManager.java +++ b/smart-metastore/src/test/java/org/smartdata/metastore/dao/accesscount/TestFileAccessManager.java @@ -150,12 +150,12 @@ private void insertFileAccessCounts(long currentTimeMillis) throws MetaStoreExce private void createTestFiles() throws MetaStoreException { FileInfo[] fileInfos = IntStream.range(0, TEST_FILES.size()) - .mapToObj(id -> FileInfo.newBuilder() + .mapToObj(id -> FileInfo.builder() .setFileId(id) .setPath(TEST_FILES.get(id)) .build()) .toArray(FileInfo[]::new); - metaStore.insertFiles(fileInfos); + metaStore.insertFiles(fileInfos, false); } } diff --git a/smart-server/src/test/java/org/smartdata/server/TestCopy2S3Scheduler.java b/smart-server/src/test/java/org/smartdata/server/TestCopy2S3Scheduler.java index 551c204205c..43b4e2c4bcb 100644 --- a/smart-server/src/test/java/org/smartdata/server/TestCopy2S3Scheduler.java +++ b/smart-server/src/test/java/org/smartdata/server/TestCopy2S3Scheduler.java @@ -74,10 +74,10 @@ public void testThrowIfSrcFileNotFound() throws Exception { public void testThrowIfSrcFileIsEmpty() throws Exception { waitTillSSMExitSafeMode(); - ssm.getMetaStore().insertFile(FileInfo.newBuilder() + ssm.getMetaStore().insertFile(FileInfo.builder() .setPath("/empty.file") .setLength(0L) - .build()); + .build(), true); ActionRejectedException exception = assertThrows( ActionRejectedException.class, @@ -90,10 +90,10 @@ public void testThrowIfSrcFileIsEmpty() throws Exception { public void testThrowIfSrcFileIsAlreadyCopied() throws Exception { waitTillSSMExitSafeMode(); - ssm.getMetaStore().insertFile(FileInfo.newBuilder() + ssm.getMetaStore().insertFile(FileInfo.builder() .setPath("/test.file") .setLength(10L) - .build()); + .build(), true); ssm.getMetaStore().insertUpdateFileState(new S3FileState("/test.file")); @@ -108,10 +108,10 @@ public void testThrowIfSrcFileIsAlreadyCopied() throws Exception { public void testThrowIfSrcFileIsLocked() throws Exception { waitTillSSMExitSafeMode(); - ssm.getMetaStore().insertFile(FileInfo.newBuilder() + ssm.getMetaStore().insertFile(FileInfo.builder() .setPath("/test.file") .setLength(10L) - .build()); + .build(), true); ssm.getCmdletManager() .submitCmdlet("sleep -ms 10000; copy2s3 -file /test.file"); diff --git a/smart-server/src/test/java/org/smartdata/server/engine/rule/TestRuleManager.java b/smart-server/src/test/java/org/smartdata/server/engine/rule/TestRuleManager.java index 04cd6ce12ef..84f6f572abb 100644 --- a/smart-server/src/test/java/org/smartdata/server/engine/rule/TestRuleManager.java +++ b/smart-server/src/test/java/org/smartdata/server/engine/rule/TestRuleManager.java @@ -300,7 +300,7 @@ public void testMultiThreadChangeState() throws Exception { FileInfo[] files = {new FileInfo("/tmp/testfile", fid, length, false, (short) 3, 1024, now, now, (short) 1, null, null, (byte) 3, (byte) 0)}; - metaStore.insertFiles(files); + metaStore.insertFiles(files, false); long rid = ruleManager.submitRule(rule, RuleState.ACTIVE); long start = System.currentTimeMillis(); diff --git a/supports/tools/docker/multihost/conf/core-site.xml b/supports/tools/docker/multihost/conf/core-site.xml index 0731a0321ee..36a0e6926c6 100644 --- a/supports/tools/docker/multihost/conf/core-site.xml +++ b/supports/tools/docker/multihost/conf/core-site.xml @@ -22,4 +22,9 @@ smart.server.kerberos.principal ssm/ssm-server.demo@DEMO + + + ipc.client.fallback-to-simple-auth-allowed + true + diff --git a/supports/tools/docker/multihost/conf/smart-default.xml b/supports/tools/docker/multihost/conf/smart-default.xml index 2e064e26f5d..7f9fa74ba85 100644 --- a/supports/tools/docker/multihost/conf/smart-default.xml +++ b/supports/tools/docker/multihost/conf/smart-default.xml @@ -425,17 +425,6 @@ - - smart.sync.schedule.strategy - UNORDERED - - Strategy of copying files during 'sync' rule. Possible values: - FIFO - the files created/modified first will be scheduled for transfer first - LIFO - the files created/modified last will be scheduled for transfer first - UNORDERED - no guarantees of the file scheduling order - - - smart.sync.file.equality.strategy CHECKSUM