Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[QUERY] How to handle/recover Close Seesion exception for spring-cloud-azure-stream-binder-eventhub SDK #43838

Open
2 tasks done
yunbozhang-msft opened this issue Jan 20, 2025 · 0 comments
Labels
needs-triage Workflow: This is a new issue that needs to be triaged to the appropriate team.

Comments

@yunbozhang-msft
Copy link
Member

yunbozhang-msft commented Jan 20, 2025

Query/Question
Hi team, we are using spring-cloud-azure-stream-binder-eventhub to send messages to Azure Eventhub, but recently we encountered an exception: Cannot create send link from a closed session.

Image

Click to see my original error stack

2025-01-14 16:08:10.051 ERROR 9 --- [oundedElastic-1] reactor.core.publisher.Operators - [ error, 324] :
Operator called default onErrorDropped reactor.core.Exceptions$ErrorCallbackNotImplemented: com.azure.core.amqp.exception.AmqpException: Cannot create send link from a closed session., errorContext[NAMESPACE: xxxxxxx.servicebus.chinacloudapi.cn. ERROR CONTEXT: N/A, PATH: xxxxxxxxxxx]
Caused by: com.azure.core.amqp.exception.AmqpException: Cannot create send link from a closed session., errorContext[NAMESPACE: xxxxxxx.servicebus.chinacloudapi.cn. ERROR CONTEXT: N/A, PATH: xxxxxxxxxxxxxxxxx]
at com.azure.core.amqp.implementation.ReactorSession.createProducer(ReactorSession.java:467)
at com.azure.core.amqp.implementation.ReactorSession.createProducer(ReactorSession.java:449)
at com.azure.messaging.eventhubs.implementation.EventHubReactorSession.createProducer(EventHubReactorSession.java:84)
at com.azure.messaging.eventhubs.implementation.EventHubReactorAmqpConnection.lambda$createSendLink$0(EventHubReactorAmqpConnection.java:117)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.lambda$onNext$1(TracingSubscriber.java:62)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.onNext(TracingSubscriber.java:62)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.lambda$onNext$1(TracingSubscriber.java:62)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.onNext(TracingSubscriber.java:62)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.lambda$onNext$1(TracingSubscriber.java:62)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.onNext(TracingSubscriber.java:62)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:292)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2400)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onSubscribe(MonoIgnoreThen.java:134)
at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:240)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.onComplete(TracingSubscriber.java:72)
at reactor.core.publisher.SerializedSubscriber.onComplete(SerializedSubscriber.java:146)
at reactor.core.publisher.SerializedSubscriber.onComplete(SerializedSubscriber.java:146)
at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.onComplete(FluxTimeout.java:234)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.onComplete(TracingSubscriber.java:72)
at reactor.core.publisher.MonoNext$NextSubscriber.onComplete(MonoNext.java:102)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.onComplete(TracingSubscriber.java:72)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onComplete(FluxFilterFuseable.java:171)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.onComplete(TracingSubscriber.java:72)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onComplete(FluxHide.java:147)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.onComplete(TracingSubscriber.java:72)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onComplete(FluxHide.java:147)
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:868)
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:965)
at reactor.core.publisher.FluxReplay$ReplayInner.request(FluxReplay.java:1711)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.request(FluxHide.java:152)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.request(FluxHide.java:152)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.request(FluxFilterFuseable.java:191)
at reactor.core.publisher.MonoNext$NextSubscriber.request(MonoNext.java:108)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2160)
at reactor.core.publisher.SerializedSubscriber.request(SerializedSubscriber.java:151)
at reactor.core.publisher.SerializedSubscriber.request(SerializedSubscriber.java:151)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onSubscribe(MonoIgnoreThen.java:134)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.lambda$onSubscribe$0(TracingSubscriber.java:57)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.onSubscribe(TracingSubscriber.java:57)
at reactor.core.publisher.SerializedSubscriber.onSubscribe(SerializedSubscriber.java:65)
at reactor.core.publisher.SerializedSubscriber.onSubscribe(SerializedSubscriber.java:65)
at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.onSubscribe(FluxTimeout.java:154)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.lambda$onSubscribe$0(TracingSubscriber.java:57)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.onSubscribe(TracingSubscriber.java:57)
at reactor.core.publisher.MonoNext$NextSubscriber.onSubscribe(MonoNext.java:70)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.lambda$onSubscribe$0(TracingSubscriber.java:57)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.onSubscribe(TracingSubscriber.java:57)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onSubscribe(FluxFilterFuseable.java:87)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.lambda$onSubscribe$0(TracingSubscriber.java:57)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.onSubscribe(TracingSubscriber.java:57)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onSubscribe(FluxHide.java:122)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.lambda$onSubscribe$0(TracingSubscriber.java:57)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.onSubscribe(TracingSubscriber.java:57)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onSubscribe(FluxHide.java:122)
at reactor.core.publisher.FluxReplay.subscribeOrReturn(FluxReplay.java:1181)
at reactor.core.publisher.InternalConnectableFluxOperator.subscribe(InternalConnectableFluxOperator.java:55)
at reactor.core.publisher.FluxAutoConnectFuseable.subscribe(FluxAutoConnectFuseable.java:61)
at reactor.core.publisher.Mono.subscribe(Mono.java:4490)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:263)
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.lambda$onNext$1(TracingSubscriber.java:62)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.onNext(TracingSubscriber.java:62)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.lambda$onNext$1(TracingSubscriber.java:62)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.onNext(TracingSubscriber.java:62)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.lambda$onNext$1(TracingSubscriber.java:62)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.onNext(TracingSubscriber.java:62)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.lambda$onNext$1(TracingSubscriber.java:62)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.onNext(TracingSubscriber.java:62)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:292)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2400)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onSubscribe(MonoIgnoreThen.java:134)
at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:240)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.onComplete(TracingSubscriber.java:72)
at reactor.core.publisher.SerializedSubscriber.onComplete(SerializedSubscriber.java:146)
at reactor.core.publisher.SerializedSubscriber.onComplete(SerializedSubscriber.java:146)
at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.onComplete(FluxTimeout.java:234)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.onComplete(TracingSubscriber.java:72)
at reactor.core.publisher.MonoNext$NextSubscriber.onComplete(MonoNext.java:102)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:83)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.lambda$onNext$1(TracingSubscriber.java:62)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.onNext(TracingSubscriber.java:62)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onNext(FluxFilterFuseable.java:118)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.lambda$onNext$1(TracingSubscriber.java:62)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.onNext(TracingSubscriber.java:62)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.lambda$onNext$1(TracingSubscriber.java:62)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.onNext(TracingSubscriber.java:62)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:877)
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:965)
at reactor.core.publisher.FluxReplay$ReplayInner.request(FluxReplay.java:1711)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.request(FluxHide.java:152)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.request(FluxHide.java:152)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.request(FluxFilterFuseable.java:191)
at reactor.core.publisher.MonoNext$NextSubscriber.request(MonoNext.java:108)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2160)
at reactor.core.publisher.SerializedSubscriber.request(SerializedSubscriber.java:151)
at reactor.core.publisher.SerializedSubscriber.request(SerializedSubscriber.java:151)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onSubscribe(MonoIgnoreThen.java:134)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.lambda$onSubscribe$0(TracingSubscriber.java:57)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.onSubscribe(TracingSubscriber.java:57)
at reactor.core.publisher.SerializedSubscriber.onSubscribe(SerializedSubscriber.java:65)
at reactor.core.publisher.SerializedSubscriber.onSubscribe(SerializedSubscriber.java:65)
at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.onSubscribe(FluxTimeout.java:154)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.lambda$onSubscribe$0(TracingSubscriber.java:57)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.onSubscribe(TracingSubscriber.java:57)
at reactor.core.publisher.MonoNext$NextSubscriber.onSubscribe(MonoNext.java:70)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.lambda$onSubscribe$0(TracingSubscriber.java:57)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.onSubscribe(TracingSubscriber.java:57)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onSubscribe(FluxFilterFuseable.java:87)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.lambda$onSubscribe$0(TracingSubscriber.java:57)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.onSubscribe(TracingSubscriber.java:57)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onSubscribe(FluxHide.java:122)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.lambda$onSubscribe$0(TracingSubscriber.java:57)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.onSubscribe(TracingSubscriber.java:57)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onSubscribe(FluxHide.java:122)
at reactor.core.publisher.FluxReplay.subscribeOrReturn(FluxReplay.java:1181)
at reactor.core.publisher.InternalConnectableFluxOperator.subscribe(InternalConnectableFluxOperator.java:55)
at reactor.core.publisher.FluxAutoConnectFuseable.subscribe(FluxAutoConnectFuseable.java:61)
at reactor.core.publisher.Mono.subscribe(Mono.java:4490)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:263)
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.lambda$onNext$1(TracingSubscriber.java:62)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.onNext(TracingSubscriber.java:62)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839)
at reactor.core.publisher.MonoCallable.subscribe(MonoCallable.java:62)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839)
at com.azure.core.amqp.implementation.AmqpChannelProcessor.subscribe(AmqpChannelProcessor.java:264)
at reactor.core.publisher.Mono.subscribe(Mono.java:4490)
at reactor.core.publisher.Mono.block(Mono.java:1741)
at com.azure.spring.messaging.eventhubs.core.EventHubsTemplate.doSend(EventHubsTemplate.java:106)
at com.azure.spring.messaging.eventhubs.core.EventHubsTemplate.sendAsync(EventHubsTemplate.java:60)
at com.azure.spring.messaging.eventhubs.core.EventHubsTemplate.sendAsync(EventHubsTemplate.java:97)
at com.azure.spring.integration.core.handler.DefaultMessageHandler.handleMessageInternal(DefaultMessageHandler.java:86)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1074)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.router.AbstractMessageRouter.doSend(AbstractMessageRouter.java:213)
at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:195)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:88)
at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:37)
at org.springframework.integration.endpoint.ReactiveStreamsConsumer$SubscriberDecorator.hookOnNext(ReactiveStreamsConsumer.java:296)
at org.springframework.integration.endpoint.ReactiveStreamsConsumer$SubscriberDecorator.hookOnNext(ReactiveStreamsConsumer.java:277)
at reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.lambda$onNext$1(TracingSubscriber.java:62)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.withActiveSpan(TracingSubscriber.java:86)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.onNext(TracingSubscriber.java:62)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.lambda$onNext$1(TracingSubscriber.java:62)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.withActiveSpan(TracingSubscriber.java:86)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.onNext(TracingSubscriber.java:62)
at reactor.core.publisher.FluxRefCount$RefCountInner.onNext(FluxRefCount.java:200)
at reactor.core.publisher.FluxPublish$PublishSubscriber.drain(FluxPublish.java:490)
at reactor.core.publisher.FluxPublish$PublishSubscriber.onNext(FluxPublish.java:281)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.lambda$onNext$1(TracingSubscriber.java:62)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.withActiveSpan(TracingSubscriber.java:86)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.onNext(TracingSubscriber.java:62)
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:113)
at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:537)
at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:343)
at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
at org.springframework.integration.channel.FluxMessageChannel.tryEmitMessage(FluxMessageChannel.java:82)
at org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:71)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.integration.channel.FluxMessageChannel.lambda$subscribeTo$5(FluxMessageChannel.java:123)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:196)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.lambda$onNext$1(TracingSubscriber.java:62)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.withActiveSpan(TracingSubscriber.java:86)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.onNext(TracingSubscriber.java:62)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Suppressed: java.lang.Exception: #block terminated with an error
\tat reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
\tat reactor.core.publisher.Mono.block(Mono.java:1742)
\t... 61 common frames omitted

