diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index beb556f21f5..3a8e98f1c66 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -162,15 +162,15 @@ func (vsm *vstreamManager) VStream(ctx context.Context, tabletType topodatapb.Ta filter *binlogdatapb.Filter, flags *vtgatepb.VStreamFlags, send func(events []*binlogdatapb.VEvent) error) error { vgtid, filter, flags, err := vsm.resolveParams(ctx, tabletType, vgtid, filter, flags) if err != nil { - return err + return vterrors.Wrap(err, "failed to resolve vstream parameters") } ts, err := vsm.toposerv.GetTopoServer() if err != nil { - return err + return vterrors.Wrap(err, "failed to get topology server") } if ts == nil { log.Errorf("unable to get topo server in VStream()") - return fmt.Errorf("unable to get topo server") + return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "unable to get topoology server") } vs := &vstream{ vgtid: vgtid, @@ -215,7 +215,7 @@ func (vsm *vstreamManager) resolveParams(ctx context.Context, tabletType topodat flags = &vtgatepb.VStreamFlags{} } if vgtid == nil || len(vgtid.ShardGtids) == 0 { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vgtid must have at least one value with a starting position") + return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vgtid must have at least one value with a starting position in ShardGtids") } // To fetch from all keyspaces, the input must contain a single ShardGtid // that has an empty keyspace, and the Gtid must be "current". @@ -228,7 +228,7 @@ func (vsm *vstreamManager) resolveParams(ctx context.Context, tabletType topodat newvgtid := &binlogdatapb.VGtid{} keyspaces, err := vsm.toposerv.GetSrvKeyspaceNames(ctx, vsm.cell, false) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, vterrors.Wrapf(err, "failed to get keyspace names in cell %s", vsm.cell) } if isEmpty { @@ -244,7 +244,7 @@ func (vsm *vstreamManager) resolveParams(ctx context.Context, tabletType topodat } else { re, err := regexp.Compile(strings.Trim(inputKeyspace, "/")) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, vterrors.Wrapf(err, "failed to compile regexp using %s", inputKeyspace) } for _, keyspace := range keyspaces { if re.MatchString(keyspace) { @@ -262,12 +262,13 @@ func (vsm *vstreamManager) resolveParams(ctx context.Context, tabletType topodat for _, sgtid := range vgtid.ShardGtids { if sgtid.Shard == "" { if sgtid.Gtid != "current" && sgtid.Gtid != "" { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "if shards are unspecified, the Gtid value must be 'current' or empty; got: %v", vgtid) + return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "if shards are unspecified, the Gtid value must be 'current' or empty; got: %+v", + vgtid) } // TODO(sougou): this should work with the new Migrate workflow _, _, allShards, err := vsm.resolver.GetKeyspaceShards(ctx, sgtid.Keyspace, tabletType) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, vterrors.Wrapf(err, "failed to get shards in keyspace %s", sgtid.Keyspace) } for _, shard := range allShards { newvgtid.ShardGtids = append(newvgtid.ShardGtids, &binlogdatapb.ShardGtid{ @@ -329,9 +330,9 @@ func (vs *vstream) sendEvents(ctx context.Context) { send := func(evs []*binlogdatapb.VEvent) error { if err := vs.send(evs); err != nil { vs.once.Do(func() { - vs.setError(err) + vs.setError(err, "error sending events") }) - return err + return vterrors.Wrap(err, "error sending events") } return nil } @@ -339,13 +340,13 @@ func (vs *vstream) sendEvents(ctx context.Context) { select { case <-ctx.Done(): vs.once.Do(func() { - vs.setError(fmt.Errorf("context canceled")) + vs.setError(ctx.Err(), "context ended while sending events") }) return case evs := <-vs.eventCh: if err := send(evs); err != nil { vs.once.Do(func() { - vs.setError(err) + vs.setError(err, "error sending events") }) return } @@ -359,7 +360,7 @@ func (vs *vstream) sendEvents(ctx context.Context) { }} if err := send(evs); err != nil { vs.once.Do(func() { - vs.setError(err) + vs.setError(err, "error sending heartbeat") }) return } @@ -378,7 +379,7 @@ func (vs *vstream) startOneStream(ctx context.Context, sgtid *binlogdatapb.Shard if err != nil { log.Errorf("Error in vstream for %+v: %s", sgtid, err) vs.once.Do(func() { - vs.setError(err) + vs.setError(err, fmt.Sprintf("error starting stream from shard GTID %+v", sgtid)) vs.cancel() }) } @@ -464,10 +465,12 @@ func (vs *vstream) alignStreams(ctx context.Context, event *binlogdatapb.VEvent, } select { case <-ctx.Done(): - return ctx.Err() + return vterrors.Wrapf(ctx.Err(), "context ended while waiting for skew to reduce for stream %s from %s/%s", + streamID, keyspace, shard) case <-time.After(time.Duration(vs.skewTimeoutSeconds) * time.Second): log.Errorf("timed out while waiting for skew to reduce: %s", streamID) - return fmt.Errorf("timed out while waiting for skew to reduce: %s", streamID) + return vterrors.Errorf(vtrpcpb.Code_CANCELED, "timed out while waiting for skew to reduce for stream %s from %s/%s", + streamID, keyspace, shard) case <-vs.skewCh: // once skew is fixed the channel is closed and all waiting streams "wake up" } @@ -500,7 +503,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha for { select { case <-ctx.Done(): - return ctx.Err() + return vterrors.Wrapf(ctx.Err(), "context ended while streaming from %s/%s", sgtid.Keyspace, sgtid.Shard) case <-journalDone: // Unreachable. // This can happen if a server misbehaves and does not end @@ -544,8 +547,14 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha if err != nil { return tabletPickerErr(err) } - log.Infof("Picked a %s tablet for VStream in %s/%s within the %s cell(s)", - vs.tabletType.String(), sgtid.GetKeyspace(), sgtid.GetShard(), strings.Join(cells, ",")) + var tabletPortStr string + if tablet.PortMap["grpc"] != 0 { + tabletPortStr = fmt.Sprintf(":%d", tablet.PortMap["grpc"]) + } + tabletMsgDetails := fmt.Sprintf("%s (currently at %s%s)", + topoproto.TabletAliasString(tablet.Alias), tablet.Hostname, tabletPortStr) + log.Infof("Picked %s tablet %s for VStream in %s/%s within the %s cell(s)", + vs.tabletType.String(), tabletMsgDetails, sgtid.GetKeyspace(), sgtid.GetShard(), strings.Join(cells, ",")) target := &querypb.Target{ Keyspace: sgtid.Keyspace, @@ -556,7 +565,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha tabletConn, err := vs.vsm.resolver.GetGateway().QueryServiceByAlias(ctx, tablet.Alias, target) if err != nil { log.Errorf(err.Error()) - return err + return vterrors.Wrapf(err, "failed to get tablet connection to %s", tabletMsgDetails) } errCh := make(chan error, 1) @@ -565,9 +574,9 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha var err error switch { case ctx.Err() != nil: - err = fmt.Errorf("context has ended") + err = vterrors.Wrapf(ctx.Err(), "context ended while streaming tablet health from %s", tabletMsgDetails) case shr == nil || shr.RealtimeStats == nil || shr.Target == nil: - err = fmt.Errorf("health check failed on %s", topoproto.TabletAliasString(tablet.Alias)) + err = fmt.Errorf("health check failed on %s", tabletMsgDetails) case vs.tabletType != shr.Target.TabletType: err = fmt.Errorf("tablet %s type has changed from %s to %s, restarting vstream", topoproto.TabletAliasString(tablet.Alias), vs.tabletType, shr.Target.TabletType) @@ -580,6 +589,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha } if err != nil { log.Warningf("Tablet state changed: %s, attempting to restart", err) + err = vterrors.Wrapf(err, "error streaming tablet health from %s", tabletMsgDetails) errCh <- err return err } @@ -604,7 +614,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha Options: options, } var vstreamCreatedOnce sync.Once - log.Infof("Starting to vstream from %s, with req %+v", topoproto.TabletAliasString(tablet.Alias), req) + log.Infof("Starting to vstream from %s, with req %+v", tabletMsgDetails, req) err = tabletConn.VStream(ctx, req, func(events []*binlogdatapb.VEvent) error { // We received a valid event. Reset error count. errCount = 0 @@ -617,9 +627,11 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha select { case <-ctx.Done(): - return ctx.Err() + return vterrors.Wrapf(ctx.Err(), "context ended while streaming from tablet %s in %s/%s", + tabletMsgDetails, sgtid.Keyspace, sgtid.Shard) case streamErr := <-errCh: - return vterrors.New(vtrpcpb.Code_UNAVAILABLE, streamErr.Error()) + return vterrors.Wrapf(streamErr, "error streaming from tablet %s in %s/%s", + tabletMsgDetails, sgtid.Keyspace, sgtid.Shard) case <-journalDone: // Unreachable. // This can happen if a server misbehaves and does not end @@ -628,6 +640,9 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha default: } + aligningStreamsErr := fmt.Sprintf("error aligning streams across %s/%s", sgtid.Keyspace, sgtid.Shard) + sendingEventsErr := fmt.Sprintf("error sending event batch from tablet %s", tabletMsgDetails) + sendevents := make([]*binlogdatapb.VEvent, 0, len(events)) for i, event := range events { switch event.Type { @@ -648,11 +663,11 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha eventss = append(eventss, sendevents) if err := vs.alignStreams(ctx, event, sgtid.Keyspace, sgtid.Shard); err != nil { - return err + return vterrors.Wrap(err, aligningStreamsErr) } if err := vs.sendAll(ctx, sgtid, eventss); err != nil { - return err + return vterrors.Wrap(err, sendingEventsErr) } eventss = nil sendevents = nil @@ -664,11 +679,11 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha eventss = append(eventss, sendevents) if err := vs.alignStreams(ctx, event, sgtid.Keyspace, sgtid.Shard); err != nil { - return err + return vterrors.Wrap(err, aligningStreamsErr) } if err := vs.sendAll(ctx, sgtid, eventss); err != nil { - return err + return vterrors.Wrap(err, sendingEventsErr) } eventss = nil sendevents = nil @@ -677,7 +692,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha // Otherwise they can accumulate indefinitely if there are no real events. // TODO(sougou): figure out a model for this. if err := vs.alignStreams(ctx, event, sgtid.Keyspace, sgtid.Shard); err != nil { - return err + return vterrors.Wrap(err, aligningStreamsErr) } case binlogdatapb.VEventType_JOURNAL: journal := event.Journal @@ -698,14 +713,15 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha } eventss = append(eventss, sendevents) if err := vs.sendAll(ctx, sgtid, eventss); err != nil { - return err + return vterrors.Wrap(err, sendingEventsErr) } eventss = nil sendevents = nil } je, err := vs.getJournalEvent(ctx, sgtid, journal) if err != nil { - return err + return vterrors.Wrapf(err, "error getting journal event for shard GTID %+v on tablet %s", + sgtid, tabletMsgDetails) } if je != nil { var endTimer *time.Timer @@ -725,7 +741,8 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha journalDone = je.done select { case <-ctx.Done(): - return ctx.Err() + return vterrors.Wrapf(ctx.Err(), "context ended while waiting for journal event for shard GTID %+v on tablet %s", + sgtid, tabletMsgDetails) case <-journalDone: if endTimer != nil { <-endTimer.C @@ -752,13 +769,15 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha } if err == nil { // Unreachable. - err = vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "vstream ended unexpectedly") + err = vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "vstream ended unexpectedly on tablet %s in %s/%s", + tabletMsgDetails, sgtid.Keyspace, sgtid.Shard) } retry, ignoreTablet := vs.shouldRetry(err) if !retry { log.Errorf("vstream for %s/%s error: %v", sgtid.Keyspace, sgtid.Shard, err) - return err + return vterrors.Wrapf(err, "error in vstream for %s/%s on tablet %s", + sgtid.Keyspace, sgtid.Shard, tabletMsgDetails) } if ignoreTablet { ignoreTablets = append(ignoreTablets, tablet.GetAlias()) @@ -768,7 +787,8 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha // Retry, at most, 3 times if the error can be retried. if errCount >= 3 { log.Errorf("vstream for %s/%s had three consecutive failures: %v", sgtid.Keyspace, sgtid.Shard, err) - return err + return vterrors.Wrapf(err, "persistent error in vstream for %s/%s on tablet %s; giving up", + sgtid.Keyspace, sgtid.Shard, tabletMsgDetails) } log.Infof("vstream for %s/%s error, retrying: %v", sgtid.Keyspace, sgtid.Shard, err) } @@ -884,10 +904,10 @@ func (vs *vstream) getError() error { return vs.err } -func (vs *vstream) setError(err error) { +func (vs *vstream) setError(err error, msg string) { vs.errMu.Lock() defer vs.errMu.Unlock() - vs.err = err + vs.err = vterrors.Wrap(err, msg) } // getJournalEvent returns a journalEvent. The caller has to wait on its done channel. @@ -936,7 +956,8 @@ func (vs *vstream) getJournalEvent(ctx context.Context, sgtid *binlogdatapb.Shar mode = matchAll je.participants[inner] = false case matchNone: - return nil, fmt.Errorf("not all journaling participants are in the stream: journal: %v, stream: %v", journal.Participants, vs.vgtid.ShardGtids) + return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "not all journaling participants are in the stream: journal: %v, stream: %v", + journal.Participants, vs.vgtid.ShardGtids) } continue nextParticipant } @@ -945,7 +966,8 @@ func (vs *vstream) getJournalEvent(ctx context.Context, sgtid *binlogdatapb.Shar case undecided, matchNone: mode = matchNone case matchAll: - return nil, fmt.Errorf("not all journaling participants are in the stream: journal: %v, stream: %v", journal.Participants, vs.vgtid.ShardGtids) + return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "not all journaling participants are in the stream: journal: %v, stream: %v", + journal.Participants, vs.vgtid.ShardGtids) } } if mode == matchNone { @@ -1017,7 +1039,8 @@ func (vs *vstream) keyspaceHasBeenResharded(ctx context.Context, keyspace string for _, s := range ksShardGTIDs { shard := shards[s.GetShard()] if shard == nil { - return false, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "shard provided in VGTID, %s, not found in the %s keyspace", s.GetShard(), keyspace) + return false, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "shard provided in VGTID, %s, not found in the %s keyspace", + s.GetShard(), keyspace) } if !shard.GetIsPrimaryServing() { reshardPossible = true diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index fb4cb324047..10e9ffb1062 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -184,7 +184,7 @@ func (vs *vstreamer) Stream() error { if err != nil { vs.vse.errorCounts.Add("StreamRows", 1) vs.vse.vstreamersEndedWithErrors.Add(1) - return err + return vterrors.Wrapf(err, "failed to determine starting position") } vs.pos = pos return vs.replicate(ctx) @@ -289,7 +289,8 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog bufferedEvents = append(bufferedEvents, vevent) default: vs.vse.errorCounts.Add("BufferAndTransmit", 1) - return fmt.Errorf("unexpected event: %v", vevent) + return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "usupported event type %s found for event: %+v", + vevent.Type.String(), vevent) } return nil } @@ -369,11 +370,13 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog case ev, ok := <-throttledEvents: if !ok { select { + case err := <-errs: + return err case <-ctx.Done(): return nil default: } - return fmt.Errorf("unexpected server EOF") + return vterrors.Errorf(vtrpcpb.Code_ABORTED, "unexpected server EOF while parsing events") } vevents, err := vs.parseEvent(ev, bufferAndTransmit) if err != nil { @@ -386,16 +389,18 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog return nil } vs.vse.errorCounts.Add("BufferAndTransmit", 1) - return fmt.Errorf("error sending event: %v", err) + return vterrors.Wrapf(err, "error sending event: %+v", vevent) } } case vs.vschema = <-vs.vevents: select { + case err := <-errs: + return err case <-ctx.Done(): return nil default: if err := vs.rebuildPlans(); err != nil { - return err + return vterrors.Wrap(err, "failed to rebuild replication plans") } } case err := <-errs: @@ -409,7 +414,7 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog return nil } vs.vse.errorCounts.Add("Send", 1) - return fmt.Errorf("error sending event: %v", err) + return vterrors.Wrapf(err, "failed to send heartbeat event") } } } @@ -460,7 +465,7 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent, bufferAndTransmit func(vev case ev.IsGTID(): gtid, hasBegin, err := ev.GTID(vs.format) if err != nil { - return nil, fmt.Errorf("can't get GTID from binlog event: %v, event data: %#v", err, ev) + return nil, vterrors.Wrapf(err, "failed to get GTID from binlog event: %#v", ev) } if hasBegin { vevents = append(vevents, &binlogdatapb.VEvent{ @@ -478,7 +483,7 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent, bufferAndTransmit func(vev case ev.IsQuery(): q, err := ev.Query(vs.format) if err != nil { - return nil, fmt.Errorf("can't get query from binlog event: %v, event data: %#v", err, ev) + return nil, vterrors.Wrapf(err, "failed to get query from binlog event: %#v", ev) } // Insert/Delete/Update are supported only to be used in the context of external mysql streams where source databases // could be using SBR. Vitess itself will never run into cases where it needs to consume non rbr statements. @@ -579,7 +584,7 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent, bufferAndTransmit func(vev tm, err := ev.TableMap(vs.format) if err != nil { - return nil, err + return nil, vterrors.Wrapf(err, "failed to parse table map from binlog event: %#v", ev) } if plan, ok := vs.plans[id]; ok { // When the underlying mysql server restarts the table map can change. @@ -618,7 +623,7 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent, bufferAndTransmit func(vev vevent, err := vs.buildTablePlan(id, tm) if err != nil { vs.vse.errorCounts.Add("TablePlan", 1) - return nil, err + return nil, vterrors.Wrapf(err, "failed to build table replication plan for table %s", tm.Name) } if vevent != nil { vevents = append(vevents, vevent) @@ -907,7 +912,7 @@ func getExtColInfos(ctx context.Context, cp dbconfigs.Connector, se *schema.Engi extColInfos := make(map[string]*extColInfo) conn, err := cp.Connect(ctx) if err != nil { - return nil, err + return nil, vterrors.Wrapf(err, "failed to connect to database %s", database) } defer conn.Close() queryTemplate := "select column_name, column_type, collation_name from information_schema.columns where table_schema=%s and table_name=%s;" @@ -975,7 +980,7 @@ nextrow: for _, row := range rows.Rows { afterOK, afterValues, _, err := vs.extractRowAndFilter(plan, row.Data, rows.DataColumns, row.NullColumns, row.JSONPartialValues) if err != nil { - return nil, err + return nil, vterrors.Wrap(err, "failed to extract journal from binlog event and apply filters") } if !afterOK { // This can happen if someone manually deleted rows. @@ -997,7 +1002,7 @@ nextrow: return nil, err } if err := prototext.Unmarshal(avBytes, journal); err != nil { - return nil, err + return nil, vterrors.Wrap(err, "failed to unmarshal journal event") } vevents = append(vevents, &binlogdatapb.VEvent{ Type: binlogdatapb.VEventType_JOURNAL, @@ -1014,13 +1019,13 @@ func (vs *vstreamer) processRowEvent(vevents []*binlogdatapb.VEvent, plan *strea // The BEFORE image does not have partial JSON values so we pass an empty bitmap. beforeOK, beforeValues, _, err := vs.extractRowAndFilter(plan, row.Identify, rows.IdentifyColumns, row.NullIdentifyColumns, mysql.Bitmap{}) if err != nil { - return nil, err + return nil, vterrors.Wrap(err, "failed to extract row's before values from binlog event and apply filters") } // The AFTER image is where we may have partial JSON values, as reflected in the // row's JSONPartialValues bitmap. afterOK, afterValues, partial, err := vs.extractRowAndFilter(plan, row.Data, rows.DataColumns, row.NullColumns, row.JSONPartialValues) if err != nil { - return nil, err + return nil, vterrors.Wrap(err, "failed to extract row's after values from binlog event and apply filters") } if !beforeOK && !afterOK { continue @@ -1125,7 +1130,8 @@ func (vs *vstreamer) extractRowAndFilter(plan *streamerPlan, data []byte, dataCo if err != nil { log.Errorf("extractRowAndFilter: %s, table: %s, colNum: %d, fields: %+v, current values: %+v", err, plan.Table.Name, colNum, plan.Table.Fields, values) - return false, nil, false, err + return false, nil, false, vterrors.Wrapf(err, "failed to extract row's value for column %s from binlog event", + plan.Table.Fields[colNum].Name) } pos += l @@ -1201,7 +1207,7 @@ func buildEnumStringValue(env *vtenv.Environment, plan *streamerPlan, colNum int // table map event to initialize it. if plan.EnumSetValuesMap == nil { if err := addEnumAndSetMappingstoPlan(env, plan.Plan, plan.Table.Fields, plan.TableMap.Metadata); err != nil { - return sqltypes.Value{}, err + return sqltypes.Value{}, vterrors.Wrap(err, "failed to build ENUM column integer to string mappings") } } // ENUM columns are stored as an unsigned 16-bit integer as they can contain a maximum @@ -1209,7 +1215,7 @@ func buildEnumStringValue(env *vtenv.Environment, plan *streamerPlan, colNum int // reserved for any integer value that has no string mapping. iv, err := value.ToUint16() if err != nil { - return sqltypes.Value{}, fmt.Errorf("no valid integer value found for column %s in table %s, bytes: %b", + return sqltypes.Value{}, vterrors.Wrapf(err, "no valid integer value found for column %s in table %s, bytes: %b", plan.Table.Fields[colNum].Name, plan.Table.Name, iv) } var strVal string @@ -1236,7 +1242,7 @@ func buildSetStringValue(env *vtenv.Environment, plan *streamerPlan, colNum int, // table map event to initialize it. if plan.EnumSetValuesMap == nil { if err := addEnumAndSetMappingstoPlan(env, plan.Plan, plan.Table.Fields, plan.TableMap.Metadata); err != nil { - return sqltypes.Value{}, err + return sqltypes.Value{}, vterrors.Wrap(err, "failed to build SET column integer to string mappings") } } // A SET column can have 64 unique values: https://dev.mysql.com/doc/refman/en/set.html @@ -1245,7 +1251,7 @@ func buildSetStringValue(env *vtenv.Environment, plan *streamerPlan, colNum int, val := bytes.Buffer{} iv, err := value.ToUint64() if err != nil { - return value, fmt.Errorf("no valid integer value found for column %s in table %s, bytes: %b", + return value, vterrors.Wrapf(err, "no valid integer value found for column %s in table %s, bytes: %b", plan.Table.Fields[colNum].Name, plan.Table.Name, iv) } idx := 1