Skip to content

[Bug] Consumer cannot consume chunk messages after reconnect #1446

@geniusjoe

Description

@geniusjoe

Expected behavior

The consumer should still be able to consume chunked messages after it reconnects.

Actual behavior

After reconnecting, the consumer discards the chunks of the in-flight chunked message instead of delivering the message.

Steps to reproduce

Test case:

// TestChunkWithReconnection reproduces the loss of a chunked message when the
// topic is unloaded (forcing producer and consumer to reconnect) while the
// chunks of that message are still being published.
//
// The test throttles the topic's publish rate so that a single 10-chunk message
// takes roughly ten seconds to publish, unloads the topic halfway through, and
// then expects the consumer to receive the reassembled message.
func TestChunkWithReconnection(t *testing.T) {
	sLogger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))
	client, err := NewClient(ClientOptions{
		URL:    lookupURL,
		Logger: log.NewLoggerWithSlog(sLogger),
	})
	// Stop early on setup failures: the returned handle is unusable and calling
	// a method on it (including the deferred Close) would panic.
	if !assert.NoError(t, err) {
		return
	}
	defer client.Close()

	topic := newTopicName()
	producer, err := client.CreateProducer(ProducerOptions{
		Topic:               topic,
		DisableBatching:     true,
		EnableChunking:      true,
		ChunkMaxMessageSize: 100,
		MaxPendingMessages:  200000,
		SendTimeout:         60 * time.Second,
	})
	if !assert.NoError(t, err) || !assert.NotNil(t, producer) {
		return
	}
	defer producer.Close()

	c, err := client.Subscribe(ConsumerOptions{
		Topic:            topic,
		Type:             Exclusive,
		SubscriptionName: "chunk-subscriber",
	})
	if !assert.NoError(t, err) || !assert.NotNil(t, c) {
		return
	}
	defer c.Close()

	// Reduce the publish rate so the producer cannot send all chunks at once.
	url := adminURL + "/" + "admin/v2/persistent/public/default/" + topic + "/publishRate"
	makeHTTPCall(t, http.MethodPost, url, "{\"publishThrottlingRateInMsg\": 1,\"publishThrottlingRateInByte\": 100}")
	// Give the rate limiter some time to take effect.
	time.Sleep(2 * time.Second)

	// payload / ChunkMaxMessageSize = 1000 / 100 = 10 chunks, and
	// publishThrottlingRateInMsg = 1, so publishing the whole chunked message
	// takes about 10 seconds.
	// SendAsync has no return value; the send result is reported via the callback.
	producer.SendAsync(context.Background(), &ProducerMessage{
		Payload: createTestMessagePayload(1000),
	}, func(id MessageID, producerMessage *ProducerMessage, err error) {
		assert.Nil(t, err)
	})

	time.Sleep(5 * time.Second)
	// Unload the topic mid-send to force a reconnection while chunks are in flight.
	url = adminURL + "/" + "admin/v2/persistent/public/default/" + topic + "/unload"
	makeHTTPCall(t, http.MethodPut, url, "")
	// Wait long enough for the remaining chunks to be published and delivered.
	time.Sleep(10 * time.Second)

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	msg, err := c.Receive(ctx)
	// When Receive fails (the behavior this test reproduces) msg is nil; report
	// an assertion failure instead of panicking on msg.ID().
	if !assert.NoError(t, err) || !assert.NotNil(t, msg) {
		return
	}
	assert.NotNil(t, msg.ID())
}

Log output:

