Skip to content

Commit

Permalink
TEZ-4580: Slow preemption of new containers when re-use is enabled (#374) (Himanshu Mishra reviewed by Laszlo Bodor)
Browse files Browse the repository at this point in the history
  • Loading branch information
himanshu-mishra authored Dec 23, 2024
1 parent b95defc commit 9efa6f1
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1275,7 +1275,7 @@ boolean preemptIfNeeded() {
+ numHighestPriRequests + " pending requests at pri: "
+ highestPriRequest.getPriority());
}

int newContainersReleased = 0;
for (int i=0; i<numPendingRequestsToService; ++i) {
// This request must have been considered for matching with all existing
// containers when request was made.
Expand Down Expand Up @@ -1311,7 +1311,7 @@ boolean preemptIfNeeded() {
" with priority: " + lowestPriNewContainer.getPriority() +
" to free resource for request: " + highestPriRequest +
" . Current free resources: " + freeResources);
numPendingRequestsToService--;
newContainersReleased++;
releaseUnassignedContainers(Collections.singletonList(lowestPriNewContainer));
// We are returning an unused resource back to the RM. The RM thinks it
// has serviced our initial request and will not re-allocate this back
Expand All @@ -1324,7 +1324,7 @@ boolean preemptIfNeeded() {
continue;
}
}

numPendingRequestsToService -= newContainersReleased;
if (numPendingRequestsToService < 1) {
return true;
}
Expand Down Expand Up @@ -1573,6 +1573,9 @@ private void releaseContainer(ContainerId containerId) {
if (delayedContainer != null) {
Resources.subtractFrom(allocatedResources,
delayedContainer.getContainer().getResource());
if (shouldReuseContainers) {
delayedContainerManager.removeDelayedContainer(delayedContainer);
}
}
if (delayedContainer != null || !shouldReuseContainers) {
amRmClient.releaseAssignedContainer(containerId);
Expand Down Expand Up @@ -2163,6 +2166,13 @@ void addDelayedContainer(Container container,
}
}

/**
 * Drops the given held container from the delayed containers collection while holding this
 * object's monitor. A debug message is logged only when an entry was actually removed.
 *
 * @param container the held container to take out of the delayed containers collection
 */
synchronized void removeDelayedContainer(HeldContainer container) {
  boolean wasPresent = delayedContainers.remove(container);
  if (!wasPresent) {
    // Nothing was removed, so there is nothing to report.
    return;
  }
  LOG.debug("Removed {} from delayed containers", container.getContainer().getId());
}
}

