Skip to content

Commit

Permalink
Send Accept header on raw streaming request to handle initial error
Browse files Browse the repository at this point in the history
Signed-off-by: Sunghoon Kang <[email protected]>
  • Loading branch information
Sunghoon Kang committed Nov 13, 2023
1 parent cfb347a commit 4a730c8
Showing 1 changed file with 33 additions and 33 deletions.
66 changes: 33 additions & 33 deletions pkg/grpc/gateway/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func DoRequest[T any](ctx context.Context, req *resty.Request) (*T, error) {
func DoStreamingRequest[T any](ctx context.Context, c Client, req *resty.Request) (<-chan *T, <-chan error, error) {
var resBody T
if _, ok := any(&resBody).(*httpbody.HttpBody); ok {
resCh, errCh, err := doHTTPStreamingRequest(ctx, req)
resCh, errCh, err := doHTTPStreamingRequest(ctx, c, req)
if err != nil {
return nil, nil, err
}
Expand All @@ -79,29 +79,7 @@ func DoStreamingRequest[T any](ctx context.Context, c Client, req *resty.Request
return nil, nil, fmt.Errorf("send request: %w", err)
}
if rawRes.IsError() {
body := rawRes.RawBody()
defer func() { _ = body.Close() }()
data, err := io.ReadAll(body)
if err != nil {
return nil, nil, fmt.Errorf("read error response body: %w", err)
}

var res streamingResponse
if err := json.Unmarshal(data, &res); err != nil {
return nil, nil, fmt.Errorf("unmarshal raw response: %w", err)
}
rawErrRes, ok := res[streamingResponseErrorKey]
if !ok {
return nil, nil, errors.New(string(data))
}
var errResp rpcstatus.Status
if err := c.Unmarshal(rawErrRes, &errResp); err != nil {
return nil, nil, fmt.Errorf("unmarshal error response: %w", err)
}
if err := status.ErrorProto(&errResp); err != nil {
return nil, nil, err
}
return nil, nil, status.Error(HTTPStatusToCode(rawRes.StatusCode()), rawRes.String())
return nil, nil, wrapStreamingResponseError(c, rawRes)
}

resCh := make(chan *T)
Expand Down Expand Up @@ -166,22 +144,18 @@ func doHTTPRequest(ctx context.Context, req *resty.Request) (any, error) {
}, nil
}

func doHTTPStreamingRequest(ctx context.Context, req *resty.Request) (any, <-chan error, error) {
func doHTTPStreamingRequest(ctx context.Context, c Client, req *resty.Request) (any, <-chan error, error) {
res, err := req.SetContext(ctx).
SetHeader("Accept", "text/event-stream").
SetHeader("Cache-Control", "no-cache").
SetHeader("Connection", "keep-alive").
SetDoNotParseResponse(true).
Send()
if err != nil {
return nil, nil, fmt.Errorf("send request: %w", err)
}
if res.IsError() {
errResp, ok := res.Error().(*rpcstatus.Status)
if !ok {
return nil, nil, fmt.Errorf("cast error response: %s", res.String())
}
if err := status.ErrorProto(errResp); err != nil {
return nil, nil, err
}
return nil, nil, status.Error(HTTPStatusToCode(res.StatusCode()), errResp.String())
return nil, nil, wrapStreamingResponseError(c, res)
}

resCh := make(chan *httpbody.HttpBody)
Expand Down Expand Up @@ -219,3 +193,29 @@ func doHTTPStreamingRequest(ctx context.Context, req *resty.Request) (any, <-cha
}()
return resCh, errCh, nil
}

func wrapStreamingResponseError(c Client, resp *resty.Response) error {
body := resp.RawBody()
defer func() { _ = body.Close() }()
data, err := io.ReadAll(body)
if err != nil {
return fmt.Errorf("read error response body: %w", err)
}

var streamingResp streamingResponse
if err := json.Unmarshal(data, &streamingResp); err != nil {
return fmt.Errorf("unmarshal raw response: %w", err)
}
rawErrRes, ok := streamingResp[streamingResponseErrorKey]
if !ok {
return errors.New(string(data))
}
var errResp rpcstatus.Status
if err := c.Unmarshal(rawErrRes, &errResp); err != nil {
return fmt.Errorf("unmarshal error response: %w", err)
}
if err := status.ErrorProto(&errResp); err != nil {
return err
}
return status.Error(HTTPStatusToCode(resp.StatusCode()), resp.String())
}

0 comments on commit 4a730c8

Please sign in to comment.