GOROOT=/root/go/pkg/mod/golang.org/[email protected] #gosetup
GOPATH=/root/go #gosetup
/root/go/pkg/mod/golang.org/[email protected]/bin/go test -c -o /root/.cache/JetBrains/GoLand2024.3/tmp/GoLand/___TestChunkWithReconnection_in_github_com_apache_pulsar_client_go_pulsar.test github.com/apache/pulsar-client-go/pulsar #gosetup
/root/go/pkg/mod/golang.org/[email protected]/bin/go tool test2json -t /root/.cache/JetBrains/GoLand2024.3/tmp/GoLand/___TestChunkWithReconnection_in_github_com_apache_pulsar_client_go_pulsar.test -test.v=test2json -test.paniconexit0 -test.run ^\QTestChunkWithReconnection\E$ #gosetup
=== RUN   TestChunkWithReconnection
time=2025-12-10T23:22:07.100+08:00 level=DEBUG msg="Getting pooled connection" logicalAddr=pulsar://localhost:6650 physicalAddr=pulsar://localhost:6650
time=2025-12-10T23:22:07.100+08:00 level=DEBUG msg="Wait until connection is ready state=Initializing" remote_addr=pulsar://localhost:6650
time=2025-12-10T23:22:07.100+08:00 level=INFO msg="Connecting to broker" remote_addr=pulsar://localhost:6650
time=2025-12-10T23:22:07.100+08:00 level=INFO msg="TCP connection established" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:07.101+08:00 level=DEBUG msg="Write data: 49" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:07.102+08:00 level=DEBUG msg="Got MaxMessageSize from handshake response:5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:07.102+08:00 level=INFO msg="Connection is ready" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:07.102+08:00 level=DEBUG msg="Connection run starting with request capacity=10 queued=0" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:07.102+08:00 level=DEBUG msg="Write data: 63" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:07.104+08:00 level=DEBUG msg="Got command! type:PARTITIONED_METADATA_RESPONSE partitionMetadataResponse:{partitions:0 request_id:1 response:Success} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:07.104+08:00 level=DEBUG msg="Received command: type:PARTITIONED_METADATA_RESPONSE partitionMetadataResponse:{partitions:0 request_id:1 response:Success} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:07.104+08:00 level=DEBUG msg="Got topic{my-topic-100006541} partitioned metadata response: &{Response:type:PARTITIONED_METADATA_RESPONSE partitionMetadataResponse:{partitions:0 request_id:1 response:Success} Cnx:0xc00017e300}" serviceURL=pulsar://localhost:6650 serviceURL=pulsar://localhost:6650
time=2025-12-10T23:22:07.106+08:00 level=DEBUG msg="Getting pooled connection" logicalAddr=pulsar://localhost:6650 physicalAddr=pulsar://localhost:6650
time=2025-12-10T23:22:07.106+08:00 level=DEBUG msg="Found connection in pool key=localhost:6650-localhost:6650-0 logical_addr=pulsar://localhost:6650 physical_addr=pulsar://localhost:6650"
time=2025-12-10T23:22:07.107+08:00 level=DEBUG msg="Write data: 67" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:07.109+08:00 level=DEBUG msg="Got command! type:LOOKUP_RESPONSE lookupTopicResponse:{brokerServiceUrl:\"pulsar://localhost:6650\" response:Connect request_id:2 authoritative:true proxy_through_service_url:true} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:07.109+08:00 level=DEBUG msg="Received command: type:LOOKUP_RESPONSE lookupTopicResponse:{brokerServiceUrl:\"pulsar://localhost:6650\" response:Connect request_id:2 authoritative:true proxy_through_service_url:true} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:07.109+08:00 level=DEBUG msg="Got topic{persistent://public/default/my-topic-100006541} lookup response: &{Response:type:LOOKUP_RESPONSE lookupTopicResponse:{brokerServiceUrl:\"pulsar://localhost:6650\" response:Connect request_id:2 authoritative:true proxy_through_service_url:true} Cnx:0xc00017e300}" serviceURL=pulsar://localhost:6650 serviceURL=pulsar://localhost:6650
time=2025-12-10T23:22:07.109+08:00 level=DEBUG msg="Successfully looked up topic{persistent://public/default/my-topic-100006541} on broker. pulsar://localhost:6650 /  - Use proxy: true" serviceURL=pulsar://localhost:6650 serviceURL=pulsar://localhost:6650
time=2025-12-10T23:22:07.109+08:00 level=DEBUG msg="Lookup result: &{pulsar://localhost:6650 pulsar://localhost:6650}" topic=persistent://public/default/my-topic-100006541
time=2025-12-10T23:22:07.109+08:00 level=DEBUG msg="The partition producer schema is nil" topic=persistent://public/default/my-topic-100006541
time=2025-12-10T23:22:07.109+08:00 level=DEBUG msg="Getting pooled connection" logicalAddr=pulsar://localhost:6650 physicalAddr=pulsar://localhost:6650
time=2025-12-10T23:22:07.109+08:00 level=DEBUG msg="Found connection in pool key=localhost:6650-localhost:6650-0 logical_addr=pulsar://localhost:6650 physical_addr=pulsar://localhost:6650"
time=2025-12-10T23:22:07.109+08:00 level=DEBUG msg="Write data: 72" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:07.145+08:00 level=DEBUG msg="Got command! type:PRODUCER_SUCCESS producer_success:{request_id:3 producer_name:\"standalone-0-48\" last_sequence_id:-1 schema_version:\"\" producer_ready:true} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:07.145+08:00 level=DEBUG msg="Received command: type:PRODUCER_SUCCESS producer_success:{request_id:3 producer_name:\"standalone-0-48\" last_sequence_id:-1 schema_version:\"\" producer_ready:true} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:07.145+08:00 level=INFO msg="Connected producer" topic=persistent://public/default/my-topic-100006541 cnx="127.0.0.1:52280 -> 127.0.0.1:6650" epoch=0
time=2025-12-10T23:22:07.145+08:00 level=INFO msg="Created producer" topic=persistent://public/default/my-topic-100006541 producer_name=standalone-0-48 producerID=1 cnx="127.0.0.1:52280 -> 127.0.0.1:6650"
time=2025-12-10T23:22:07.145+08:00 level=DEBUG msg="Getting pooled connection" logicalAddr=pulsar://localhost:6650 physicalAddr=pulsar://localhost:6650
time=2025-12-10T23:22:07.145+08:00 level=DEBUG msg="Found connection in pool key=localhost:6650-localhost:6650-0 logical_addr=pulsar://localhost:6650 physical_addr=pulsar://localhost:6650"
time=2025-12-10T23:22:07.145+08:00 level=DEBUG msg="Write data: 63" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:07.146+08:00 level=DEBUG msg="Got command! type:PARTITIONED_METADATA_RESPONSE partitionMetadataResponse:{partitions:0 request_id:4 response:Success} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:07.146+08:00 level=DEBUG msg="Received command: type:PARTITIONED_METADATA_RESPONSE partitionMetadataResponse:{partitions:0 request_id:4 response:Success} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:07.146+08:00 level=DEBUG msg="Got topic{persistent://public/default/my-topic-100006541} partitioned metadata response: &{Response:type:PARTITIONED_METADATA_RESPONSE partitionMetadataResponse:{partitions:0 request_id:4 response:Success} Cnx:0xc00017e300}" serviceURL=pulsar://localhost:6650 serviceURL=pulsar://localhost:6650
time=2025-12-10T23:22:07.146+08:00 level=DEBUG msg="Getting pooled connection" logicalAddr=pulsar://localhost:6650 physicalAddr=pulsar://localhost:6650
time=2025-12-10T23:22:07.147+08:00 level=DEBUG msg="Found connection in pool key=localhost:6650-localhost:6650-0 logical_addr=pulsar://localhost:6650 physical_addr=pulsar://localhost:6650"
time=2025-12-10T23:22:07.147+08:00 level=DEBUG msg="Write data: 63" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:07.147+08:00 level=DEBUG msg="Got command! type:PARTITIONED_METADATA_RESPONSE partitionMetadataResponse:{partitions:0 request_id:5 response:Success} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:07.147+08:00 level=DEBUG msg="Received command: type:PARTITIONED_METADATA_RESPONSE partitionMetadataResponse:{partitions:0 request_id:5 response:Success} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:07.147+08:00 level=DEBUG msg="Got topic{persistent://public/default/my-topic-100006541} partitioned metadata response: &{Response:type:PARTITIONED_METADATA_RESPONSE partitionMetadataResponse:{partitions:0 request_id:5 response:Success} Cnx:0xc00017e300}" serviceURL=pulsar://localhost:6650 serviceURL=pulsar://localhost:6650
time=2025-12-10T23:22:07.147+08:00 level=DEBUG msg="Getting pooled connection" logicalAddr=pulsar://localhost:6650 physicalAddr=pulsar://localhost:6650
time=2025-12-10T23:22:07.147+08:00 level=DEBUG msg="Found connection in pool key=localhost:6650-localhost:6650-0 logical_addr=pulsar://localhost:6650 physical_addr=pulsar://localhost:6650"
time=2025-12-10T23:22:07.147+08:00 level=DEBUG msg="Write data: 67" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:07.148+08:00 level=DEBUG msg="Got command! type:LOOKUP_RESPONSE lookupTopicResponse:{brokerServiceUrl:\"pulsar://localhost:6650\" response:Connect request_id:6 authoritative:true proxy_through_service_url:true} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:07.148+08:00 level=DEBUG msg="Received command: type:LOOKUP_RESPONSE lookupTopicResponse:{brokerServiceUrl:\"pulsar://localhost:6650\" response:Connect request_id:6 authoritative:true proxy_through_service_url:true} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:07.148+08:00 level=DEBUG msg="Got topic{persistent://public/default/my-topic-100006541} lookup response: &{Response:type:LOOKUP_RESPONSE lookupTopicResponse:{brokerServiceUrl:\"pulsar://localhost:6650\" response:Connect request_id:6 authoritative:true proxy_through_service_url:true} Cnx:0xc00017e300}" serviceURL=pulsar://localhost:6650 serviceURL=pulsar://localhost:6650
time=2025-12-10T23:22:07.148+08:00 level=DEBUG msg="Successfully looked up topic{persistent://public/default/my-topic-100006541} on broker. pulsar://localhost:6650 /  - Use proxy: true" serviceURL=pulsar://localhost:6650 serviceURL=pulsar://localhost:6650
time=2025-12-10T23:22:07.148+08:00 level=DEBUG msg="Lookup result: &{pulsar://localhost:6650 pulsar://localhost:6650}" name=piefa topic=persistent://public/default/my-topic-100006541 subscription=chunk-subscriber consumerID=1
time=2025-12-10T23:22:07.148+08:00 level=DEBUG msg="The partition consumer schema is nil" name=piefa topic=persistent://public/default/my-topic-100006541 subscription=chunk-subscriber consumerID=1
time=2025-12-10T23:22:07.148+08:00 level=DEBUG msg="Getting pooled connection" logicalAddr=pulsar://localhost:6650 physicalAddr=pulsar://localhost:6650
time=2025-12-10T23:22:07.148+08:00 level=DEBUG msg="Found connection in pool key=localhost:6650-localhost:6650-0 logical_addr=pulsar://localhost:6650 physical_addr=pulsar://localhost:6650"
time=2025-12-10T23:22:07.148+08:00 level=DEBUG msg="Write data: 99" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:07.155+08:00 level=DEBUG msg="Got command! type:SUCCESS success:{request_id:7} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:07.155+08:00 level=DEBUG msg="Received command: type:SUCCESS success:{request_id:7} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:07.155+08:00 level=INFO msg="Connected consumer" name=piefa topic=persistent://public/default/my-topic-100006541 subscription=chunk-subscriber consumerID=1
time=2025-12-10T23:22:07.155+08:00 level=INFO msg="Created consumer" name=piefa topic=persistent://public/default/my-topic-100006541 subscription=chunk-subscriber consumerID=1
time=2025-12-10T23:22:07.155+08:00 level=DEBUG msg="dispatcher received connection event" name=piefa topic=persistent://public/default/my-topic-100006541 subscription=chunk-subscriber consumerID=1
time=2025-12-10T23:22:07.155+08:00 level=DEBUG msg="get into runEventsLoop" name=piefa topic=persistent://public/default/my-topic-100006541 subscription=chunk-subscriber consumerID=1
time=2025-12-10T23:22:07.155+08:00 level=DEBUG msg="dispatcher requesting initial permits=1000" name=piefa topic=persistent://public/default/my-topic-100006541 subscription=chunk-subscriber consumerID=1
time=2025-12-10T23:22:07.155+08:00 level=DEBUG msg="Write data: 17" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:09.174+08:00 level=DEBUG msg="Received send request: {[47 41 29 27 61 31 32 58 63 21 81 79 46 61 68 93 94 86 93 90 92 14 79 72 31 89 64 21 98 91 85 46 93 73 1 27 72 25 39 20 50 46 98 47 77 86 80 82 51 37 94 46 67 63 34 76 18 39 23 17 75 39 22 29 32 32 51 8 44 64 98 88 76 68 76 1 43 74 4 57 43 34 99 46 51 10 81 76 92 85 31 79 75 13 28 30 58 60 65 68 44 18 90 58 44 87 97 40 47 51 46 39 66 29 91 55 7 34 3 48 26 49 96 55 37 11 54 95 97 17 19 21 36 79 12 71 94 22 56 23 89 56 10 78 22 37 87 60 79 8 17 52 8 43 20 64 55 54 1 42 37 13 86 57 52 20 43 8 53 88 18 2 1 97 64 53 88 90 33 48 82 14 85 93 31 30 83 61 70 68 56 90 23 31 77 3 65 79 18 33 47 10 12 16 31 77 26 72 84 22 22 62 57 48 86 0 32 95 88 50 70 33 2 21 63 84 50 60 68 79 91 30 8 47 60 76 21 20 14 91 22 55 56 11 41 93 71 48 49 77 53 70 69 1 46 52 81 49 4 85 49 17 7 19 47 69 73 60 94 45 79 92 65 16 9 50 1 56 23 39 25 53 91 58 17 12 99 3 37 68 4 41 24 98 59 20 39 93 69 55 6 15 75 12 10 18 58 96 53 27 69 10 74 10 29 75 44 21 16 93 78 54 0 96 27 74 21 38 56 30 61 69 32 3 95 32 64 92 19 72 64 92 66 58 16 6 13 26 89 11 7 17 47 33 33 30 28 95 88 68 61 20 97 59 78 87 36 73 23 37 48 48 97 44 59 87 55 82 88 26 35 73 85 25 57 56 43 22 65 91 88 77 35 24 36 86 58 69 77 18 11 64 27 65 84 23 75 7 82 73 74 96 25 84 92 91 30 86 97 80 93 86 48 88 1 28 45 73 19 40 4 88 51 86 82 28 87 80 66 37 51 60 93 32 42 31 28 87 21 2 83 68 39 31 69 48 64 2 53 12 42 49 79 90 49 46 10 8 66 43 83 42 13 66 99 51 37 11 72 18 41 67 16 78 88 43 7 87 10 5 57 17 57 51 81 59 84 63 88 72 77 40 83 77 28 16 22 90 57 70 28 93 6 37 28 62 40 75 46 78 20 60 49 9 25 47 34 1 69 72 92 61 20 49 42 22 35 72 62 74 42 39 53 3 59 59 35 38 38 29 65 86 80 74 13 67 80 58 87 26 62 94 7 22 48 13 74 48 47 6 93 82 63 31 77 89 90 30 44 33 74 45 72 36 60 84 53 54 20 95 33 74 26 0 38 52 80 10 95 57 98 72 20 85 99 2 82 43 73 45 72 81 3 49 3 30 31 97 38 11 88 9 57 1 56 93 76 25 97 7 20 72 9 78 94 47 50 64 86 29 46 76 16 72 26 88 65 9 85 53 82 18 64 16 55 36 63 4 
3 10 38 52 20 7 16 59 42 11 78 32 98 61 2 63 73 23 30 82 64 51 27 59 74 69 23 30 42 88 78 30 33 48 91 75 63 40 25 33 8 73 86 99 22 15 0 16 6 78 46 78 39 10 24 48 1 41 83 40 70 46 68 12 9 90 2 94 31 48 98 12 9 43 24 96 45 76 8 21 20 98 40 79 66 77 17 70 41 89 60 0 89 94 7 80 80 96 38 6 47 67 30 83 21 64 72 58 46 18 25 70 73 41 44 72 49 89 17 47 71 76 39 74 76 68 83 39 38 63 29 25 69 78 61 55 74 19 30 81 89 58 84 46 75 10 93 38 83 90 32 47 89 45 4 20 12 89 77 81 73 92 86 24 29 50 82 68 77 35 53 83 32 50 62 65 6 27 1 40 95 35 95 60 35 15 3 29 23 0 45 98 42 85 79 97 93 22 65 14 55 75 56 67 47 67 23 82 38 40 69 39 29 95 79 23 85 14 31 78 95 99 59 34 95 25 37 26 89 28 36 22 56 7 23 39 12 94 21 60 86 1 23 56 79 60 66 30 85 0 15 77 28 96 48 57 73 70 0 96 99 39 20 15 81 78 92 81 10 34 91 63 0 83 36 11 90 28 89 78 81 26 67 73 65 15 6 22 87 89 68 49 92 80 10 41 86 16 80 30 47 89 21 11 12 61 95 79 43 76 61 74 44 39 60 75 72 98 52 49 17 55 23 58 17 46 93 65 27 36 14 69 66 79 3 11 87 74 34 43 72 67] <nil>   map[] 0001-01-01 00:00:00 +0000 UTC [] false <nil> 0s 0001-01-01 00:00:00 +0000 UTC <nil> <nil>}" topic=persistent://public/default/my-topic-100006541 producer_name=standalone-0-48 producerID=1
time=2025-12-10T23:22:09.174+08:00 level=DEBUG msg="Write data: 187" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:09.174+08:00 level=DEBUG msg="Write data: 187" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:09.174+08:00 level=DEBUG msg="Write data: 187" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:09.174+08:00 level=DEBUG msg="Write data: 187" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:09.174+08:00 level=DEBUG msg="Write data: 187" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:09.174+08:00 level=DEBUG msg="Write data: 187" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:09.174+08:00 level=DEBUG msg="Write data: 187" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:09.174+08:00 level=DEBUG msg="Write data: 187" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:09.174+08:00 level=DEBUG msg="Write data: 187" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:09.174+08:00 level=DEBUG msg="Write data: 187" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:09.176+08:00 level=DEBUG msg="Got command! type:SEND_RECEIPT send_receipt:{producer_id:1 sequence_id:0 message_id:{ledgerId:97 entryId:0} highest_sequence_id:18446744073709551615} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:09.176+08:00 level=DEBUG msg="Received command: type:SEND_RECEIPT send_receipt:{producer_id:1 sequence_id:0 message_id:{ledgerId:97 entryId:0} highest_sequence_id:18446744073709551615} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:09.176+08:00 level=DEBUG msg="Got command! type:MESSAGE message:{consumer_id:1 message_id:{ledgerId:97 entryId:0 partition:-1}} with payload size: 169 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:09.176+08:00 level=DEBUG msg="Received command: type:MESSAGE message:{consumer_id:1 message_id:{ledgerId:97 entryId:0 partition:-1}} -- payload: &{[14 1 67 43 72 88 0 0 0 59 10 15 115 116 97 110 100 97 108 111 110 101 45 48 45 52 56 16 0 24 149 131 236 198 176 51 72 232 7 210 1 17 115 116 97 110 100 97 108 111 110 101 45 48 45 52 56 45 48 216 1 10 224 1 232 7 232 1 0 47 41 29 27 61 31 32 58 63 21 81 79 46 61 68 93 94 86 93 90 92 14 79 72 31 89 64 21 98 91 85 46 93 73 1 27 72 25 39 20 50 46 98 47 77 86 80 82 51 37 94 46 67 63 34 76 18 39 23 17 75 39 22 29 32 32 51 8 44 64 98 88 76 68 76 1 43 74 4 57 43 34 99 46 51 10 81 76 92 85 31 79 75 13 28 30 58 60 65 68] 0 169}" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:09.176+08:00 level=DEBUG msg="Got Message: consumer_id:1 message_id:{ledgerId:97 entryId:0 partition:-1}" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:09.176+08:00 level=DEBUG msg="Received chunk messageId 97:0:0, uuid = standalone-0-48-0, chunkId = 0, total-chunks 10" name=piefa topic=persistent://public/default/my-topic-100006541 subscription=chunk-subscriber consumerID=1
time=2025-12-10T23:22:10.176+08:00 level=DEBUG msg="Got command! type:SEND_RECEIPT send_receipt:{producer_id:1 sequence_id:0 message_id:{ledgerId:97 entryId:1} highest_sequence_id:18446744073709551615} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:10.176+08:00 level=DEBUG msg="Got command! type:MESSAGE message:{consumer_id:1 message_id:{ledgerId:97 entryId:1 partition:-1}} with payload size: 169 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:10.176+08:00 level=DEBUG msg="Received command: type:SEND_RECEIPT send_receipt:{producer_id:1 sequence_id:0 message_id:{ledgerId:97 entryId:1} highest_sequence_id:18446744073709551615} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:10.176+08:00 level=DEBUG msg="Received command: type:MESSAGE message:{consumer_id:1 message_id:{ledgerId:97 entryId:1 partition:-1}} -- payload: &{[14 1 37 76 146 137 0 0 0 59 10 15 115 116 97 110 100 97 108 111 110 101 45 48 45 52 56 16 0 24 149 131 236 198 176 51 72 232 7 210 1 17 115 116 97 110 100 97 108 111 110 101 45 48 45 52 56 45 48 216 1 10 224 1 232 7 232 1 1 44 18 90 58 44 87 97 40 47 51 46 39 66 29 91 55 7 34 3 48 26 49 96 55 37 11 54 95 97 17 19 21 36 79 12 71 94 22 56 23 89 56 10 78 22 37 87 60 79 8 17 52 8 43 20 64 55 54 1 42 37 13 86 57 52 20 43 8 53 88 18 2 1 97 64 53 88 90 33 48 82 14 85 93 31 30 83 61 70 68 56 90 23 31 77 3 65 79 18 33] 0 169}" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:10.176+08:00 level=DEBUG msg="Got Message: consumer_id:1 message_id:{ledgerId:97 entryId:1 partition:-1}" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:10.176+08:00 level=DEBUG msg="Received chunk messageId 97:1:0, uuid = standalone-0-48-0, chunkId = 1, total-chunks 10" name=piefa topic=persistent://public/default/my-topic-100006541 subscription=chunk-subscriber consumerID=1
time=2025-12-10T23:22:11.566+08:00 level=DEBUG msg="Got command! type:SEND_RECEIPT send_receipt:{producer_id:1 sequence_id:0 message_id:{ledgerId:97 entryId:2} highest_sequence_id:18446744073709551615} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:11.566+08:00 level=DEBUG msg="Got command! type:MESSAGE message:{consumer_id:1 message_id:{ledgerId:97 entryId:2 partition:-1}} with payload size: 169 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:11.566+08:00 level=DEBUG msg="Received command: type:SEND_RECEIPT send_receipt:{producer_id:1 sequence_id:0 message_id:{ledgerId:97 entryId:2} highest_sequence_id:18446744073709551615} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:11.567+08:00 level=DEBUG msg="Received command: type:MESSAGE message:{consumer_id:1 message_id:{ledgerId:97 entryId:2 partition:-1}} -- payload: &{[14 1 123 243 95 91 0 0 0 59 10 15 115 116 97 110 100 97 108 111 110 101 45 48 45 52 56 16 0 24 149 131 236 198 176 51 72 232 7 210 1 17 115 116 97 110 100 97 108 111 110 101 45 48 45 52 56 45 48 216 1 10 224 1 232 7 232 1 2 47 10 12 16 31 77 26 72 84 22 22 62 57 48 86 0 32 95 88 50 70 33 2 21 63 84 50 60 68 79 91 30 8 47 60 76 21 20 14 91 22 55 56 11 41 93 71 48 49 77 53 70 69 1 46 52 81 49 4 85 49 17 7 19 47 69 73 60 94 45 79 92 65 16 9 50 1 56 23 39 25 53 91 58 17 12 99 3 37 68 4 41 24 98 59 20 39 93 69 55] 0 169}" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:11.567+08:00 level=DEBUG msg="Got Message: consumer_id:1 message_id:{ledgerId:97 entryId:2 partition:-1}" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:11.567+08:00 level=DEBUG msg="Received chunk messageId 97:2:0, uuid = standalone-0-48-0, chunkId = 2, total-chunks 10" name=piefa topic=persistent://public/default/my-topic-100006541 subscription=chunk-subscriber consumerID=1
time=2025-12-10T23:22:13.257+08:00 level=DEBUG msg="Got command! type:SEND_RECEIPT send_receipt:{producer_id:1 sequence_id:0 message_id:{ledgerId:97 entryId:3} highest_sequence_id:18446744073709551615} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:13.257+08:00 level=DEBUG msg="Got command! type:MESSAGE message:{consumer_id:1 message_id:{ledgerId:97 entryId:3 partition:-1}} with payload size: 169 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:13.257+08:00 level=DEBUG msg="Received command: type:SEND_RECEIPT send_receipt:{producer_id:1 sequence_id:0 message_id:{ledgerId:97 entryId:3} highest_sequence_id:18446744073709551615} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:13.257+08:00 level=DEBUG msg="Received command: type:MESSAGE message:{consumer_id:1 message_id:{ledgerId:97 entryId:3 partition:-1}} -- payload: &{[14 1 163 64 200 170 0 0 0 59 10 15 115 116 97 110 100 97 108 111 110 101 45 48 45 52 56 16 0 24 149 131 236 198 176 51 72 232 7 210 1 17 115 116 97 110 100 97 108 111 110 101 45 48 45 52 56 45 48 216 1 10 224 1 232 7 232 1 3 6 15 75 12 10 18 58 96 53 27 69 10 74 10 29 75 44 21 16 93 78 54 0 96 27 74 21 38 56 30 61 69 32 3 95 32 64 92 19 72 64 92 66 58 16 6 13 26 89 11 7 17 47 33 33 30 28 95 88 68 61 20 97 59 78 87 36 73 23 37 48 48 97 44 59 87 55 82 88 26 35 73 85 25 57 56 43 22 65 91 88 77 35 24 36 86 58 69 77 18] 0 169}" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:13.257+08:00 level=DEBUG msg="Got Message: consumer_id:1 message_id:{ledgerId:97 entryId:3 partition:-1}" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:13.257+08:00 level=DEBUG msg="Received chunk messageId 97:3:0, uuid = standalone-0-48-0, chunkId = 3, total-chunks 10" name=piefa topic=persistent://public/default/my-topic-100006541 subscription=chunk-subscriber consumerID=1
time=2025-12-10T23:22:14.176+08:00 level=DEBUG msg="Got command! type:CLOSE_PRODUCER close_producer:{producer_id:1 request_id:18446744073709551615} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:14.176+08:00 level=DEBUG msg="Got command! type:CLOSE_CONSUMER close_consumer:{consumer_id:1 request_id:18446744073709551615} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:14.177+08:00 level=DEBUG msg="Received command: type:CLOSE_PRODUCER close_producer:{producer_id:1 request_id:18446744073709551615} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:14.177+08:00 level=INFO msg="Broker notification of Closed producer: 1" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:14.177+08:00 level=WARN msg="Connection was closed" topic=persistent://public/default/my-topic-100006541 producer_name=standalone-0-48 producerID=1 cnx="127.0.0.1:52280 -> 127.0.0.1:6650"
time=2025-12-10T23:22:14.177+08:00 level=DEBUG msg="Received command: type:CLOSE_CONSUMER close_consumer:{consumer_id:1 request_id:18446744073709551615} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:14.177+08:00 level=INFO msg="Broker notification of Closed consumer: 1" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:14.177+08:00 level=DEBUG msg="connection closed and send to connectClosedCh" name=piefa topic=persistent://public/default/my-topic-100006541 subscription=chunk-subscriber consumerID=1
time=2025-12-10T23:22:14.177+08:00 level=DEBUG msg="runEventsLoop will reconnect" name=piefa topic=persistent://public/default/my-topic-100006541 subscription=chunk-subscriber consumerID=1
time=2025-12-10T23:22:14.177+08:00 level=INFO msg="runEventsLoop will reconnect in producer" topic=persistent://public/default/my-topic-100006541 producer_name=standalone-0-48 producerID=1
time=2025-12-10T23:22:14.177+08:00 level=INFO msg="Reconnecting to broker" name=piefa topic=persistent://public/default/my-topic-100006541 subscription=chunk-subscriber consumerID=1 delayReconnectTime=103.498728ms assignedBrokerURL=""
time=2025-12-10T23:22:14.177+08:00 level=INFO msg="Reconnecting to broker" topic=persistent://public/default/my-topic-100006541 producer_name=standalone-0-48 producerID=1 assignedBrokerURL="" delayReconnectTime=107.199818ms
time=2025-12-10T23:22:14.281+08:00 level=DEBUG msg="Getting pooled connection" logicalAddr=pulsar://localhost:6650 physicalAddr=pulsar://localhost:6650
time=2025-12-10T23:22:14.281+08:00 level=DEBUG msg="Found connection in pool key=localhost:6650-localhost:6650-0 logical_addr=pulsar://localhost:6650 physical_addr=pulsar://localhost:6650"
time=2025-12-10T23:22:14.281+08:00 level=DEBUG msg="Write data: 67" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:14.284+08:00 level=DEBUG msg="Getting pooled connection" logicalAddr=pulsar://localhost:6650 physicalAddr=pulsar://localhost:6650
time=2025-12-10T23:22:14.284+08:00 level=DEBUG msg="Found connection in pool key=localhost:6650-localhost:6650-0 logical_addr=pulsar://localhost:6650 physical_addr=pulsar://localhost:6650"
time=2025-12-10T23:22:14.284+08:00 level=DEBUG msg="Write data: 67" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:14.948+08:00 level=DEBUG msg="Got command! type:LOOKUP_RESPONSE lookupTopicResponse:{brokerServiceUrl:\"pulsar://localhost:6650\" response:Connect request_id:8 authoritative:true proxy_through_service_url:true} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:14.948+08:00 level=DEBUG msg="Got command! type:LOOKUP_RESPONSE lookupTopicResponse:{brokerServiceUrl:\"pulsar://localhost:6650\" response:Connect request_id:9 authoritative:true proxy_through_service_url:true} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:14.948+08:00 level=DEBUG msg="Received command: type:LOOKUP_RESPONSE lookupTopicResponse:{brokerServiceUrl:\"pulsar://localhost:6650\" response:Connect request_id:8 authoritative:true proxy_through_service_url:true} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:14.948+08:00 level=DEBUG msg="Received command: type:LOOKUP_RESPONSE lookupTopicResponse:{brokerServiceUrl:\"pulsar://localhost:6650\" response:Connect request_id:9 authoritative:true proxy_through_service_url:true} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:14.948+08:00 level=DEBUG msg="Got topic{persistent://public/default/my-topic-100006541} lookup response: &{Response:type:LOOKUP_RESPONSE lookupTopicResponse:{brokerServiceUrl:\"pulsar://localhost:6650\" response:Connect request_id:9 authoritative:true proxy_through_service_url:true} Cnx:0xc00017e300}" serviceURL=pulsar://localhost:6650 serviceURL=pulsar://localhost:6650
time=2025-12-10T23:22:14.948+08:00 level=DEBUG msg="Successfully looked up topic{persistent://public/default/my-topic-100006541} on broker. pulsar://localhost:6650 /  - Use proxy: true" serviceURL=pulsar://localhost:6650 serviceURL=pulsar://localhost:6650
time=2025-12-10T23:22:14.948+08:00 level=DEBUG msg="Got topic{persistent://public/default/my-topic-100006541} lookup response: &{Response:type:LOOKUP_RESPONSE lookupTopicResponse:{brokerServiceUrl:\"pulsar://localhost:6650\" response:Connect request_id:8 authoritative:true proxy_through_service_url:true} Cnx:0xc00017e300}" serviceURL=pulsar://localhost:6650 serviceURL=pulsar://localhost:6650
time=2025-12-10T23:22:14.948+08:00 level=DEBUG msg="Lookup result: &{pulsar://localhost:6650 pulsar://localhost:6650}" topic=persistent://public/default/my-topic-100006541 producer_name=standalone-0-48 producerID=1
time=2025-12-10T23:22:14.948+08:00 level=DEBUG msg="Successfully looked up topic{persistent://public/default/my-topic-100006541} on broker. pulsar://localhost:6650 /  - Use proxy: true" serviceURL=pulsar://localhost:6650 serviceURL=pulsar://localhost:6650
time=2025-12-10T23:22:14.948+08:00 level=DEBUG msg="The partition producer schema is nil" topic=persistent://public/default/my-topic-100006541 producer_name=standalone-0-48 producerID=1
time=2025-12-10T23:22:14.948+08:00 level=DEBUG msg="Lookup result: &{pulsar://localhost:6650 pulsar://localhost:6650}" name=piefa topic=persistent://public/default/my-topic-100006541 subscription=chunk-subscriber consumerID=1
time=2025-12-10T23:22:14.948+08:00 level=DEBUG msg="The partition consumer schema is nil" name=piefa topic=persistent://public/default/my-topic-100006541 subscription=chunk-subscriber consumerID=1
time=2025-12-10T23:22:14.948+08:00 level=DEBUG msg="Getting pooled connection" logicalAddr=pulsar://localhost:6650 physicalAddr=pulsar://localhost:6650
time=2025-12-10T23:22:14.948+08:00 level=DEBUG msg="Found connection in pool key=localhost:6650-localhost:6650-0 logical_addr=pulsar://localhost:6650 physical_addr=pulsar://localhost:6650"
time=2025-12-10T23:22:14.948+08:00 level=DEBUG msg="Getting pooled connection" logicalAddr=pulsar://localhost:6650 physicalAddr=pulsar://localhost:6650
time=2025-12-10T23:22:14.948+08:00 level=DEBUG msg="Write data: 91" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:14.948+08:00 level=DEBUG msg="Found connection in pool key=localhost:6650-localhost:6650-0 logical_addr=pulsar://localhost:6650 physical_addr=pulsar://localhost:6650"
time=2025-12-10T23:22:14.948+08:00 level=DEBUG msg="Write data: 99" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:14.968+08:00 level=DEBUG msg="Got command! type:PRODUCER_SUCCESS producer_success:{request_id:10 producer_name:\"standalone-0-48\" last_sequence_id:-1 schema_version:\"\" producer_ready:true} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:14.968+08:00 level=DEBUG msg="Received command: type:PRODUCER_SUCCESS producer_success:{request_id:10 producer_name:\"standalone-0-48\" last_sequence_id:-1 schema_version:\"\" producer_ready:true} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:14.968+08:00 level=INFO msg="Connected producer" topic=persistent://public/default/my-topic-100006541 producer_name=standalone-0-48 producerID=1 cnx="127.0.0.1:52280 -> 127.0.0.1:6650" epoch=1
time=2025-12-10T23:22:14.968+08:00 level=INFO msg="Resending 6 pending batches" topic=persistent://public/default/my-topic-100006541 producer_name=standalone-0-48 producerID=1
time=2025-12-10T23:22:14.968+08:00 level=INFO msg="Reconnected producer to broker" topic=persistent://public/default/my-topic-100006541 producer_name=standalone-0-48 producerID=1 cnx="127.0.0.1:52280 -> 127.0.0.1:6650"
time=2025-12-10T23:22:14.968+08:00 level=DEBUG msg="Write data: 187" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:14.968+08:00 level=DEBUG msg="Write data: 187" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:14.968+08:00 level=DEBUG msg="Got command! type:SUCCESS success:{request_id:11} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:14.968+08:00 level=DEBUG msg="Write data: 187" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:14.968+08:00 level=DEBUG msg="Received command: type:SUCCESS success:{request_id:11} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:14.968+08:00 level=DEBUG msg="Write data: 187" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:14.968+08:00 level=DEBUG msg="Write data: 187" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:14.968+08:00 level=DEBUG msg="Write data: 187" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:14.968+08:00 level=INFO msg="Connected consumer" name=piefa topic=persistent://public/default/my-topic-100006541 subscription=chunk-subscriber consumerID=1
time=2025-12-10T23:22:14.968+08:00 level=INFO msg="Reconnected consumer to broker" name=piefa topic=persistent://public/default/my-topic-100006541 subscription=chunk-subscriber consumerID=1
time=2025-12-10T23:22:14.968+08:00 level=DEBUG msg="dispatcher received connection event" name=piefa topic=persistent://public/default/my-topic-100006541 subscription=chunk-subscriber consumerID=1
time=2025-12-10T23:22:14.968+08:00 level=DEBUG msg="dispatcher requesting initial permits=1000" name=piefa topic=persistent://public/default/my-topic-100006541 subscription=chunk-subscriber consumerID=1
time=2025-12-10T23:22:14.968+08:00 level=DEBUG msg="Write data: 17" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:14.970+08:00 level=DEBUG msg="Got command! type:SEND_RECEIPT send_receipt:{producer_id:1 sequence_id:0 message_id:{ledgerId:98 entryId:0} highest_sequence_id:18446744073709551615} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:14.970+08:00 level=DEBUG msg="Received command: type:SEND_RECEIPT send_receipt:{producer_id:1 sequence_id:0 message_id:{ledgerId:98 entryId:0} highest_sequence_id:18446744073709551615} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:15.970+08:00 level=DEBUG msg="Got command! type:SEND_RECEIPT send_receipt:{producer_id:1 sequence_id:0 message_id:{ledgerId:98 entryId:1} highest_sequence_id:18446744073709551615} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:15.970+08:00 level=DEBUG msg="Received command: type:SEND_RECEIPT send_receipt:{producer_id:1 sequence_id:0 message_id:{ledgerId:98 entryId:1} highest_sequence_id:18446744073709551615} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:17.361+08:00 level=DEBUG msg="Got command! type:SEND_RECEIPT send_receipt:{producer_id:1 sequence_id:0 message_id:{ledgerId:98 entryId:2} highest_sequence_id:18446744073709551615} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:17.361+08:00 level=DEBUG msg="Received command: type:SEND_RECEIPT send_receipt:{producer_id:1 sequence_id:0 message_id:{ledgerId:98 entryId:2} highest_sequence_id:18446744073709551615} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:19.051+08:00 level=DEBUG msg="Got command! type:SEND_RECEIPT send_receipt:{producer_id:1 sequence_id:0 message_id:{ledgerId:98 entryId:3} highest_sequence_id:18446744073709551615} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:19.051+08:00 level=DEBUG msg="Received command: type:SEND_RECEIPT send_receipt:{producer_id:1 sequence_id:0 message_id:{ledgerId:98 entryId:3} highest_sequence_id:18446744073709551615} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:20.743+08:00 level=DEBUG msg="Got command! type:SEND_RECEIPT send_receipt:{producer_id:1 sequence_id:0 message_id:{ledgerId:98 entryId:4} highest_sequence_id:18446744073709551615} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:20.743+08:00 level=DEBUG msg="Received command: type:SEND_RECEIPT send_receipt:{producer_id:1 sequence_id:0 message_id:{ledgerId:98 entryId:4} highest_sequence_id:18446744073709551615} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:22.433+08:00 level=DEBUG msg="Got command! type:SEND_RECEIPT send_receipt:{producer_id:1 sequence_id:0 message_id:{ledgerId:98 entryId:5} highest_sequence_id:18446744073709551615} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:22.433+08:00 level=DEBUG msg="Received command: type:SEND_RECEIPT send_receipt:{producer_id:1 sequence_id:0 message_id:{ledgerId:98 entryId:5} highest_sequence_id:18446744073709551615} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:24.125+08:00 level=DEBUG msg="Got command! type:MESSAGE message:{consumer_id:1 message_id:{ledgerId:97 entryId:0 partition:-1}} with payload size: 169 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:24.125+08:00 level=DEBUG msg="Got command! type:MESSAGE message:{consumer_id:1 message_id:{ledgerId:97 entryId:1 partition:-1}} with payload size: 169 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:24.125+08:00 level=DEBUG msg="Got command! type:MESSAGE message:{consumer_id:1 message_id:{ledgerId:97 entryId:2 partition:-1}} with payload size: 169 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:24.125+08:00 level=DEBUG msg="Got command! type:MESSAGE message:{consumer_id:1 message_id:{ledgerId:97 entryId:3 partition:-1}} with payload size: 169 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:24.125+08:00 level=DEBUG msg="Got command! type:MESSAGE message:{consumer_id:1 message_id:{ledgerId:98 entryId:0 partition:-1}} with payload size: 169 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:24.125+08:00 level=DEBUG msg="Got command! type:MESSAGE message:{consumer_id:1 message_id:{ledgerId:98 entryId:1 partition:-1}} with payload size: 169 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:24.125+08:00 level=DEBUG msg="Received command: type:MESSAGE message:{consumer_id:1 message_id:{ledgerId:97 entryId:0 partition:-1}} -- payload: &{[14 1 67 43 72 88 0 0 0 59 10 15 115 116 97 110 100 97 108 111 110 101 45 48 45 52 56 16 0 24 149 131 236 198 176 51 72 232 7 210 1 17 115 116 97 110 100 97 108 111 110 101 45 48 45 52 56 45 48 216 1 10 224 1 232 7 232 1 0 47 41 29 27 61 31 32 58 63 21 81 79 46 61 68 93 94 86 93 90 92 14 79 72 31 89 64 21 98 91 85 46 93 73 1 27 72 25 39 20 50 46 98 47 77 86 80 82 51 37 94 46 67 63 34 76 18 39 23 17 75 39 22 29 32 32 51 8 44 64 98 88 76 68 76 1 43 74 4 57 43 34 99 46 51 10 81 76 92 85 31 79 75 13 28 30 58 60 65 68] 0 169}" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:24.125+08:00 level=DEBUG msg="Got command! type:MESSAGE message:{consumer_id:1 message_id:{ledgerId:98 entryId:2 partition:-1}} with payload size: 169 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:24.125+08:00 level=DEBUG msg="Got Message: consumer_id:1 message_id:{ledgerId:97 entryId:0 partition:-1}" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:24.125+08:00 level=DEBUG msg="Got command! type:MESSAGE message:{consumer_id:1 message_id:{ledgerId:98 entryId:3 partition:-1}} with payload size: 169 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:24.125+08:00 level=DEBUG msg="Received chunk messageId 97:0:0, uuid = standalone-0-48-0, chunkId = 0, total-chunks 10" name=piefa topic=persistent://public/default/my-topic-100006541 subscription=chunk-subscriber consumerID=1
time=2025-12-10T23:22:24.125+08:00 level=WARN msg="Received unexpected chunk messageId 97:0:0, last-chunk-id 3, chunkId = 0, total-chunks 10" name=piefa topic=persistent://public/default/my-topic-100006541 subscription=chunk-subscriber consumerID=1
time=2025-12-10T23:22:24.125+08:00 level=DEBUG msg="Got command! type:MESSAGE message:{consumer_id:1 message_id:{ledgerId:98 entryId:4 partition:-1}} with payload size: 169 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:24.125+08:00 level=DEBUG msg="Received command: type:MESSAGE message:{consumer_id:1 message_id:{ledgerId:97 entryId:1 partition:-1}} -- payload: &{[14 1 37 76 146 137 0 0 0 59 10 15 115 116 97 110 100 97 108 111 110 101 45 48 45 52 56 16 0 24 149 131 236 198 176 51 72 232 7 210 1 17 115 116 97 110 100 97 108 111 110 101 45 48 45 52 56 45 48 216 1 10 224 1 232 7 232 1 1 44 18 90 58 44 87 97 40 47 51 46 39 66 29 91 55 7 34 3 48 26 49 96 55 37 11 54 95 97 17 19 21 36 79 12 71 94 22 56 23 89 56 10 78 22 37 87 60 79 8 17 52 8 43 20 64 55 54 1 42 37 13 86 57 52 20 43 8 53 88 18 2 1 97 64 53 88 90 33 48 82 14 85 93 31 30 83 61 70 68 56 90 23 31 77 3 65 79 18 33] 0 169}" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:24.125+08:00 level=DEBUG msg="Got command! type:MESSAGE message:{consumer_id:1 message_id:{ledgerId:98 entryId:5 partition:-1}} with payload size: 169 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:24.125+08:00 level=DEBUG msg="Got Message: consumer_id:1 message_id:{ledgerId:97 entryId:1 partition:-1}" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:24.125+08:00 level=DEBUG msg="Received chunk messageId 97:1:0, uuid = standalone-0-48-0, chunkId = 1, total-chunks 10" name=piefa topic=persistent://public/default/my-topic-100006541 subscription=chunk-subscriber consumerID=1
time=2025-12-10T23:22:24.125+08:00 level=WARN msg="Received unexpected chunk messageId 97:1:0, last-chunk-id -1, chunkId = 1, total-chunks -1" name=piefa topic=persistent://public/default/my-topic-100006541 subscription=chunk-subscriber consumerID=1
time=2025-12-10T23:22:24.125+08:00 level=DEBUG msg="Received command: type:MESSAGE message:{consumer_id:1 message_id:{ledgerId:97 entryId:2 partition:-1}} -- payload: &{[14 1 123 243 95 91 0 0 0 59 10 15 115 116 97 110 100 97 108 111 110 101 45 48 45 52 56 16 0 24 149 131 236 198 176 51 72 232 7 210 1 17 115 116 97 110 100 97 108 111 110 101 45 48 45 52 56 45 48 216 1 10 224 1 232 7 232 1 2 47 10 12 16 31 77 26 72 84 22 22 62 57 48 86 0 32 95 88 50 70 33 2 21 63 84 50 60 68 79 91 30 8 47 60 76 21 20 14 91 22 55 56 11 41 93 71 48 49 77 53 70 69 1 46 52 81 49 4 85 49 17 7 19 47 69 73 60 94 45 79 92 65 16 9 50 1 56 23 39 25 53 91 58 17 12 99 3 37 68 4 41 24 98 59 20 39 93 69 55] 0 169}" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:24.125+08:00 level=DEBUG msg="Got Message: consumer_id:1 message_id:{ledgerId:97 entryId:2 partition:-1}" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:24.125+08:00 level=DEBUG msg="Received chunk messageId 97:2:0, uuid = standalone-0-48-0, chunkId = 2, total-chunks 10" name=piefa topic=persistent://public/default/my-topic-100006541 subscription=chunk-subscriber consumerID=1
time=2025-12-10T23:22:24.125+08:00 level=WARN msg="Received unexpected chunk messageId 97:2:0, last-chunk-id -1, chunkId = 2, total-chunks -1" name=piefa topic=persistent://public/default/my-topic-100006541 subscription=chunk-subscriber consumerID=1
time=2025-12-10T23:22:24.125+08:00 level=DEBUG msg="Received command: type:MESSAGE message:{consumer_id:1 message_id:{ledgerId:97 entryId:3 partition:-1}} -- payload: &{[14 1 163 64 200 170 0 0 0 59 10 15 115 116 97 110 100 97 108 111 110 101 45 48 45 52 56 16 0 24 149 131 236 198 176 51 72 232 7 210 1 17 115 116 97 110 100 97 108 111 110 101 45 48 45 52 56 45 48 216 1 10 224 1 232 7 232 1 3 6 15 75 12 10 18 58 96 53 27 69 10 74 10 29 75 44 21 16 93 78 54 0 96 27 74 21 38 56 30 61 69 32 3 95 32 64 92 19 72 64 92 66 58 16 6 13 26 89 11 7 17 47 33 33 30 28 95 88 68 61 20 97 59 78 87 36 73 23 37 48 48 97 44 59 87 55 82 88 26 35 73 85 25 57 56 43 22 65 91 88 77 35 24 36 86 58 69 77 18] 0 169}" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:24.125+08:00 level=DEBUG msg="Got Message: consumer_id:1 message_id:{ledgerId:97 entryId:3 partition:-1}" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:24.125+08:00 level=DEBUG msg="Received chunk messageId 97:3:0, uuid = standalone-0-48-0, chunkId = 3, total-chunks 10" name=piefa topic=persistent://public/default/my-topic-100006541 subscription=chunk-subscriber consumerID=1
time=2025-12-10T23:22:24.125+08:00 level=WARN msg="Received unexpected chunk messageId 97:3:0, last-chunk-id -1, chunkId = 3, total-chunks -1" name=piefa topic=persistent://public/default/my-topic-100006541 subscription=chunk-subscriber consumerID=1
time=2025-12-10T23:22:24.125+08:00 level=DEBUG msg="Received command: type:MESSAGE message:{consumer_id:1 message_id:{ledgerId:98 entryId:0 partition:-1}} -- payload: &{[14 1 166 70 133 38 0 0 0 59 10 15 115 116 97 110 100 97 108 111 110 101 45 48 45 52 56 16 0 24 149 131 236 198 176 51 72 232 7 210 1 17 115 116 97 110 100 97 108 111 110 101 45 48 45 52 56 45 48 216 1 10 224 1 232 7 232 1 4 11 64 27 65 84 23 75 7 82 73 74 96 25 84 92 91 30 86 97 80 93 86 48 88 1 28 45 73 19 40 4 88 51 86 82 28 87 80 66 37 51 60 93 32 42 31 28 87 21 2 83 68 39 31 69 48 64 2 53 12 42 49 79 90 49 46 10 8 66 43 83 42 13 66 99 51 37 11 72 18 41 67 16 78 88 43 7 87 10 5 57 17 57 51 81 59 84 63 88 72] 0 169}" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:24.125+08:00 level=DEBUG msg="Got Message: consumer_id:1 message_id:{ledgerId:98 entryId:0 partition:-1}" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:24.125+08:00 level=DEBUG msg="Received chunk messageId 98:0:0, uuid = standalone-0-48-0, chunkId = 4, total-chunks 10" name=piefa topic=persistent://public/default/my-topic-100006541 subscription=chunk-subscriber consumerID=1
time=2025-12-10T23:22:24.125+08:00 level=WARN msg="Received unexpected chunk messageId 98:0:0, last-chunk-id -1, chunkId = 4, total-chunks -1" name=piefa topic=persistent://public/default/my-topic-100006541 subscription=chunk-subscriber consumerID=1
time=2025-12-10T23:22:24.125+08:00 level=DEBUG msg="Received command: type:MESSAGE message:{consumer_id:1 message_id:{ledgerId:98 entryId:1 partition:-1}} -- payload: &{[14 1 19 82 39 227 0 0 0 59 10 15 115 116 97 110 100 97 108 111 110 101 45 48 45 52 56 16 0 24 149 131 236 198 176 51 72 232 7 210 1 17 115 116 97 110 100 97 108 111 110 101 45 48 45 52 56 45 48 216 1 10 224 1 232 7 232 1 5 77 40 83 77 28 16 22 90 57 70 28 93 6 37 28 62 40 75 46 78 20 60 49 9 25 47 34 1 69 72 92 61 20 49 42 22 35 72 62 74 42 39 53 3 59 59 35 38 38 29 65 86 80 74 13 67 80 58 87 26 62 94 7 22 48 13 74 48 47 6 93 82 63 31 77 89 90 30 44 33 74 45 72 36 60 84 53 54 20 95 33 74 26 0 38 52 80 10 95 57] 0 169}" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:24.125+08:00 level=DEBUG msg="Got Message: consumer_id:1 message_id:{ledgerId:98 entryId:1 partition:-1}" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:24.125+08:00 level=DEBUG msg="Received chunk messageId 98:1:0, uuid = standalone-0-48-0, chunkId = 5, total-chunks 10" name=piefa topic=persistent://public/default/my-topic-100006541 subscription=chunk-subscriber consumerID=1
time=2025-12-10T23:22:24.125+08:00 level=WARN msg="Received unexpected chunk messageId 98:1:0, last-chunk-id -1, chunkId = 5, total-chunks -1" name=piefa topic=persistent://public/default/my-topic-100006541 subscription=chunk-subscriber consumerID=1
time=2025-12-10T23:22:24.125+08:00 level=DEBUG msg="Received command: type:MESSAGE message:{consumer_id:1 message_id:{ledgerId:98 entryId:2 partition:-1}} -- payload: &{[14 1 142 95 175 142 0 0 0 59 10 15 115 116 97 110 100 97 108 111 110 101 45 48 45 52 56 16 0 24 149 131 236 198 176 51 72 232 7 210 1 17 115 116 97 110 100 97 108 111 110 101 45 48 45 52 56 45 48 216 1 10 224 1 232 7 232 1 6 98 72 20 85 99 2 82 43 73 45 72 81 3 49 3 30 31 97 38 11 88 9 57 1 56 93 76 25 97 7 20 72 9 78 94 47 50 64 86 29 46 76 16 72 26 88 65 9 85 53 82 18 64 16 55 36 63 4 3 10 38 52 20 7 16 59 42 11 78 32 98 61 2 63 73 23 30 82 64 51 27 59 74 69 23 30 42 88 78 30 33 48 91 75 63 40 25 33 8 73] 0 169}" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:24.125+08:00 level=DEBUG msg="Got Message: consumer_id:1 message_id:{ledgerId:98 entryId:2 partition:-1}" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:24.125+08:00 level=DEBUG msg="Received chunk messageId 98:2:0, uuid = standalone-0-48-0, chunkId = 6, total-chunks 10" name=piefa topic=persistent://public/default/my-topic-100006541 subscription=chunk-subscriber consumerID=1
time=2025-12-10T23:22:24.125+08:00 level=WARN msg="Received unexpected chunk messageId 98:2:0, last-chunk-id -1, chunkId = 6, total-chunks -1" name=piefa topic=persistent://public/default/my-topic-100006541 subscription=chunk-subscriber consumerID=1
time=2025-12-10T23:22:24.125+08:00 level=DEBUG msg="Received command: type:MESSAGE message:{consumer_id:1 message_id:{ledgerId:98 entryId:3 partition:-1}} -- payload: &{[14 1 57 223 96 140 0 0 0 59 10 15 115 116 97 110 100 97 108 111 110 101 45 48 45 52 56 16 0 24 149 131 236 198 176 51 72 232 7 210 1 17 115 116 97 110 100 97 108 111 110 101 45 48 45 52 56 45 48 216 1 10 224 1 232 7 232 1 7 86 99 22 15 0 16 6 78 46 78 39 10 24 48 1 41 83 40 70 46 68 12 9 90 2 94 31 48 98 12 9 43 24 96 45 76 8 21 20 98 40 79 66 77 17 70 41 89 60 0 89 94 7 80 80 96 38 6 47 67 30 83 21 64 72 58 46 18 25 70 73 41 44 72 49 89 17 47 71 76 39 74 76 68 83 39 38 63 29 25 69 78 61 55 74 19 30 81 89 58] 0 169}" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:24.125+08:00 level=DEBUG msg="Got Message: consumer_id:1 message_id:{ledgerId:98 entryId:3 partition:-1}" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:24.125+08:00 level=DEBUG msg="Received chunk messageId 98:3:0, uuid = standalone-0-48-0, chunkId = 7, total-chunks 10" name=piefa topic=persistent://public/default/my-topic-100006541 subscription=chunk-subscriber consumerID=1
time=2025-12-10T23:22:24.125+08:00 level=WARN msg="Received unexpected chunk messageId 98:3:0, last-chunk-id -1, chunkId = 7, total-chunks -1" name=piefa topic=persistent://public/default/my-topic-100006541 subscription=chunk-subscriber consumerID=1
time=2025-12-10T23:22:24.125+08:00 level=DEBUG msg="Received command: type:MESSAGE message:{consumer_id:1 message_id:{ledgerId:98 entryId:4 partition:-1}} -- payload: &{[14 1 151 219 240 29 0 0 0 59 10 15 115 116 97 110 100 97 108 111 110 101 45 48 45 52 56 16 0 24 149 131 236 198 176 51 72 232 7 210 1 17 115 116 97 110 100 97 108 111 110 101 45 48 45 52 56 45 48 216 1 10 224 1 232 7 232 1 8 84 46 75 10 93 38 83 90 32 47 89 45 4 20 12 89 77 81 73 92 86 24 29 50 82 68 77 35 53 83 32 50 62 65 6 27 1 40 95 35 95 60 35 15 3 29 23 0 45 98 42 85 79 97 93 22 65 14 55 75 56 67 47 67 23 82 38 40 69 39 29 95 79 23 85 14 31 78 95 99 59 34 95 25 37 26 89 28 36 22 56 7 23 39 12 94 21 60 86 1] 0 169}" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:24.125+08:00 level=DEBUG msg="Got Message: consumer_id:1 message_id:{ledgerId:98 entryId:4 partition:-1}" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:24.125+08:00 level=DEBUG msg="Received chunk messageId 98:4:0, uuid = standalone-0-48-0, chunkId = 8, total-chunks 10" name=piefa topic=persistent://public/default/my-topic-100006541 subscription=chunk-subscriber consumerID=1
time=2025-12-10T23:22:24.125+08:00 level=WARN msg="Received unexpected chunk messageId 98:4:0, last-chunk-id -1, chunkId = 8, total-chunks -1" name=piefa topic=persistent://public/default/my-topic-100006541 subscription=chunk-subscriber consumerID=1
time=2025-12-10T23:22:24.125+08:00 level=DEBUG msg="Received command: type:MESSAGE message:{consumer_id:1 message_id:{ledgerId:98 entryId:5 partition:-1}} -- payload: &{[14 1 251 40 81 129 0 0 0 59 10 15 115 116 97 110 100 97 108 111 110 101 45 48 45 52 56 16 0 24 149 131 236 198 176 51 72 232 7 210 1 17 115 116 97 110 100 97 108 111 110 101 45 48 45 52 56 45 48 216 1 10 224 1 232 7 232 1 9 23 56 79 60 66 30 85 0 15 77 28 96 48 57 73 70 0 96 99 39 20 15 81 78 92 81 10 34 91 63 0 83 36 11 90 28 89 78 81 26 67 73 65 15 6 22 87 89 68 49 92 80 10 41 86 16 80 30 47 89 21 11 12 61 95 79 43 76 61 74 44 39 60 75 72 98 52 49 17 55 23 58 17 46 93 65 27 36 14 69 66 79 3 11 87 74 34 43 72 67] 0 169}" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:24.125+08:00 level=DEBUG msg="Got Message: consumer_id:1 message_id:{ledgerId:98 entryId:5 partition:-1}" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:24.125+08:00 level=DEBUG msg="Received chunk messageId 98:5:0, uuid = standalone-0-48-0, chunkId = 9, total-chunks 10" name=piefa topic=persistent://public/default/my-topic-100006541 subscription=chunk-subscriber consumerID=1
time=2025-12-10T23:22:24.125+08:00 level=WARN msg="Received unexpected chunk messageId 98:5:0, last-chunk-id -1, chunkId = 9, total-chunks -1" name=piefa topic=persistent://public/default/my-topic-100006541 subscription=chunk-subscriber consumerID=1
    message_chunking_test.go:213: 
        	Error Trace:	/data/code/dev/pulsar-client-go/pulsar/message_chunking_test.go:213
        	Error:      	Received unexpected error:
        	            	context deadline exceeded
        	Test:       	TestChunkWithReconnection
time=2025-12-10T23:22:29.192+08:00 level=INFO msg="Closing consumer=1" name=piefa topic=persistent://public/default/my-topic-100006541 subscription=chunk-subscriber consumerID=1
time=2025-12-10T23:22:29.192+08:00 level=DEBUG msg="Write data: 17" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:29.192+08:00 level=DEBUG msg="Got command! type:SUCCESS success:{request_id:12} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:29.192+08:00 level=DEBUG msg="Received command: type:SUCCESS success:{request_id:12} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:29.192+08:00 level=INFO msg="Closed consumer" name=piefa topic=persistent://public/default/my-topic-100006541 subscription=chunk-subscriber consumerID=1
time=2025-12-10T23:22:29.193+08:00 level=DEBUG msg="exiting events loop" name=piefa topic=persistent://public/default/my-topic-100006541 subscription=chunk-subscriber consumerID=1
time=2025-12-10T23:22:29.193+08:00 level=INFO msg="Closing producer" topic=persistent://public/default/my-topic-100006541 producer_name=standalone-0-48 producerID=1
time=2025-12-10T23:22:29.193+08:00 level=INFO msg="close consumer, exit reconnect" name=piefa topic=persistent://public/default/my-topic-100006541 subscription=chunk-subscriber consumerID=1
time=2025-12-10T23:22:29.193+08:00 level=DEBUG msg="Write data: 16" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:29.193+08:00 level=DEBUG msg="Closing nack tracker" name=piefa topic=persistent://public/default/my-topic-100006541 subscription=chunk-subscriber consumerID=1
time=2025-12-10T23:22:29.193+08:00 level=DEBUG msg="exiting dispatch loop" name=piefa topic=persistent://public/default/my-topic-100006541 subscription=chunk-subscriber consumerID=1
time=2025-12-10T23:22:29.193+08:00 level=DEBUG msg="Got command! type:SUCCESS success:{request_id:13} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:29.193+08:00 level=DEBUG msg="Received command: type:SUCCESS success:{request_id:13} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:52280
time=2025-12-10T23:22:29.193+08:00 level=INFO msg="Closed producer" topic=persistent://public/default/my-topic-100006541 producer_name=standalone-0-48 producerID=1
--- FAIL: TestChunkWithReconnection (22.09s)
panic: runtime error: invalid memory address or nil pointer dereference [recovered]
	panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x40 pc=0xc9f2f7]

goroutine 20 [running]:
testing.tRunner.func1.2({0xe90b60, 0x196c250})
	/root/go/pkg/mod/golang.org/[email protected]/src/testing/testing.go:1632 +0x230
testing.tRunner.func1()
	/root/go/pkg/mod/golang.org/[email protected]/src/testing/testing.go:1635 +0x35e
panic({0xe90b60?, 0x196c250?})
	/root/go/pkg/mod/golang.org/[email protected]/src/runtime/panic.go:785 +0x132
github.com/apache/pulsar-client-go/pulsar.TestChunkWithReconnection(0xc0000ca340)
	/data/code/dev/pulsar-client-go/pulsar/message_chunking_test.go:214 +0x7f7
testing.tRunner(0xc0000ca340, 0x102b4f0)
	/root/go/pkg/mod/golang.org/[email protected]/src/testing/testing.go:1690 +0xf4
created by testing.(*T).Run in goroutine 1
	/root/go/pkg/mod/golang.org/[email protected]/src/testing/testing.go:1743 +0x390


Process finished with the exit code 1

System configuration

Broker version: apache-pulsar-4.0.8
Client version: master bdfa6a0

I believe the main issue is that chunked messages are not returned to the `consumer.Receive()` method until the chunk with `chunkID = GetNumChunksFromMsg() - 1` has been pushed to the consumer's receiver queue. In other words, none of the chunks is individually acknowledged until every chunk has been pushed to the consumer and assembled into a new message.
Consequently, when the client reconnects, the broker redelivers all unacknowledged chunks to this consumer. However, during reconnection the ctx in the consumer's `pc.chunkedMsgCtxMap` is not cleared, which causes a conflict between the chunkID of each redelivered chunk and the `ctx.lastChunkedMsgID` recorded in `pc.chunkedMsgCtxMap`. I think we may need to reset the contents of `pc.chunkedMsgCtxMap` in `grabCnx` during the reconnection process.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions