Description
Expected behavior
The producer should not get stuck in Send().
Actual behavior
The producer gets stuck in Send() and cannot recover.
Steps to reproduce
This test case is similar to the existing TestChunkBlockIfQueueFull, except that I removed the timeout from the context passed to Send():
func TestChunkBlockIfQueueFull(t *testing.T) {
	sLogger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))
	client, err := NewClient(ClientOptions{
		URL:    lookupURL,
		Logger: log.NewLoggerWithSlog(sLogger),
	})
	if err != nil {
		t.Fatal(err)
	}
	topic := newTopicName()
	producer, err := client.CreateProducer(ProducerOptions{
		Name:                "test",
		Topic:               topic,
		EnableChunking:      true,
		DisableBatching:     true,
		MaxPendingMessages:  10,
		ChunkMaxMessageSize: 10,
	})
	assert.NoError(t, err)
	assert.NotNil(t, producer)
	defer producer.Close()
	// No timeout on the context, so nothing interrupts Send() once it blocks.
	ctx := context.Background()
	// A 101-byte payload with ChunkMaxMessageSize 10 exceeds maxChunkSize * MaxPendingMessages.
	_, err = producer.Send(ctx, &ProducerMessage{
		Payload: createTestMessagePayload(101),
	})
	assert.Error(t, err)
}
Output:
=== RUN TestChunkBlockIfQueueFull
time=2025-12-11T12:08:49.514+08:00 level=DEBUG msg="Getting pooled connection" logicalAddr=pulsar://localhost:6650 physicalAddr=pulsar://localhost:6650
time=2025-12-11T12:08:49.514+08:00 level=DEBUG msg="Wait until connection is ready state=Initializing" remote_addr=pulsar://localhost:6650
time=2025-12-11T12:08:49.514+08:00 level=INFO msg="Connecting to broker" remote_addr=pulsar://localhost:6650
time=2025-12-11T12:08:49.515+08:00 level=INFO msg="TCP connection established" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
time=2025-12-11T12:08:49.516+08:00 level=DEBUG msg="Write data: 49" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
time=2025-12-11T12:08:49.517+08:00 level=DEBUG msg="Got MaxMessageSize from handshake response:5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
time=2025-12-11T12:08:49.517+08:00 level=INFO msg="Connection is ready" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
time=2025-12-11T12:08:49.517+08:00 level=DEBUG msg="Connection run starting with request capacity=10 queued=0" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
time=2025-12-11T12:08:49.517+08:00 level=DEBUG msg="Write data: 63" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
time=2025-12-11T12:08:49.518+08:00 level=DEBUG msg="Got command! type:PARTITIONED_METADATA_RESPONSE partitionMetadataResponse:{partitions:0 request_id:1 response:Success} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
time=2025-12-11T12:08:49.518+08:00 level=DEBUG msg="Received command: type:PARTITIONED_METADATA_RESPONSE partitionMetadataResponse:{partitions:0 request_id:1 response:Success} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
time=2025-12-11T12:08:49.519+08:00 level=DEBUG msg="Got topic{my-topic-514059306} partitioned metadata response: &{Response:type:PARTITIONED_METADATA_RESPONSE partitionMetadataResponse:{partitions:0 request_id:1 response:Success} Cnx:0xc00017e300}" serviceURL=pulsar://localhost:6650 serviceURL=pulsar://localhost:6650
time=2025-12-11T12:08:49.519+08:00 level=DEBUG msg="Getting pooled connection" logicalAddr=pulsar://localhost:6650 physicalAddr=pulsar://localhost:6650
time=2025-12-11T12:08:49.519+08:00 level=DEBUG msg="Found connection in pool key=localhost:6650-localhost:6650-0 logical_addr=pulsar://localhost:6650 physical_addr=pulsar://localhost:6650"
time=2025-12-11T12:08:49.519+08:00 level=DEBUG msg="Write data: 67" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
time=2025-12-11T12:08:49.519+08:00 level=DEBUG msg="Got command! type:LOOKUP_RESPONSE lookupTopicResponse:{brokerServiceUrl:\"pulsar://localhost:6650\" response:Connect request_id:2 authoritative:true proxy_through_service_url:true} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
time=2025-12-11T12:08:49.519+08:00 level=DEBUG msg="Received command: type:LOOKUP_RESPONSE lookupTopicResponse:{brokerServiceUrl:\"pulsar://localhost:6650\" response:Connect request_id:2 authoritative:true proxy_through_service_url:true} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
time=2025-12-11T12:08:49.519+08:00 level=DEBUG msg="Got topic{persistent://public/default/my-topic-514059306} lookup response: &{Response:type:LOOKUP_RESPONSE lookupTopicResponse:{brokerServiceUrl:\"pulsar://localhost:6650\" response:Connect request_id:2 authoritative:true proxy_through_service_url:true} Cnx:0xc00017e300}" serviceURL=pulsar://localhost:6650 serviceURL=pulsar://localhost:6650
time=2025-12-11T12:08:49.520+08:00 level=DEBUG msg="Successfully looked up topic{persistent://public/default/my-topic-514059306} on broker. pulsar://localhost:6650 / - Use proxy: true" serviceURL=pulsar://localhost:6650 serviceURL=pulsar://localhost:6650
time=2025-12-11T12:08:49.520+08:00 level=DEBUG msg="Lookup result: &{pulsar://localhost:6650 pulsar://localhost:6650}" topic=persistent://public/default/my-topic-514059306
time=2025-12-11T12:08:49.520+08:00 level=DEBUG msg="The partition producer schema is nil" topic=persistent://public/default/my-topic-514059306
time=2025-12-11T12:08:49.520+08:00 level=DEBUG msg="Getting pooled connection" logicalAddr=pulsar://localhost:6650 physicalAddr=pulsar://localhost:6650
time=2025-12-11T12:08:49.520+08:00 level=DEBUG msg="Found connection in pool key=localhost:6650-localhost:6650-0 logical_addr=pulsar://localhost:6650 physical_addr=pulsar://localhost:6650"
time=2025-12-11T12:08:49.520+08:00 level=DEBUG msg="Write data: 78" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
time=2025-12-11T12:08:49.544+08:00 level=DEBUG msg="Got command! type:PRODUCER_SUCCESS producer_success:{request_id:3 producer_name:\"test\" last_sequence_id:-1 schema_version:\"\" producer_ready:true} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
time=2025-12-11T12:08:49.544+08:00 level=DEBUG msg="Received command: type:PRODUCER_SUCCESS producer_success:{request_id:3 producer_name:\"test\" last_sequence_id:-1 schema_version:\"\" producer_ready:true} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
time=2025-12-11T12:08:49.544+08:00 level=INFO msg="Connected producer" topic=persistent://public/default/my-topic-514059306 cnx="127.0.0.1:49974 -> 127.0.0.1:6650" epoch=0
time=2025-12-11T12:08:49.544+08:00 level=INFO msg="Created producer" topic=persistent://public/default/my-topic-514059306 producer_name=test producerID=1 cnx="127.0.0.1:49974 -> 127.0.0.1:6650"
time=2025-12-11T12:09:19.542+08:00 level=DEBUG msg="Got command! type:PING ping:{} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
time=2025-12-11T12:09:19.542+08:00 level=DEBUG msg="Received command: type:PING ping:{} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
time=2025-12-11T12:09:19.542+08:00 level=DEBUG msg="Responding to PING request" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
time=2025-12-11T12:09:19.542+08:00 level=DEBUG msg="Write data: 13" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
time=2025-12-11T12:09:19.542+08:00 level=DEBUG msg="Sending PING" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
time=2025-12-11T12:09:19.542+08:00 level=DEBUG msg="Write data: 13" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
time=2025-12-11T12:09:19.542+08:00 level=DEBUG msg="Got command! type:PONG pong:{} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
time=2025-12-11T12:09:19.542+08:00 level=DEBUG msg="Received command: type:PONG pong:{} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
time=2025-12-11T12:09:19.542+08:00 level=DEBUG msg="Received PONG response" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
System configuration
Broker version: apache-pulsar-4.0.8
Client version: master bdfa6a0
The current call path when sending a chunked message is:
producer.Send() ->
p.internalSendAsync() ->
p.updateChunkInfo(sr): computes sr.totalChunk = message payload size / maxChunkSize ->
p.reserveResources() ->
p.reserveSemaphore(sr): tries to acquire sr.totalChunk permits from p.publishSemaphore
When we produce a message whose payload is larger than maxChunkSize * MaxPendingMessages, it needs more permits than p.publishSemaphore holds. This single message occupies the partition producer's entire p.publishSemaphore and never releases it, so all sending on that partition producer blocks forever.
Since a message can never be sent successfully if its payload size exceeds maxChunkSize * MaxPendingMessages, I believe we should validate the payload size in p.validateMsg() or p.updateChunkInfo() before trying to acquire p.publishSemaphore. If the payload exceeds the limit, we should immediately return an error similar to ErrMessageTooLarge to the caller of Send().