diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index e64578911af..6a44bbcc433 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -638,38 +638,14 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows workflow.WorkflowSubType = res.WorkflowSubType.String() workflow.DeferSecondaryKeys = res.DeferSecondaryKeys - // MaxVReplicationTransactionLag estimates the actual statement processing lag - // between the source and the target. If we are still processing source events it - // is the difference b/w current time and the timestamp of the last event. If - // heartbeats are more recent than the last event, then the lag is the time since - // the last heartbeat as there can be an actual event immediately after the - // heartbeat, but which has not yet been processed on the target. - // We don't allow switching during the copy phase, so in that case we just return - // a large lag. All timestamps are in seconds since epoch. + // MaxVReplicationTransactionLag estimates the max statement processing lag + // between the source and the target across all of the workflow streams. if _, ok := maxVReplicationTransactionLagByWorkflow[workflow.Name]; !ok { maxVReplicationTransactionLagByWorkflow[workflow.Name] = 0 } - if rstream.TransactionTimestamp == nil { - rstream.TransactionTimestamp = &vttimepb.Time{} - } - lastTransactionTime := rstream.TransactionTimestamp.Seconds - if rstream.TimeHeartbeat == nil { - rstream.TimeHeartbeat = &vttimepb.Time{} - } - lastHeartbeatTime := rstream.TimeHeartbeat.Seconds - if stream.State == binlogdatapb.VReplicationWorkflowState_Copying.String() { - maxVReplicationTransactionLagByWorkflow[workflow.Name] = math.MaxInt64 - } else { - if lastTransactionTime == 0 /* no new events after copy */ || - lastHeartbeatTime > lastTransactionTime /* no recent transactions, so all caught up */ { - - lastTransactionTime = lastHeartbeatTime - } - now := time.Now().Unix() /* seconds since epoch */ - transactionReplicationLag := float64(now - lastTransactionTime) - if transactionReplicationLag > maxVReplicationTransactionLagByWorkflow[workflow.Name] { - maxVReplicationTransactionLagByWorkflow[workflow.Name] = transactionReplicationLag - } + transactionReplicationLag := getVReplicationTrxLag(rstream.TransactionTimestamp, rstream.TimeUpdated, rstream.TimeHeartbeat, rstream.State) + if transactionReplicationLag > maxVReplicationTransactionLagByWorkflow[workflow.Name] { + maxVReplicationTransactionLagByWorkflow[workflow.Name] = transactionReplicationLag } } @@ -4072,3 +4048,41 @@ func (s *Server) getWorkflowStatus(ctx context.Context, keyspace string, workflo } return workflowStatus, nil } + +// getVReplicationTrxLag estimates the actual statement processing lag between the +// source and the target. If we are still processing source events it is the +// difference between current time and the timestamp of the last event. If +// heartbeats are more recent than the last event, then the lag is the time since +// the last heartbeat as there can be an actual event immediately after the +// heartbeat, but which has not yet been processed on the target. We don't allow +// switching during the copy phase, so in that case we just return a large lag. +// All timestamps are in seconds since epoch. +func getVReplicationTrxLag(trxTs, updatedTs, heartbeatTs *vttimepb.Time, state binlogdatapb.VReplicationWorkflowState) float64 { + if state == binlogdatapb.VReplicationWorkflowState_Copying { + return math.MaxInt64 + } + if trxTs == nil { + trxTs = &vttimepb.Time{} + } + lastTransactionTime := trxTs.Seconds + if updatedTs == nil { + updatedTs = &vttimepb.Time{} + } + lastUpdatedTime := updatedTs.Seconds + if heartbeatTs == nil { + heartbeatTs = &vttimepb.Time{} + } + lastHeartbeatTime := heartbeatTs.Seconds + // We do NOT update the heartbeat timestamp when we are regularly updating the + // position as we replicate transactions (GTIDs). + // When we DO record a heartbeat, we set the updated time to the same value. + // When recording that we are throttled, we update the updated time but NOT + // the heartbeat time. + if lastTransactionTime == 0 /* No replicated events after copy */ || + (lastUpdatedTime == lastHeartbeatTime && /* The last update was from a heartbeat */ + lastUpdatedTime > lastTransactionTime /* No recent transactions, only heartbeats, so all caught up */) { + lastTransactionTime = lastUpdatedTime + } + now := time.Now().Unix() // Seconds since epoch + return float64(now - lastTransactionTime) +}