synchronized void determineMinHeldContainers() {
Expand Down
101 changes: 101 additions & 0 deletions tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -989,6 +989,107 @@ public void testTaskSchedulerRandomReuseExpireTime() throws Exception {
scheduler2.shutdown();
}

/**
 * Verifies how many held containers are released over two consecutive preemption cycles when
 * containers obtained for low priority requests are held and higher priority requests are pending.
 *
 * <p>Scenario: with TEZ_AM_PREEMPTION_PERCENTAGE set to 50, five low priority tasks are requested
 * and four containers are handed to the scheduler at the low priority. Six higher priority tasks
 * are then requested while the mocked RM client reports no available resources. The first
 * getProgress() call is expected to lead to 3 releaseAssignedContainer calls, and the second one
 * to a cumulative total of 4.
 */
@Test(timeout = 5000)
public void testTaskSchedulerPreemptionWithLowAndHighPriorityRequests() throws Exception {
// Spy on the RM client so that calls made by the scheduler can be stubbed and verified.
TezAMRMClientAsync<CookieContainerRequest> mockRMClient = spy(
new AMRMClientAsyncForTest(new AMRMClientForTest(), 100));

Configuration conf = new Configuration();
conf.setInt(TezConfiguration.TEZ_AM_PREEMPTION_PERCENTAGE, 50);

TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(DEFAULT_APP_HOST, DEFAULT_APP_PORT, DEFAULT_APP_URL,
false, null, null, new PreemptionMatcher(), conf);
final TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp);

final TaskSchedulerWithDrainableContext scheduler =
new TaskSchedulerWithDrainableContext(drainableAppCallback, mockRMClient);
scheduler.initialize();
scheduler.start();
// Number of containers handed to the scheduler, and number of tasks requested at each priority.
int initialRmCapacity = 4;
int lowPriorityTasks = 5;
int highPriorityTasks = 6;
// Every task asks for the same resource.
Resource taskAsk = Resource.newInstance(1000, 1);

// The RM client initially reports room for four task-sized containers.
Resource totalResource = Resource.newInstance(4000, 4);
when(mockRMClient.getAvailableResources()).thenReturn(totalResource);

// Add lower priority tasks
Priority lowPriority = Priority.newInstance(74);
for (int i = 0; i < lowPriorityTasks; i++) {
Object low = new Object();
TaskAttempt ta = mock(TaskAttempt.class);
scheduler.allocateTask(ta, taskAsk, null, null, lowPriority, low, null);
}

scheduler.getProgress(); // Will update the highest priority
drainableAppCallback.drain();
// 5 containers requested for lower priority tasks
verify(mockRMClient, times(5)).addContainerRequest(any(CookieContainerRequest.class));

// Allocate requested containers
// Only initialRmCapacity (4) containers are built, one fewer than the 5 low priority requests.
List<Container> lowPriorityContainers = new ArrayList<>();
for (int i = 0; i < initialRmCapacity; i++) {
ContainerId containerId = ContainerId.newContainerId(
ApplicationAttemptId.newInstance(ApplicationId.newInstance(1L, 1), 1), i);
NodeId nodeId = NodeId.newInstance("host-" + i, 8041);
Container container = Container.newInstance(containerId, nodeId, "host-" + i, taskAsk, lowPriority, null);
lowPriorityContainers.add(container);
}

// From here on the RM client reports no available resources.
totalResource = Resource.newInstance(0, 0);
when(mockRMClient.getAvailableResources()).thenReturn(totalResource);

// We don't want containers to be assigned to a task by delayedContainerManager as it invokes another preemption
// flow. Delayed thread first takes lock on delayedContainerManager instance to check if there are any containers.
// We block the thread, ensure all delayed containers have schedule time beyond test's runtime to avoid assignment.
synchronized (scheduler.delayedContainerManager) {
scheduler.onContainersAllocated(lowPriorityContainers);
drainableAppCallback.drain();
for (HeldContainer container : scheduler.delayedContainerManager.delayedContainers) {
// Set next schedule beyond this test's time to avoid any assignment
container.setNextScheduleTime(System.currentTimeMillis() + 10000);
// No preemption if assignment attempt of new container < 3
container.incrementAssignmentAttempts();
container.incrementAssignmentAttempts();
container.incrementAssignmentAttempts();
}
}

// No releases so far
verify(mockRMClient, times(0)).releaseAssignedContainer(any());

// Add higher priority task
Priority highPriority = Priority.newInstance(71);
for (int i = 0; i < highPriorityTasks; i++) {
Object high = new Object();
TaskAttempt ta = mock(TaskAttempt.class);
scheduler.allocateTask(ta, taskAsk, null, null, highPriority, high, null);
}

drainableAppCallback.drain();
// low priority tasks + high priority tasks
// The count is cumulative over the whole test: 5 low priority + 6 high priority requests.
verify(mockRMClient, times(11)).addContainerRequest(any(CookieContainerRequest.class));

// Trigger preemption to release containers as 50% of pending high priority requests
scheduler.getProgress();
drainableAppCallback.drain();

// 50% of 6 high priority requests = 3, 4 containers were held - hence 3 will be released
verify(mockRMClient, times(3)).releaseAssignedContainer(any());

// Trigger another preemption cycle
scheduler.getProgress();
drainableAppCallback.drain();
// 50% of 6 high priority requests = 3, but only 1 container is held - which will be released,
// incrementing total to 4
verify(mockRMClient, times(4)).releaseAssignedContainer(any());
// Stub the final application status before shutting the scheduler down.
AppFinalStatus finalStatus =
new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, "", DEFAULT_APP_URL);
when(mockApp.getFinalAppStatus()).thenReturn(finalStatus);
scheduler.shutdown();
drainableAppCallback.drain();
}

@SuppressWarnings({ "unchecked", "rawtypes" })
@Test (timeout=5000)
public void testTaskSchedulerPreemption() throws Exception {
Expand Down

0 comments on commit 9efa6f1

Please sign in to comment.