diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index fc46325cb53..f65a42c1fc7 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -805,6 +805,14 @@ public int getMaxConcurrentJobLimit() { return Integer.parseInt(getOptional("kylin.job.max-concurrent-jobs", "10")); } + public int getLowPriorityBar() { + return Integer.parseInt(getOptional("kylin.job.low-priority-bar", "0")); + } + + public int getLowPriorityJobLimit() { + return Integer.parseInt(getOptional("kylin.job.low-priority-limit", "10")); + } + public String getHiveDependencyFilterList() { return this.getOptional("kylin.job.dependency-filter-list", "[^,]*hive-exec[^,]*?\\.jar" + "|" + "[^,]*hive-metastore[^,]*?\\.jar" + "|" + "[^,]*hive-hcatalog-core[^,]*?\\.jar"); @@ -876,6 +884,10 @@ public boolean getSchedulerPriorityConsidered() { return Boolean.parseBoolean(getOptional("kylin.job.scheduler.priority-considered", FALSE)); } + public boolean IsJobPreemptiveExecution() { + return Boolean.parseBoolean(getOptional("kylin.job.scheduler.priority-preemptive-execution", TRUE)); + } + public Integer getSchedulerPriorityBarFetchFromQueue() { return Integer.parseInt(getOptional("kylin.job.scheduler.priority-bar-fetch-from-queue", "20")); } diff --git a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java index 5fb1187a619..b053ee5b742 100644 --- a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java +++ b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java @@ -74,7 +74,11 @@ public static DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegme /** Merge multiple small segments into a big one. */ public static DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) { - return batchEngine(mergeSegment).createBatchMergeJob(mergeSegment, submitter); + return batchEngine(mergeSegment).createBatchMergeJob(mergeSegment, submitter, 0); + } + + public static DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter, Integer priorityOffset) { + return batchEngine(mergeSegment).createBatchMergeJob(mergeSegment, submitter, priorityOffset); } /** Optimize a segment based on the cuboid recommend list produced by the cube planner. */ diff --git a/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java b/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java index 08f9c97ab39..57c66655fbc 100644 --- a/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java +++ b/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java @@ -36,7 +36,7 @@ public interface IBatchCubingEngine { public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter, Integer priorityOffset); /** Merge multiple small segments into a big one. */ - public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter); + public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter, Integer priorityOffset); /** Optimize a segment based on the cuboid recommend list produced by the cube planner. */ public DefaultChainedExecutable createBatchOptimizeJob(CubeSegment optimizeSegment, String submitter); diff --git a/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java b/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java index 2694e25e073..d28a54c9d71 100644 --- a/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java +++ b/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java @@ -114,6 +114,18 @@ public boolean getJobPriorityConsidered() { return config.getSchedulerPriorityConsidered(); } + public boolean IsJobPreemptiveExecution() { + return config.IsJobPreemptiveExecution(); + } + + public int getLowPriorityBar() { + return config.getLowPriorityBar(); + } + + public int getLowPriorityJobLimit() { + return config.getLowPriorityJobLimit(); + } + /** * @return the priority bar for fetching jobs from job priority queue */ diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java index 1022dfb82c5..8623531b0b2 100644 --- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java @@ -20,7 +20,6 @@ import java.util.Map; import java.util.Set; - import org.apache.kylin.shaded.com.google.common.collect.Sets; import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.job.execution.AbstractExecutable; diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java index fe3703f5ff6..0d9b8a8b8f5 100644 --- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java @@ -61,7 +61,9 @@ synchronized public void run() { // fetch job from jobPriorityQueue first to reduce chance to scan job list Map leftJobPriorities = Maps.newHashMap(); Pair executableWithPriority; - while ((executableWithPriority = jobPriorityQueue.peek()) != null + + while (jobEngineConfig.IsJobPreemptiveExecution() + && (executableWithPriority = jobPriorityQueue.peek()) != null // the priority of jobs in pendingJobPriorities should be above a threshold && executableWithPriority.getSecond() >= jobEngineConfig.getFetchQueuePriorityBar()) { executableWithPriority = jobPriorityQueue.poll(); @@ -147,7 +149,9 @@ synchronized public void run() { jobPriorityQueue.add(new Pair<>(executable, priority)); } - while ((executableWithPriority = jobPriorityQueue.poll()) != null && !isJobPoolFull()) { + while ((executableWithPriority = jobPriorityQueue.poll()) != null && !isJobPoolFull() + && (executableWithPriority.getSecond() > jobEngineConfig.getLowPriorityBar() + || runningJobs.size() < jobEngineConfig.getLowPriorityJobLimit())) { addToJobPool(executableWithPriority.getFirst(), executableWithPriority.getSecond()); } diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngineParquet.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngineParquet.java index 89e5e65ca0c..13d505a7555 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngineParquet.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngineParquet.java @@ -43,12 +43,12 @@ public IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeSegment newSegment) { @Override public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter, Integer priorityOffset) { - return NSparkCubingJob.create(Sets.newHashSet(newSegment), submitter); + return NSparkCubingJob.create(Sets.newHashSet(newSegment), submitter, priorityOffset); } @Override - public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) { - return NSparkMergingJob.merge(mergeSegment, submitter); + public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter, Integer priorityOffset) { + return NSparkMergingJob.merge(mergeSegment, submitter, priorityOffset); } @Override diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java index abb7f658fbb..a419c463fe1 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java @@ -26,7 +26,6 @@ import java.util.TimeZone; import java.util.UUID; import java.util.stream.Collectors; - import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; @@ -54,12 +53,12 @@ public class NSparkCubingJob extends CubingJob { private CubeInstance cube; // for test use only - public static NSparkCubingJob create(Set segments, String submitter) { - return create(segments, submitter, CubingJobTypeEnum.BUILD, UUID.randomUUID().toString()); + public static NSparkCubingJob create(Set segments, String submitter, Integer priorityOffset) { + return create(segments, submitter, CubingJobTypeEnum.BUILD, UUID.randomUUID().toString(), priorityOffset); } public static NSparkCubingJob create(Set segments, String submitter, CubingJobTypeEnum jobType, - String jobId) { + String jobId, Integer priorityOffset) { Preconditions.checkArgument(!segments.isEmpty()); Preconditions.checkArgument(submitter != null); NSparkCubingJob job = new NSparkCubingJob(); @@ -79,6 +78,7 @@ public static NSparkCubingJob create(Set segments, String submitter } builder.append(format.format(new Date(System.currentTimeMillis()))); job.setId(jobId); + job.setPriority(priorityOffset); job.setName(builder.toString()); job.setProjectName(job.cube.getProject()); job.setTargetSubject(job.cube.getModel().getId()); diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingJob.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingJob.java index 2e6f79411f0..c77cd05e4c4 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingJob.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingJob.java @@ -42,8 +42,8 @@ public class NSparkMergingJob extends CubingJob { @SuppressWarnings("unused") private static final Logger logger = LoggerFactory.getLogger(NSparkMergingJob.class); - public static NSparkMergingJob merge(CubeSegment mergedSegment, String submitter) { - return NSparkMergingJob.merge(mergedSegment, submitter, CubingJobTypeEnum.MERGE, UUID.randomUUID().toString()); + public static NSparkMergingJob merge(CubeSegment mergedSegment, String submitter, Integer priorityOffset) { + return NSparkMergingJob.merge(mergedSegment, submitter, CubingJobTypeEnum.MERGE, UUID.randomUUID().toString(), priorityOffset); } /** @@ -51,7 +51,7 @@ public static NSparkMergingJob merge(CubeSegment mergedSegment, String submitter * * @param mergedSegment, new segment that expect to merge, which should contains a couple of ready segments. */ - public static NSparkMergingJob merge(CubeSegment mergedSegment, String submitter, CubingJobTypeEnum jobType, String jobId) { + public static NSparkMergingJob merge(CubeSegment mergedSegment, String submitter, CubingJobTypeEnum jobType, String jobId, Integer priorityOffset) { CubeInstance cube = mergedSegment.getCubeInstance(); NSparkMergingJob job = new NSparkMergingJob(); @@ -66,6 +66,7 @@ public static NSparkMergingJob merge(CubeSegment mergedSegment, String submitter builder.append(format.format(new Date(System.currentTimeMillis()))); job.setName(builder.toString()); job.setId(jobId); + job.setPriority(priorityOffset); job.setTargetSubject(mergedSegment.getModel().getUuid()); job.setTargetSegments(Lists.newArrayList(String.valueOf(mergedSegment.getUuid()))); job.setProject(mergedSegment.getProject()); diff --git a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java index ae2bb1503f9..6ee50fb7617 100644 --- a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java +++ b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java @@ -177,7 +177,7 @@ public ExecutableState buildCuboid(String cubeName, SegmentRange.TSRange tsRange DataModelManager.getInstance(config).getModels(); // ready cube, segment, cuboid layout CubeSegment oneSeg = cubeMgr.appendSegment(cube, tsRange); - NSparkCubingJob job = NSparkCubingJob.create(Sets.newHashSet(oneSeg), "ADMIN"); + NSparkCubingJob job = NSparkCubingJob.create(Sets.newHashSet(oneSeg), "ADMIN", 0); NSparkCubingStep sparkStep = job.getSparkCubingStep(); StorageURL distMetaUrl = StorageURL.valueOf(sparkStep.getDistMetaUrl()); Assert.assertEquals("hdfs", distMetaUrl.getScheme()); @@ -199,7 +199,7 @@ protected ExecutableState mergeSegments(String cubeName, long start, long end, b ExecutableManager execMgr = ExecutableManager.getInstance(config); CubeInstance cube = cubeMgr.reloadCube(cubeName); CubeSegment mergeSegment = cubeMgr.mergeSegments(cube, new SegmentRange.TSRange(start, end), null, force); - NSparkMergingJob mergeJob = NSparkMergingJob.merge(mergeSegment, "ADMIN"); + NSparkMergingJob mergeJob = NSparkMergingJob.merge(mergeSegment, "ADMIN", 0); execMgr.addJob(mergeJob); ExecutableState result = wait(mergeJob); if (config.cleanStorageAfterDelOperation()) { diff --git a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/job/JobStepFactoryTest.java b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/job/JobStepFactoryTest.java index 64ef0eeef32..566e92d94c1 100644 --- a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/job/JobStepFactoryTest.java +++ b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/job/JobStepFactoryTest.java @@ -62,7 +62,7 @@ public void testAddStepInCubing() throws IOException { cleanupSegments(CUBE_NAME); CubeSegment oneSeg = cubeMgr.appendSegment(cube, new SegmentRange.TSRange(0L, Long.MAX_VALUE)); Set segments = Sets.newHashSet(oneSeg); - NSparkCubingJob job = NSparkCubingJob.create(segments, "ADMIN"); + NSparkCubingJob job = NSparkCubingJob.create(segments, "ADMIN", 0); Assert.assertEquals(CUBE_NAME, job.getParam(MetadataConstants.P_CUBE_NAME)); NSparkExecutable resourceDetectStep = job.getResourceDetectStep(); @@ -110,7 +110,7 @@ public void testAddStepInMerging() throws Exception { reloadCube = cubeMgr.reloadCube(CUBE_NAME); CubeSegment mergedSegment = cubeMgr.mergeSegments(reloadCube, new SegmentRange.TSRange(dateToLong("2010-01-01"), dateToLong("2015-01-01")) , null, true); - NSparkMergingJob job = NSparkMergingJob.merge(mergedSegment, "ADMIN"); + NSparkMergingJob job = NSparkMergingJob.merge(mergedSegment, "ADMIN", 0); Assert.assertEquals(CUBE_NAME, job.getParam(MetadataConstants.P_CUBE_NAME)); NSparkExecutable resourceDetectStep = job.getResourceDetectStep(); diff --git a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/job/SparkCubingJobTest.java b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/job/SparkCubingJobTest.java index 91768ac693d..bcd694c6cea 100644 --- a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/job/SparkCubingJobTest.java +++ b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/job/SparkCubingJobTest.java @@ -91,7 +91,7 @@ public void testBuildJob() throws Exception { long date3 = dateToLong("2013-07-01"); CubeSegment segment = cubeMgr.appendSegment(cubeInstance, new SegmentRange.TSRange(date1, date2)); - NSparkCubingJob job = NSparkCubingJob.create(Sets.newHashSet(segment), "ADMIN"); + NSparkCubingJob job = NSparkCubingJob.create(Sets.newHashSet(segment), "ADMIN", 0); jobService.addJob(job); // wait job done ExecutableState state = wait(job); @@ -103,7 +103,7 @@ public void testBuildJob() throws Exception { // Test build 2nd segment CubeSegment segment2 = cubeMgr.appendSegment(cubeInstance, new SegmentRange.TSRange(date2, date3)); - NSparkCubingJob job2 = NSparkCubingJob.create(Sets.newHashSet(segment2), "ADMIN"); + NSparkCubingJob job2 = NSparkCubingJob.create(Sets.newHashSet(segment2), "ADMIN", 0); jobService.addJob(job2); // wait job done ExecutableState state2 = wait(job2); @@ -128,14 +128,14 @@ public void testBuildTwoSegmentsAndMerge() throws Exception { long date3 = dateToLong("2014-01-01"); CubeSegment segment = cubeMgr.appendSegment(cubeInstance, new SegmentRange.TSRange(date1, date2)); - NSparkCubingJob job = NSparkCubingJob.create(Sets.newHashSet(segment), "ADMIN"); + NSparkCubingJob job = NSparkCubingJob.create(Sets.newHashSet(segment), "ADMIN", 0); jobService.addJob(job); // wait job done ExecutableState state = wait(job); Assert.assertEquals(ExecutableState.SUCCEED, state); CubeSegment segment2 = cubeMgr.appendSegment(cubeInstance, new SegmentRange.TSRange(date2, date3)); - NSparkCubingJob job2 = NSparkCubingJob.create(Sets.newHashSet(segment2), "ADMIN"); + NSparkCubingJob job2 = NSparkCubingJob.create(Sets.newHashSet(segment2), "ADMIN", 0); jobService.addJob(job2); // wait job done ExecutableState state2 = wait(job2); @@ -148,7 +148,7 @@ public void testBuildTwoSegmentsAndMerge() throws Exception { */ CubeSegment firstMergeSeg = cubeMgr.mergeSegments(cubeInstance, new SegmentRange.TSRange(date1, date3), null, true); - NSparkMergingJob firstMergeJob = NSparkMergingJob.merge(firstMergeSeg, "ADMIN"); + NSparkMergingJob firstMergeJob = NSparkMergingJob.merge(firstMergeSeg, "ADMIN", 0); jobService.addJob(firstMergeJob); // wait job done Assert.assertEquals(ExecutableState.SUCCEED, wait(firstMergeJob)); diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java index e0d08f91eca..eda1f761c78 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -266,7 +266,7 @@ public JobInstance submitJobInternal(CubeInstance cube, TSRange tsRange, Segment } else if (buildType == JobTypeEnum.MERGE) { newSeg = getCubeManager().mergeSegments(cube, tsRange, segRange, force); - job = EngineFactory.createBatchMergeJob(newSeg, submitter); + job = EngineFactory.createBatchMergeJob(newSeg, submitter, priorityOffset); } else if (buildType == JobTypeEnum.REFRESH) { newSeg = getCubeManager().refreshSegment(cube, tsRange, segRange); job = EngineFactory.createBatchCubingJob(newSeg, submitter, priorityOffset);