Skip to content

Commit 075d8d0

Browse files
committed
Allow retrying tasks from coordinator stages
If a coordinator stage is processing data output by workers, we want to enable retries. Even if the coordinator is working fine, a task from the coordinator stage may fail, e.g. if an upstream task fails while the data it produces is being speculatively processed by the coordinator stage task.
1 parent 59884cc commit 075d8d0

File tree

4 files changed

+183
-65
lines changed

4 files changed

+183
-65
lines changed

core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1519,7 +1519,19 @@ private void createStageExecution(
15191519
private static boolean shouldRetry(SqlStage stage)
15201520
{
15211521
boolean coordinatorStage = stage.getFragment().getPartitioning().equals(COORDINATOR_DISTRIBUTION);
1522-
return !coordinatorStage;
1522+
1523+
if (!coordinatorStage) {
1524+
return true;
1525+
}
1526+
1527+
if (!stage.getFragment().getRemoteSourceNodes().isEmpty()) {
1528+
// If the coordinator stage is processing workers' data, we want to enable retries.
1529+
// Even if the coordinator is working fine, a task from the coordinator stage may fail, e.g. if an
1530+
// upstream task fails while the data it produces is being speculatively processed by the coordinator stage task.
1531+
return true;
1532+
}
1533+
1534+
return false;
15231535
}
15241536

15251537
private StageId getStageId(PlanFragmentId fragmentId)

plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaFailureRecoveryTest.java

Lines changed: 65 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import static io.trino.execution.FailureInjector.InjectedFailureType.TASK_GET_RESULTS_REQUEST_TIMEOUT;
3434
import static io.trino.execution.FailureInjector.InjectedFailureType.TASK_MANAGEMENT_REQUEST_FAILURE;
3535
import static io.trino.execution.FailureInjector.InjectedFailureType.TASK_MANAGEMENT_REQUEST_TIMEOUT;
36+
import static io.trino.operator.RetryPolicy.TASK;
3637
import static io.trino.plugin.exchange.filesystem.containers.MinioStorage.getExchangeManagerProperties;
3738
import static io.trino.testing.TestingNames.randomNameSuffix;
3839
import static java.util.Locale.ENGLISH;
@@ -92,19 +93,39 @@ protected void testDelete()
9293
Optional<String> cleanupQuery = Optional.of("DROP TABLE <table>");
9394
String deleteQuery = "DELETE FROM <table> WHERE orderkey = 1";
9495

95-
assertThatQuery(deleteQuery)
96-
.withSetupQuery(setupQuery)
97-
.withCleanupQuery(cleanupQuery)
98-
.experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR))
99-
.at(boundaryCoordinatorStage())
100-
.failsAlways(failure -> failure.hasMessageContaining(FAILURE_INJECTION_MESSAGE));
96+
if (getRetryPolicy() == TASK) {
97+
assertThatQuery(deleteQuery)
98+
.withSetupQuery(setupQuery)
99+
.withCleanupQuery(cleanupQuery)
100+
.experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR))
101+
.at(boundaryCoordinatorStage())
102+
.finishesSuccessfully();
103+
}
104+
else {
105+
assertThatQuery(deleteQuery)
106+
.withSetupQuery(setupQuery)
107+
.withCleanupQuery(cleanupQuery)
108+
.experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR))
109+
.at(boundaryCoordinatorStage())
110+
.failsAlways(failure -> failure.hasMessageContaining(FAILURE_INJECTION_MESSAGE));
111+
}
101112

102-
assertThatQuery(deleteQuery)
103-
.withSetupQuery(setupQuery)
104-
.withCleanupQuery(cleanupQuery)
105-
.experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR))
106-
.at(rootStage())
107-
.failsAlways(failure -> failure.hasMessageContaining(FAILURE_INJECTION_MESSAGE));
113+
if (getRetryPolicy() == TASK) {
114+
assertThatQuery(deleteQuery)
115+
.withSetupQuery(setupQuery)
116+
.withCleanupQuery(cleanupQuery)
117+
.experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR))
118+
.at(rootStage())
119+
.finishesSuccessfully();
120+
}
121+
else {
122+
assertThatQuery(deleteQuery)
123+
.withSetupQuery(setupQuery)
124+
.withCleanupQuery(cleanupQuery)
125+
.experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR))
126+
.at(rootStage())
127+
.failsAlways(failure -> failure.hasMessageContaining(FAILURE_INJECTION_MESSAGE));
128+
}
108129

109130
assertThatQuery(deleteQuery)
110131
.withSetupQuery(setupQuery)
@@ -177,19 +198,39 @@ protected void testUpdate()
177198
Optional<String> cleanupQuery = Optional.of("DROP TABLE <table>");
178199
String updateQuery = "UPDATE <table> SET shippriority = 101 WHERE custkey = 1";
179200

