Interceptor for retrying requests #1997

Open
o15a3d4l11s2 opened this issue Jul 27, 2024 · 1 comment

o15a3d4l11s2 commented Jul 27, 2024

What are you trying to achieve?

The server sometimes returns errors or throttles the requests, so I want to be able to retry requests.

What have you tried so far?

Update:
I also looked into this interceptor from the tests:

class NotReallyAuthClientInterceptor<Request: Message, Response: Message>:
    ClientInterceptor<Request, Response>, @unchecked Sendable
{
    private let client: Helloworld_GreeterNIOClient

    private enum State {
        // We're trying the call, these are the parts we've sent so far.
        case trying([GRPCClientRequestPart<Request>])
        // We're retrying using this call.
        case retrying(Call<Request, Response>)
    }

    private var state: State = .trying([])

    init(client: Helloworld_GreeterNIOClient) {
        self.client = client
    }

    override func cancel(
        promise: EventLoopPromise<Void>?,
        context: ClientInterceptorContext<Request, Response>
    ) {
        switch self.state {
        case .trying:
            context.cancel(promise: promise)
        case let .retrying(call):
            call.cancel(promise: promise)
            context.cancel(promise: nil)
        }
    }

    override func send(
        _ part: GRPCClientRequestPart<Request>,
        promise: EventLoopPromise<Void>?,
        context: ClientInterceptorContext<Request, Response>
    ) {
        switch self.state {
        case var .trying(parts):
            // Record the part, in case we need to retry.
            parts.append(part)
            self.state = .trying(parts)
            // Forward the request part.
            context.send(part, promise: promise)
        case let .retrying(call):
            // We're retrying, send the part to the retry call.
            call.send(part, promise: promise)
        }
    }

    override func receive(
        _ part: GRPCClientResponsePart<Response>,
        context: ClientInterceptorContext<Request, Response>
    ) {
        switch self.state {
        case var .trying(parts):
            switch part {
            // If 'authentication' fails this is the only part we expect, we can forward everything else.
            case let .end(status, trailers) where status.code == .unauthenticated:
                // We only know how to deal with magic.
                guard trailers.first(name: "www-authenticate") == "Magic" else {
                    // We can't handle this, fail.
                    context.receive(part)
                    return
                }
                // We know how to handle this: make a new call.
                let call: Call<Request, Response> = self.client.channel.makeCall(
                    path: context.path,
                    type: context.type,
                    callOptions: context.options,
                    // We could grab interceptors from the client, but we don't need to.
                    interceptors: []
                )
                // We're retrying the call now.
                self.state = .retrying(call)
                // Invoke the call and redirect responses here.
                call.invoke(onError: context.errorCaught(_:), onResponsePart: context.receive(_:))
                // Parts must contain the metadata as the first item if we got that first response.
                if case var .some(.metadata(metadata)) = parts.first {
                    metadata.replaceOrAdd(name: "authorization", value: "Magic")
                    parts[0] = .metadata(metadata)
                }
                // Now replay any requests on the retry call.
                for part in parts {
                    call.send(part, promise: nil)
                }
            default:
                context.receive(part)
            }
        case .retrying:
            // Ignore anything we receive on the original call.
            ()
        }
    }
}
but once it retried, no response came out of it. I also modified it to do N retries with a delay of X seconds in between, but got multiple errors while using it.

Original:
I have implemented what I think should be a sufficient interceptor for retrying, but now I am starting to wonder whether this is even possible from inside the interceptor itself.

Here is the sample code. The basic idea is that I store the request parts as they are sent, and if I receive a response that is not OK or get a transport error, I simply re-send the stored parts. The problem is that after I re-send the parts nothing happens: I get no further interceptor events, and the logs show no activity at all.

import Foundation
import GRPC
import NIO

class RetryingInterceptor<Request, Response>: ClientInterceptor<Request, Response> {
    private let delay: Int64
    private let maxRetries: Int

    private var remainingRetries: Int
    private var initialRequestParts: [GRPCClientRequestPart<Request>] = []

    init(maxRetries: Int, delay: Int64) {
        self.maxRetries = maxRetries
        self.delay = delay
        self.remainingRetries = maxRetries
    }

    override func send(_ part: GRPCClientRequestPart<Request>, promise: EventLoopPromise<Void>?, context: ClientInterceptorContext<Request, Response>) {
        initialRequestParts.append(part)

        context.send(part, promise: promise)
    }

    override func receive(_ part: GRPCClientResponsePart<Response>, context: ClientInterceptorContext<Request, Response>) {
        switch part {
        case .end(let status, _) where !status.isOk && remainingRetries > 0:
            NSLog("TEST Response error, retrying...")
            remainingRetries -= 1
            context.eventLoop.scheduleTask(in: .seconds(delay)) {
                self.retry(context: context)
            }
        default:
            context.receive(part)
        }
    }

    private func retry(context: ClientInterceptorContext<Request, Response>) {
        for part in initialRequestParts {
            context.send(part, promise: nil)
        }
    }

    override func errorCaught(_ error: any Error, context: ClientInterceptorContext<Request, Response>) {
        if remainingRetries > 0 {
            NSLog("TEST Transport error, retrying...")
            remainingRetries -= 1
            context.eventLoop.scheduleTask(in: .seconds(Int64(delay))) {
                self.retry(context: context)
            }
        } else {
            NSLog("TEST Transport error, no more retries.")
            context.errorCaught(error)
        }
    }
}
Collaborator

glbrntt commented Jul 29, 2024

Note that when the interceptor you linked from the tests retries the call, it creates a new call into which the buffered messages are sent:

let call: Call<Request, Response> = self.client.channel.makeCall(
    path: context.path,
    type: context.type,
    callOptions: context.options,
    // We could grab interceptors from the client, but we don't need to.
    interceptors: []
)
// We're retrying the call now.
self.state = .retrying(call)
// Invoke the call and redirect responses here.
call.invoke(onError: context.errorCaught(_:), onResponsePart: context.receive(_:))
// Parts must contain the metadata as the first item if we got that first response.
if case var .some(.metadata(metadata)) = parts.first {
    metadata.replaceOrAdd(name: "authorization", value: "Magic")
    parts[0] = .metadata(metadata)
}
// Now replay any requests on the retry call.
for part in parts {
    call.send(part, promise: nil)
}

In your code you're re-sending the parts on the initial RPC, and they will be dropped if that RPC has already completed.
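
For illustration, here is a rough, untested sketch of how the retry interceptor from the question might be restructured so that every attempt runs on a new call, as the test interceptor does. It assumes grpc-swift 1.x and protobuf messages. The class name `NewCallRetryInterceptor`, its `State` enum, and the `channel`, `maxRetries` and `delay` parameters are hypothetical names chosen for this example. It only retries when the RPC ends with a non-OK status before any response part has reached the caller, and it leaves transport errors on the initial RPC (`errorCaught`) to the default behaviour.

```swift
import GRPC
import NIOCore
import SwiftProtobuf

// Hypothetical sketch (not part of grpc-swift): each retry runs on a NEW call.
final class NewCallRetryInterceptor<Request: Message, Response: Message>:
    ClientInterceptor<Request, Response>, @unchecked Sendable
{
    private enum State {
        case original                                              // Using the initial RPC.
        case waiting(lastFailure: GRPCClientResponsePart<Response>) // Retry is scheduled.
        case retrying(Call<Request, Response>)                     // Using this retry call.
        case finished                                              // Gave up while waiting.
    }

    private let channel: GRPCChannel
    private let delay: TimeAmount
    private var remainingRetries: Int
    private var state: State = .original
    private var bufferedParts: [GRPCClientRequestPart<Request>] = []
    // Retrying after the caller has seen a response part could duplicate data.
    private var hasForwardedResponse = false

    init(channel: GRPCChannel, maxRetries: Int, delay: TimeAmount) {
        self.channel = channel
        self.remainingRetries = maxRetries
        self.delay = delay
    }

    override func send(
        _ part: GRPCClientRequestPart<Request>,
        promise: EventLoopPromise<Void>?,
        context: ClientInterceptorContext<Request, Response>
    ) {
        self.bufferedParts.append(part) // Keep every part for a possible replay.
        switch self.state {
        case .original, .finished: context.send(part, promise: promise)
        case .waiting: promise?.succeed(()) // Buffered only; replayed on retry.
        case let .retrying(call): call.send(part, promise: promise)
        }
    }

    override func receive(
        _ part: GRPCClientResponsePart<Response>,
        context: ClientInterceptorContext<Request, Response>
    ) {
        // Once a retry is pending or running, ignore the initial RPC.
        guard case .original = self.state else { return }
        self.handle(part, context: context)
    }

    override func cancel(
        promise: EventLoopPromise<Void>?,
        context: ClientInterceptorContext<Request, Response>
    ) {
        self.remainingRetries = 0 // Never retry after a cancellation.
        switch self.state {
        case .original, .finished:
            context.cancel(promise: promise)
        case let .waiting(lastFailure):
            self.state = .finished
            context.receive(lastFailure) // Surface the failure we were about to retry.
            promise?.succeed(())
        case let .retrying(call):
            call.cancel(promise: promise)
            context.cancel(promise: nil)
        }
    }

    // Handles response parts from the initial RPC and from every retry call.
    private func handle(
        _ part: GRPCClientResponsePart<Response>,
        context: ClientInterceptorContext<Request, Response>
    ) {
        // Like the code in the question, this retries on ANY non-OK status.
        if case let .end(status, _) = part, !status.isOk,
           self.remainingRetries > 0, !self.hasForwardedResponse {
            self.remainingRetries -= 1
            self.state = .waiting(lastFailure: part)
            context.eventLoop.scheduleTask(in: self.delay) {
                self.startRetry(context: context)
            }
        } else {
            self.hasForwardedResponse = true
            context.receive(part)
        }
    }

    private func startRetry(context: ClientInterceptorContext<Request, Response>) {
        guard case .waiting = self.state else { return } // Cancelled meanwhile.
        var options = context.options
        // Ask for the retry to use the original call's event loop.
        options.eventLoopPreference = .exact(context.eventLoop)
        let call: Call<Request, Response> = self.channel.makeCall(
            path: context.path, type: context.type,
            callOptions: options, interceptors: []
        )
        self.state = .retrying(call)
        call.invoke(
            onError: context.errorCaught(_:),
            onResponsePart: { part in self.handle(part, context: context) }
        )
        // Replay everything sent so far on the new call.
        for part in self.bufferedParts { call.send(part, promise: nil) }
    }
}
```

The retry call's response parts are routed through `handle` rather than straight to `context.receive(_:)`, so a failed retry can trigger another attempt until `remainingRetries` reaches zero. It would be installed like any other client interceptor, with `channel` being the same channel the client uses. A real implementation would probably restrict retries to specific status codes rather than every non-OK status.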