We checked the Eventhub serivce dashboard, we confirmed that eventhub service is health, and no error request at that time.
In this error, it seems like the request is using a closed session, the close session may be a closed connection in the pool.

We think that the SDK should automatically recover from this connection error, but after checking the message sequence number, we found that the sender did not send this message to Eventhub due to the closure of the Session.

We want to find an exception capture mechanism, or any way to avoid this type of problem. So that when the problem occurs, we can retry, or at least record which message was not sent.

I did the following test:
I tried to add exception capture, then started the program, and disconnected from the network, issue can be reproduced but the exception capture method was never called back correctly:

Image

Why is this not a Bug or a feature Request?
Consult possible exception handling methods, maybe I'm just using it wrong.

Setup (please complete the following information if applicable):

  • OS: [e.g. iOS] Windows
  • IDE: [e.g. IntelliJ] IntelliJ
  • Library/Libraries: [e.g. com.azure:azure-core:1.16.0 (groupId:artifactId:version)] spring-cloud-azure-stream-binder-eventhubs 5.19.0 and 4.6

Information Checklist
Kindly make sure that you have added all the following information above and checkoff the required fields otherwise we will treat the issuer as an incomplete report

  • Query Added
  • Setup information Added
@github-actions github-actions bot added the needs-triage Workflow: This is a new issue that needs to be triaged to the appropriate team. label Jan 20, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
needs-triage Workflow: This is a new issue that needs to be triaged to the appropriate team.
Projects
None yet
Development

No branches or pull requests

1 participant