180-
assertThatQuery(updateQuery)
181-
.withSetupQuery(setupQuery)
182-
.withCleanupQuery(cleanupQuery)
183-
.experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR))
184-
.at(boundaryCoordinatorStage())
185-
.failsAlways(failure -> failure.hasMessageContaining(FAILURE_INJECTION_MESSAGE));
201+
if (getRetryPolicy() == TASK) {
202+
assertThatQuery(updateQuery)
203+
.withSetupQuery(setupQuery)
204+
.withCleanupQuery(cleanupQuery)
205+
.experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR))
206+
.at(boundaryCoordinatorStage())
207+
.finishesSuccessfully();
208+
}
209+
else {
210+
assertThatQuery(updateQuery)
211+
.withSetupQuery(setupQuery)
212+
.withCleanupQuery(cleanupQuery)
213+
.experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR))
214+
.at(boundaryCoordinatorStage())
215+
.failsAlways(failure -> failure.hasMessageContaining(FAILURE_INJECTION_MESSAGE));
216+
}
186217

187-
assertThatQuery(updateQuery)
188-
.withSetupQuery(setupQuery)
189-
.withCleanupQuery(cleanupQuery)
190-
.experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR))
191-
.at(rootStage())
192-
.failsAlways(failure -> failure.hasMessageContaining(FAILURE_INJECTION_MESSAGE));
218+
if (getRetryPolicy() == TASK) {
219+
assertThatQuery(updateQuery)
220+
.withSetupQuery(setupQuery)
221+
.withCleanupQuery(cleanupQuery)
222+
.experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR))
223+
.at(rootStage())
224+
.finishesSuccessfully();
225+
}
226+
else {
227+
assertThatQuery(updateQuery)
228+
.withSetupQuery(setupQuery)
229+
.withCleanupQuery(cleanupQuery)
230+
.experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR))
231+
.at(rootStage())
232+
.failsAlways(failure -> failure.hasMessageContaining(FAILURE_INJECTION_MESSAGE));
233+
}
193234

194235
assertThatQuery(updateQuery)
195236
.withSetupQuery(setupQuery)

plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergFailureRecoveryTest.java

Lines changed: 65 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import static io.trino.execution.FailureInjector.InjectedFailureType.TASK_GET_RESULTS_REQUEST_TIMEOUT;
2727
import static io.trino.execution.FailureInjector.InjectedFailureType.TASK_MANAGEMENT_REQUEST_FAILURE;
2828
import static io.trino.execution.FailureInjector.InjectedFailureType.TASK_MANAGEMENT_REQUEST_TIMEOUT;
29+
import static io.trino.operator.RetryPolicy.TASK;
2930

3031
public abstract class BaseIcebergFailureRecoveryTest
3132
extends BaseFailureRecoveryTest
@@ -62,19 +63,39 @@ protected void testDelete()
6263
Optional<String> cleanupQuery = Optional.of("DROP TABLE <table>");
6364
String deleteQuery = "DELETE FROM <table> WHERE orderkey = 1";
6465

65-
assertThatQuery(deleteQuery)
66-
.withSetupQuery(setupQuery)
67-
.withCleanupQuery(cleanupQuery)
68-
.experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR))
69-
.at(boundaryCoordinatorStage())
70-
.failsAlways(failure -> failure.hasMessageContaining(FAILURE_INJECTION_MESSAGE));
66+
if (getRetryPolicy() == TASK) {
67+
assertThatQuery(deleteQuery)
68+
.withSetupQuery(setupQuery)
69+
.withCleanupQuery(cleanupQuery)
70+
.experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR))
71+
.at(boundaryCoordinatorStage())
72+
.finishesSuccessfully();
73+
}
74+
else {
75+
assertThatQuery(deleteQuery)
76+
.withSetupQuery(setupQuery)
77+
.withCleanupQuery(cleanupQuery)
78+
.experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR))
79+
.at(boundaryCoordinatorStage())
80+
.failsAlways(failure -> failure.hasMessageContaining(FAILURE_INJECTION_MESSAGE));
81+
}
7182

72-
assertThatQuery(deleteQuery)
73-
.withSetupQuery(setupQuery)
74-
.withCleanupQuery(cleanupQuery)
75-
.experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR))
76-
.at(rootStage())
77-
.failsAlways(failure -> failure.hasMessageContaining(FAILURE_INJECTION_MESSAGE));
83+
if (getRetryPolicy() == TASK) {
84+
assertThatQuery(deleteQuery)
85+
.withSetupQuery(setupQuery)
86+
.withCleanupQuery(cleanupQuery)
87+
.experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR))
88+
.at(rootStage())
89+
.finishesSuccessfully();
90+
}
91+
else {
92+
assertThatQuery(deleteQuery)
93+
.withSetupQuery(setupQuery)
94+
.withCleanupQuery(cleanupQuery)
95+
.experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR))
96+
.at(rootStage())
97+
.failsAlways(failure -> failure.hasMessageContaining(FAILURE_INJECTION_MESSAGE));
98+
}
7899

79100
assertThatQuery(deleteQuery)
80101
.withSetupQuery(setupQuery)
@@ -140,19 +161,39 @@ protected void testUpdate()
140161
Optional<String> cleanupQuery = Optional.of("DROP TABLE <table>");
141162
String updateQuery = "UPDATE <table> SET shippriority = 101 WHERE custkey = 1";
142163

