From d66502c467b096530e83641ca978f84258709065 Mon Sep 17 00:00:00 2001 From: Dery Rahman Ahaddienata Date: Sun, 10 Aug 2025 11:57:47 +0700 Subject: [PATCH] feat: termination on exhausted retry --- mc2mc/internal/client/odps.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/mc2mc/internal/client/odps.go b/mc2mc/internal/client/odps.go index 927941e..d1a31b1 100644 --- a/mc2mc/internal/client/odps.go +++ b/mc2mc/internal/client/odps.go @@ -49,7 +49,7 @@ func (c *odpsClient) ExecSQL(ctx context.Context, query string, additionalHints // generate log view url, err := c.generateLogView(taskIns) if err != nil { - err = e.Join(err, taskIns.Terminate()) + err = e.Join(err, c.terminate(taskIns)) return errors.WithStack(err) } c.logger.Info(fmt.Sprintf("taskId: %s, log view: %s , hints: (%s)", taskIns.Id(), url, getHintsString(hints))) @@ -64,7 +64,12 @@ func (c *odpsClient) ExecSQL(ctx context.Context, query string, additionalHints c.logger.Info(msg) return errors.WithStack(c.terminate(taskIns)) case err := <-c.wait(taskIns): - return errors.WithStack(err) + if err != nil { + c.logger.Error(fmt.Sprintf("task instance %s failed: %s", taskIns.Id(), err)) + err = e.Join(err, c.terminate(taskIns)) // terminate task instance on failure + return errors.WithStack(err) + } + return nil } } @@ -146,6 +151,11 @@ func (c *odpsClient) wait(taskIns *odps.Instance) <-chan error { if err != nil { err := errors.Wrap(err, fmt.Sprintf("task instance %s failed", taskIns.Id())) errChan <- errors.WithStack(err) + return + } + if err := taskIns.Load(); err != nil { + c.logger.Warn(fmt.Sprintf("failed to load task instance %s: %s", taskIns.Id(), err)) + return } c.logger.Info(fmt.Sprintf("task instance %s finished with status: %s", taskIns.Id(), taskIns.Status())) sum, err := taskIns.GetTaskSummary(taskIns.TaskNameCommitted())