Handling browser close while streaming response #45249

Open
JarekW opened this issue Dec 22, 2024 · 2 comments
Labels
area/mutiny area/rest kind/question Further information is requested


JarekW commented Dec 22, 2024

Describe the bug

Hi, I have a problem with Quarkus exceptions that I think occur when the browser closes the data stream. The code below is a proxy, based on the Quarkus S3 client documentation (downloading asynchronously). It streams a video file from S3 to the browser. There is also an example using the local filesystem. Streaming itself works fine. The problem is the exceptions in the logs about a closed channel/broken pipe.

This is what I think is happening:
When the user seeks to another position in the video, the browser closes the previous stream and opens a new one with a different Range header. When the browser closes the stream, some bytes may already have been received from S3 but not yet sent to the client. Delaying the items in the Multi reduces the error rate; with a 20 ms delay the errors almost never occur.

Streaming method:

//this comes from aws-sdk
CompletableFuture<CompletedDownload<ResponsePublisher<GetObjectResponse>>> downloadComplFuture;

return RestMulti.fromUniResponse(  
	Uni.createFrom().completionStage(downloadComplFuture),  
	(response) -> {  
		ResponsePublisher<GetObjectResponse> publisher1 = response.result();  

		return Multi.createFrom().safePublisher(AdaptersToFlow.publisher(publisher1))  
			.map(BufferUtil::toBuffer)

//delaying items reduces exception rate
/*
			.onItem().call(buffer -> {  
				return Uni.createFrom().nullItem().onItem().delayIt().by(Duration.ofMillis(20));  
			})  
*/
			;  
	},  
	(response) -> {  
		var headers = new HashMap<>(Map.of(  
			"Accept-Ranges", List.of("bytes"),  
			"Content-Length", List.of(response.result().response().contentLength().toString()),  
			"Content-Type", List.of(response.result().response().contentType())  
		));  

		String contentRange = response.result().response().contentRange();  
		if (contentRange != null) {  
			headers.put("Content-Range", List.of(contentRange));  
		}  

		return headers;  
	},  
	(response) -> {  
		String contentRange = response.result().response().contentRange();  
		if (contentRange != null) {  
			return Response.Status.PARTIAL_CONTENT.getStatusCode();  
		}  
		return Response.Status.OK.getStatusCode();  
	}  
);
}

When serving the file from the local filesystem, the first and second exceptions also occur.

return RestMulti.fromUniResponse(  
	vertx.fileSystem().open(filePath, new OpenOptions()),  
	(response) -> {  
		response.setReadPos(rangeFrom);  
		return response.toMulti().map(buffer -> {  
			return Buffer.buffer(buffer.getBytes());  
		})
//delaying items reduces exception rate
/*
			.onItem().call(buffer -> {  
				return Uni.createFrom().nullItem().onItem().delayIt().by(Duration.ofMillis(20));  
			})  
*/
                ;  
	},  
	(response) -> {  
		Long size = response.sizeBlocking();  
		Map<String, List<String>> headers = new HashMap<>(Map.of(  
			"Accept-Ranges", List.of("bytes"),  
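			// Content-Length is the number of bytes actually sent (assumes rangeFrom is 0 when there is no Range header)
			"Content-Length", List.of(String.valueOf(size - rangeFrom)),  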
			"Content-Type", List.of("video/mp4")  
		));  
		if (range != null) {  
			headers.put("Content-Range", List.of(String.format("bytes %s-%s/%s", rangeFrom, size - 1, size)));  
		}  

		return headers;  
	},  
	(response) -> {  
		if (range != null) {  
			return Response.Status.PARTIAL_CONTENT.getStatusCode();  
		}  
		return Response.Status.OK.getStatusCode();  
	}  
);
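The filesystem example relies on `range` and `rangeFrom`, whose computation is not shown. A minimal sketch of how they might be derived from the `Range` request header follows, using only the JDK. The class and method names (`RangeHeaders`, `parseRangeStart`, `contentRange`, `contentLength`) are hypothetical and not part of Quarkus or the original code, and only a single range of the form `bytes=N-` or `bytes=N-M` is recognised. For a range that runs to the end of the file, the number of bytes sent is `size - rangeFrom`, which is what `Content-Length` normally carries on a 206 response.

```java
import java.util.OptionalLong;

// Hypothetical helper, not part of Quarkus or of the code in this issue.
public final class RangeHeaders {

    private RangeHeaders() {
    }

    /**
     * Parses the start offset of a single byte range such as "bytes=1024-"
     * or "bytes=1024-2047". Returns empty for a missing, malformed,
     * multi-range, or suffix ("bytes=-500") header.
     */
    public static OptionalLong parseRangeStart(String rangeHeader) {
        if (rangeHeader == null || !rangeHeader.startsWith("bytes=")) {
            return OptionalLong.empty();
        }
        String spec = rangeHeader.substring("bytes=".length()).trim();
        int dash = spec.indexOf('-');
        if (dash <= 0 || spec.contains(",")) {
            return OptionalLong.empty();
        }
        try {
            long start = Long.parseLong(spec.substring(0, dash).trim());
            return start >= 0 ? OptionalLong.of(start) : OptionalLong.empty();
        } catch (NumberFormatException e) {
            return OptionalLong.empty();
        }
    }

    /** Content-Range value for a response that runs from rangeFrom to the end of the file. */
    public static String contentRange(long rangeFrom, long size) {
        return String.format("bytes %d-%d/%d", rangeFrom, size - 1, size);
    }

    /** Number of bytes sent for a range that runs from rangeFrom to the end of the file. */
    public static long contentLength(long rangeFrom, long size) {
        return size - rangeFrom;
    }

    public static void main(String[] args) {
        long size = 1_000L;
        long rangeFrom = parseRangeStart("bytes=200-").orElse(0L);
        System.out.println(contentRange(rangeFrom, size));  // bytes 200-999/1000
        System.out.println(contentLength(rangeFrom, size)); // 800
    }
}
```

The sketch ignores the end offset of a closed range on purpose, because the example above always streams to the end of the file; a fuller implementation would also honour `N-M` and suffix ranges.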

Exceptions:

ERROR [org.jbo.res.rea.ser.han.PublisherResponseHandler] (vert.x-eventloop-thread-3) Exception in SSE server handling, impossible to send it to client: io.netty.channel.StacklessClosedChannelException
	at io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object, ChannelPromise)(Unknown Source)
ERROR [org.jbo.res.rea.ser.han.PublisherResponseHandler] (vert.x-eventloop-thread-2) Exception in SSE server handling, impossible to send it to client: io.netty.channel.StacklessClosedChannelException
	at io.netty.channel.AbstractChannel.close(ChannelPromise)(Unknown Source)
ERROR [org.jbo.res.rea.ser.han.PublisherResponseHandler] (vert.x-eventloop-thread-2) Exception in SSE server handling, impossible to send it to client: java.io.IOException: Broken pipe
	at java.base/sun.nio.ch.FileDispatcherImpl.writev0(Native Method)
	at java.base/sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:66)
	at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:217)
	at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:153)
	at java.base/sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:563)
	at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:429)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:929)
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.forceFlush(AbstractNioChannel.java:366)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:782)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:833)

Also, the error message states that this is SSE handling, which it is not. That is because the handling of this error lives in the parent AbstractMultiSubscriber.

It might be related to a comment in #26253; I don't know enough to tell whether the Multi is terminated or not.
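All three logged failures look like the same event seen at different points: the peer has gone away. A minimal sketch of a predicate that recognises them is shown below, using only the JDK. The class and method names are hypothetical, and where such a check would be hooked in (a failure handler, a log filter) is not addressed here. It assumes that Netty's `StacklessClosedChannelException` is a subclass of `java.nio.channels.ClosedChannelException`, and it matches "Broken pipe" and "Connection reset" by message text, which is platform-dependent.

```java
import java.io.IOException;
import java.nio.channels.ClosedChannelException;

// Hypothetical helper, not part of Quarkus, Vert.x, or Netty.
public final class ClientDisconnects {

    private static final int MAX_CAUSE_DEPTH = 16;

    private ClientDisconnects() {
    }

    /**
     * Returns true if the failure, or one of its causes, looks like the
     * client closing the connection rather than a server-side problem.
     */
    public static boolean isClientDisconnect(Throwable failure) {
        Throwable t = failure;
        // Bounded walk down the cause chain to stay safe against cyclic causes.
        for (int depth = 0; t != null && depth < MAX_CAUSE_DEPTH; depth++) {
            if (t instanceof ClosedChannelException) {
                return true;
            }
            if (t instanceof IOException) {
                String message = t.getMessage();
                if (message != null
                        && (message.contains("Broken pipe") || message.contains("Connection reset"))) {
                    return true;
                }
            }
            t = t.getCause();
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(isClientDisconnect(new IOException("Broken pipe")));                  // true
        System.out.println(isClientDisconnect(new RuntimeException(new ClosedChannelException()))); // true
        System.out.println(isClientDisconnect(new IllegalStateException("bug")));                // false
    }
}
```

Filtering this way only hides the symptom; whether the upstream source stops producing after the close is a separate question.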

Expected behavior

The browser closing the stream is handled without exceptions.

Actual behavior

Exceptions are logged when the browser closes the stream.

How to Reproduce?

  1. Stream a video file to the browser
  2. Seek to a different position in the video

Output of uname -a or ver

Darwin wa 23.6.0 Darwin Kernel Version 23.6.0: Mon Jul 29 21:13:04 PDT 2024; root:xnu-10063.141.2~1/RELEASE_ARM64_T6020 arm64

Output of java -version

java version "17.0.7" 2023-04-18 LTS GraalVM

Quarkus version or git rev

3.17.5

Build tool (ie. output of mvnw --version or gradlew --version)

Apache Maven 3.9.8

Additional information

No response

@JarekW JarekW added the kind/bug Something isn't working label Dec 22, 2024
@geoand geoand added kind/question Further information is requested area/rest area/mutiny and removed triage/needs-triage kind/bug Something isn't working labels Dec 23, 2024
cescoffier (Member) commented

Can you provide a standalone reproducer?

it should cancel the stream (now, whether the stream source handles cancellation is a different question)
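To illustrate the distinction between sending a cancellation and the source honouring it, here is a minimal sketch using only the JDK's `java.util.concurrent.Flow` interfaces rather than Mutiny or the AWS SDK. All class names are hypothetical. `ChunkSource` stands in for a stream source that checks its cancelled flag before each emission, and `ClosingClient` stands in for a connection that closes after a few chunks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical demo classes; not Mutiny, Quarkus, or AWS SDK types.
public final class CancellationDemo {

    /** Synchronous source of `total` chunks that stops emitting once cancelled. */
    static final class ChunkSource implements Flow.Publisher<Integer> {
        final int total;
        int emitted;
        boolean cancelled;

        ChunkSource(int total) {
            this.total = total;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override
                public void request(long n) {
                    // The cancelled flag is re-checked before every emission.
                    for (long i = 0; i < n && emitted < total && !cancelled; i++) {
                        subscriber.onNext(emitted++);
                    }
                    if (emitted == total && !cancelled) {
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    cancelled = true;
                }
            });
        }
    }

    /** Subscriber that cancels after `closeAfter` items, like a browser closing the stream. */
    static final class ClosingClient implements Flow.Subscriber<Integer> {
        final int closeAfter;
        final List<Integer> received = new ArrayList<>();
        boolean completed;
        private Flow.Subscription subscription;

        ClosingClient(int closeAfter) {
            this.closeAfter = closeAfter;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(Long.MAX_VALUE); // unbounded demand, requested once
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            if (received.size() == closeAfter) {
                subscription.cancel();
            }
        }

        @Override
        public void onError(Throwable failure) {
        }

        @Override
        public void onComplete() {
            completed = true;
        }
    }

    public static void main(String[] args) {
        ChunkSource source = new ChunkSource(10);
        ClosingClient client = new ClosingClient(3);
        source.subscribe(client);
        // prints: received=[0, 1, 2] emitted=3 completed=false
        System.out.println("received=" + client.received
                + " emitted=" + source.emitted + " completed=" + client.completed);
    }
}
```

A source that ignored the flag would keep calling `onNext` after the cancel, which is the situation where writes to an already closed connection could be attempted.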

JarekW commented Jan 9, 2025

I've made a repository with the above code: quarkus-45249
There needs to be an mp4 file in the resources folder; in that repo's example I used this one
If you seek to a different position in the video in the browser, exceptions should appear in the log
