Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timeout to SSE #8565

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions okhttp-sse/src/main/kotlin/okhttp3/sse/EventSourceListener.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ package okhttp3.sse
import okhttp3.Response

abstract class EventSourceListener {
/**
* Seconds elapsed between 2 events until connection failed. Doesn't timeout if null
*/
open var timeout: Long? = null
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From Call, I think there is a convention on timeout names. Such as readTimeoutMillis. So perhaps receiveTimeoutMillis?

I wonder if we should prefer Kotlin Duration here also?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Internally we use callTimeout in RealEventSource to track establishment, and it's obviously an existing top level think in OkHttp which extends to the end of a request.

read feel to I/O socket focused.


/**
* Invoked when an event source has been accepted by the remote peer and may begin transmitting
* events.
Expand All @@ -29,7 +34,12 @@ abstract class EventSourceListener {
}

/**
* TODO description.
* Invoked when an event is received.
*
* @param eventSource Source of the event
* @param id SSE event's id
* @param type SSE event's type
* @param data SSE event's data
*/
open fun onEvent(
eventSource: EventSource,
Expand All @@ -40,7 +50,7 @@ abstract class EventSourceListener {
}

/**
* TODO description.
* Invoked when the connection has been properly closed by the server.
*
* No further calls to this listener will be made.
*/
Expand Down
27 changes: 25 additions & 2 deletions okhttp-sse/src/main/kotlin/okhttp3/sse/internal/RealEventSource.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package okhttp3.sse.internal

import java.io.IOException
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
import okhttp3.Call
import okhttp3.Callback
import okhttp3.Request
Expand All @@ -24,6 +26,8 @@ import okhttp3.ResponseBody
import okhttp3.internal.stripBody
import okhttp3.sse.EventSource
import okhttp3.sse.EventSourceListener
import okio.AsyncTimeout
import okio.Timeout.Companion.timeout

internal class RealEventSource(
private val request: Request,
Expand All @@ -40,6 +44,18 @@ internal class RealEventSource(
}
}

private fun updateTimeout(call: Call?, duration: Duration) {
if (call?.timeout() is AsyncTimeout) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit nervous about reusing call.timeout() here. I guess two reasons

  1. it isn't clear we are the owner of it, perhaps some other component or an interceptor is using it.
  2. the casting seems unsafe, but if we create it, we know the type and can set the duration ourselves.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you suggest then ?

For the first point: currently (without the PR), we cancel this exact timeout (L69), so the point would be valid for the current state.
For the 2nd, the check prevent unsafe casting

(call.timeout() as AsyncTimeout).apply {
if (this.timeoutNanos() > 0L) {
exit()
}
timeout(duration)
enter()
}
}
}

override fun onResponse(
call: Call,
response: Response,
Expand All @@ -65,8 +81,11 @@ internal class RealEventSource(
return
}

// This is a long-lived response. Cancel full-call timeouts.
call?.timeout()?.cancel()
// This is a long-lived response. Cancel full-call timeouts if no timeout has been set
listener.timeout?.let {
// We spend at most timeout seconds if set
updateTimeout(call, it.seconds)
} ?: call?.timeout()?.cancel()

// Replace the body with a stripped one so the callbacks can't see real data.
val response = response.stripBody()
Expand All @@ -76,6 +95,10 @@ internal class RealEventSource(
if (!canceled) {
listener.onOpen(this, response)
while (!canceled && reader.processNextEvent()) {
listener.timeout?.let {
// We spend at most timeout seconds if set
updateTimeout(call, it.seconds)
}
}
}
} catch (e: Exception) {
Expand Down