Skip to content

Commit

Permalink
Updated error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
dpattmann committed Nov 18, 2021
1 parent fd7dda2 commit 971e222
Showing 1 changed file with 6 additions and 14 deletions.
20 changes: 6 additions & 14 deletions timestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func newTimeStreamAdapter(logger *zap.SugaredLogger, cfg *config, writeSvc times
}

// So client makes HTTP/2 requests
http2.ConfigureTransport(tr)
_ = http2.ConfigureTransport(tr)

if writeSvc == nil || readSvc == nil {
sess := session.Must(session.NewSession(
Expand Down Expand Up @@ -228,13 +228,12 @@ func (t TimeStreamAdapter) runReadRequestQuery(q *prompb.Query) (result prompb.Q
}

var timeSeries []*prompb.TimeSeries
var innerErr error
err = t.QueryPages(
&timestreamquery.QueryInput{
QueryString: &task.query,
},
func(output *timestreamquery.QueryOutput, lastPage bool) bool {
timeSeries, innerErr = t.handleQueryResult(output, timeSeries, task.measureName)
timeSeries = t.handleQueryResult(output, timeSeries, task.measureName)
return !lastPage
},
)
Expand All @@ -243,14 +242,6 @@ func (t TimeStreamAdapter) runReadRequestQuery(q *prompb.Query) (result prompb.Q
return
}

if innerErr != nil {
return result, innerErr
}

if err != nil {
return
}

result = prompb.QueryResult{
Timeseries: timeSeries,
}
Expand Down Expand Up @@ -350,8 +341,7 @@ func (t TimeStreamAdapter) readLabels(labels []*prompb.Label) (task writeTask) {
return
}

func (t TimeStreamAdapter) handleQueryResult(qo *timestreamquery.QueryOutput, timeSeries []*prompb.TimeSeries, measureName string) ([]*prompb.TimeSeries, error) {
var err error
func (t TimeStreamAdapter) handleQueryResult(qo *timestreamquery.QueryOutput, timeSeries []*prompb.TimeSeries, measureName string) []*prompb.TimeSeries {
for _, row := range qo.Rows {
var ts prompb.TimeSeries

Expand All @@ -371,11 +361,13 @@ func (t TimeStreamAdapter) handleQueryResult(qo *timestreamquery.QueryOutput, ti
for _, p := range d.TimeSeriesValue {
value, err := strconv.ParseFloat(*p.Value.ScalarValue, 64)
if err != nil {
t.logger.Warnw("Can't convert scalar value")
continue
}

s, err := time.Parse("2006-01-02 15:04:05.999999999", *p.Time)
if err != nil {
t.logger.Warnw("Can't parse time")
continue
}

Expand All @@ -391,5 +383,5 @@ func (t TimeStreamAdapter) handleQueryResult(qo *timestreamquery.QueryOutput, ti
timeSeries = append(timeSeries, &ts)
}

return timeSeries, err
return timeSeries
}

0 comments on commit 971e222

Please sign in to comment.