143-
assertThatQuery(updateQuery)
144-
.withSetupQuery(setupQuery)
145-
.withCleanupQuery(cleanupQuery)
146-
.experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR))
147-
.at(boundaryCoordinatorStage())
148-
.failsAlways(failure -> failure.hasMessageContaining(FAILURE_INJECTION_MESSAGE));
164+
if (getRetryPolicy() == TASK) {
165+
assertThatQuery(updateQuery)
166+
.withSetupQuery(setupQuery)
167+
.withCleanupQuery(cleanupQuery)
168+
.experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR))
169+
.at(boundaryCoordinatorStage())
170+
.finishesSuccessfully();
171+
}
172+
else {
173+
assertThatQuery(updateQuery)
174+
.withSetupQuery(setupQuery)
175+
.withCleanupQuery(cleanupQuery)
176+
.experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR))
177+
.at(boundaryCoordinatorStage())
178+
.failsAlways(failure -> failure.hasMessageContaining(FAILURE_INJECTION_MESSAGE));
179+
}
149180

150-
assertThatQuery(updateQuery)
151-
.withSetupQuery(setupQuery)
152-
.withCleanupQuery(cleanupQuery)
153-
.experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR))
154-
.at(rootStage())
155-
.failsAlways(failure -> failure.hasMessageContaining(FAILURE_INJECTION_MESSAGE));
181+
if (getRetryPolicy() == TASK) {
182+
assertThatQuery(updateQuery)
183+
.withSetupQuery(setupQuery)
184+
.withCleanupQuery(cleanupQuery)
185+
.experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR))
186+
.at(rootStage())
187+
.finishesSuccessfully();
188+
}
189+
else {
190+
assertThatQuery(updateQuery)
191+
.withSetupQuery(setupQuery)
192+
.withCleanupQuery(cleanupQuery)
193+
.experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR))
194+
.at(rootStage())
195+
.failsAlways(failure -> failure.hasMessageContaining(FAILURE_INJECTION_MESSAGE));
196+
}
156197

157198
assertThatQuery(updateQuery)
158199
.withSetupQuery(setupQuery)

testing/trino-testing/src/main/java/io/trino/testing/BaseFailureRecoveryTest.java

Lines changed: 40 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -320,23 +320,47 @@ protected void testNonSelect(Optional<Session> session, Optional<String> setupQu
320320
return;
321321
}
322322

323-
assertThatQuery(query)
324-
.withSession(session)
325-
.withSetupQuery(setupQuery)
326-
.withCleanupQuery(cleanupQuery)
327-
.experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR))
328-
.at(boundaryCoordinatorStage())
329-
.failsAlways(failure -> failure.hasMessageContaining(FAILURE_INJECTION_MESSAGE))
330-
.cleansUpTemporaryTables();
323+
if (retryPolicy == RetryPolicy.TASK) {
324+
assertThatQuery(query)
325+
.withSession(session)
326+
.withSetupQuery(setupQuery)
327+
.withCleanupQuery(cleanupQuery)
328+
.experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR))
329+
.at(boundaryCoordinatorStage())
330+
.finishesSuccessfully()
331+
.cleansUpTemporaryTables();
332+
}
333+
else {
334+
assertThatQuery(query)
335+
.withSession(session)
336+
.withSetupQuery(setupQuery)
337+
.withCleanupQuery(cleanupQuery)
338+
.experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR))
339+
.at(boundaryCoordinatorStage())
340+
.failsAlways(failure -> failure.hasMessageContaining(FAILURE_INJECTION_MESSAGE))
341+
.cleansUpTemporaryTables();
342+
}
331343

332-
assertThatQuery(query)
333-
.withSession(session)
334-
.withSetupQuery(setupQuery)
335-
.withCleanupQuery(cleanupQuery)
336-
.experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR))
337-
.at(rootStage())
338-
.failsAlways(failure -> failure.hasMessageContaining(FAILURE_INJECTION_MESSAGE))
339-
.cleansUpTemporaryTables();
344+
if (retryPolicy == RetryPolicy.TASK) {
345+
assertThatQuery(query)
346+
.withSession(session)
347+
.withSetupQuery(setupQuery)
348+
.withCleanupQuery(cleanupQuery)
349+
.experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR))
350+
.at(rootStage())
351+
.finishesSuccessfully()
352+
.cleansUpTemporaryTables();
353+
}
354+
else {
355+
assertThatQuery(query)
356+
.withSession(session)
357+
.withSetupQuery(setupQuery)
358+
.withCleanupQuery(cleanupQuery)
359+
.experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR))
360+
.at(rootStage())
361+
.failsAlways(failure -> failure.hasMessageContaining(FAILURE_INJECTION_MESSAGE))
362+
.cleansUpTemporaryTables();
363+
}
340364

341365
assertThatQuery(query)
342366
.withSession(session)

0 commit comments

Comments
 (0)