diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 6cbc99e2cf4d4..65dd90f7a1235 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -112,6 +112,7 @@ import org.apache.pulsar.broker.service.schema.SchemaStorageFactory; import org.apache.pulsar.broker.stats.MetricsGenerator; import org.apache.pulsar.broker.stats.OpenTelemetryConsumerStats; +import org.apache.pulsar.broker.stats.OpenTelemetryProducerStats; import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats; import org.apache.pulsar.broker.stats.PulsarBrokerOpenTelemetry; import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet; @@ -258,6 +259,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { private final PulsarBrokerOpenTelemetry openTelemetry; private OpenTelemetryTopicStats openTelemetryTopicStats; private OpenTelemetryConsumerStats openTelemetryConsumerStats; + private OpenTelemetryProducerStats openTelemetryProducerStats; private TransactionMetadataStoreService transactionMetadataStoreService; private TransactionBufferProvider transactionBufferProvider; @@ -676,6 +678,10 @@ public CompletableFuture closeAsync() { brokerClientSharedTimer.stop(); monotonicSnapshotClock.close(); + if (openTelemetryProducerStats != null) { + openTelemetryProducerStats.close(); + openTelemetryProducerStats = null; + } if (openTelemetryConsumerStats != null) { openTelemetryConsumerStats.close(); openTelemetryConsumerStats = null; @@ -827,6 +833,7 @@ public void start() throws PulsarServerException { openTelemetryTopicStats = new OpenTelemetryTopicStats(this); openTelemetryConsumerStats = new OpenTelemetryConsumerStats(this); + openTelemetryProducerStats = new OpenTelemetryProducerStats(this); localMetadataSynchronizer = StringUtils.isNotBlank(config.getMetadataSyncEventTopic()) ? new PulsarMetadataEventSynchronizer(this, config.getMetadataSyncEventTopic()) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index cf54ffea7db66..b4578711027ef 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -23,10 +23,12 @@ import static org.apache.pulsar.common.protocol.Commands.hasChecksum; import static org.apache.pulsar.common.protocol.Commands.readChecksum; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.CaseFormat; import com.google.common.base.MoreObjects; import io.netty.buffer.ByteBuf; import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; +import io.opentelemetry.api.common.Attributes; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -36,6 +38,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.apache.bookkeeper.mledger.Position; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.intercept.BrokerInterceptor; @@ -57,8 +60,8 @@ import org.apache.pulsar.common.policies.data.stats.PublisherStatsImpl; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.schema.SchemaVersion; -import org.apache.pulsar.common.stats.Rate; import org.apache.pulsar.common.util.DateFormatter; +import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,10 +77,6 @@ public class Producer { private final long producerId; private final String appId; private final BrokerInterceptor brokerInterceptor; - private Rate msgIn; - private Rate chunkedMessageRate; - // it records msg-drop rate only for non-persistent topic - private final Rate msgDrop; private volatile long pendingPublishAcks = 0; private static final AtomicLongFieldUpdater pendingPublishAcksUpdater = AtomicLongFieldUpdater @@ -87,6 +86,10 @@ public class Producer { private final CompletableFuture closeFuture; private final PublisherStatsImpl stats; + private volatile Attributes attributes = null; + private static final AtomicReferenceFieldUpdater ATTRIBUTES_FIELD_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(Producer.class, Attributes.class, "attributes"); + private final boolean isRemote; private final String remoteCluster; private final boolean isNonPersistentTopic; @@ -118,10 +121,7 @@ public Producer(Topic topic, TransportCnx cnx, long producerId, String producerN this.epoch = epoch; this.closeFuture = new CompletableFuture<>(); this.appId = appId; - this.msgIn = new Rate(); - this.chunkedMessageRate = new Rate(); this.isNonPersistentTopic = topic instanceof NonPersistentTopic; - this.msgDrop = this.isNonPersistentTopic ? new Rate() : null; this.isShadowTopic = topic instanceof PersistentTopic && ((PersistentTopic) topic).getShadowSourceTopic().isPresent(); @@ -270,7 +270,7 @@ public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf he private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long batchSize, boolean isChunked, boolean isMarker, Position position) { MessagePublishContext messagePublishContext = - MessagePublishContext.get(this, sequenceId, msgIn, headersAndPayload.readableBytes(), + MessagePublishContext.get(this, sequenceId, headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker, position); if (brokerInterceptor != null) { brokerInterceptor @@ -282,7 +282,7 @@ private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, l private void publishMessageToTopic(ByteBuf headersAndPayload, long lowestSequenceId, long highestSequenceId, long batchSize, boolean isChunked, boolean isMarker, Position position) { MessagePublishContext messagePublishContext = MessagePublishContext.get(this, lowestSequenceId, - highestSequenceId, msgIn, headersAndPayload.readableBytes(), batchSize, + highestSequenceId, headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker, position); if (brokerInterceptor != null) { brokerInterceptor @@ -339,8 +339,8 @@ private void publishOperationCompleted() { } public void recordMessageDrop(int batchSize) { - if (this.isNonPersistentTopic) { - msgDrop.recordEvent(batchSize); + if (stats instanceof NonPersistentPublisherStatsImpl nonPersistentPublisherStats) { + nonPersistentPublisherStats.recordMsgDrop(batchSize); } } @@ -374,7 +374,6 @@ private static final class MessagePublishContext implements PublishContext, Runn private long sequenceId; private long ledgerId; private long entryId; - private Rate rateIn; private int msgSize; private long batchSize; private boolean chunked; @@ -536,13 +535,13 @@ public void run() { } // stats - rateIn.recordMultipleEvents(batchSize, msgSize); + producer.stats.recordMsgIn(batchSize, msgSize); producer.topic.recordAddLatency(System.nanoTime() - startTimeNs, TimeUnit.NANOSECONDS); producer.cnx.getCommandSender().sendSendReceiptResponse(producer.producerId, sequenceId, highestSequenceId, ledgerId, entryId); producer.cnx.completedSendOperation(producer.isNonPersistentTopic, msgSize); if (this.chunked) { - producer.chunkedMessageRate.recordEvent(); + producer.stats.recordChunkedMsgIn(); } producer.publishOperationCompleted(); if (producer.brokerInterceptor != null) { @@ -552,12 +551,11 @@ public void run() { recycle(); } - static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn, int msgSize, - long batchSize, boolean chunked, long startTimeNs, boolean isMarker, Position position) { + static MessagePublishContext get(Producer producer, long sequenceId, int msgSize, long batchSize, + boolean chunked, long startTimeNs, boolean isMarker, Position position) { MessagePublishContext callback = RECYCLER.get(); callback.producer = producer; callback.sequenceId = sequenceId; - callback.rateIn = rateIn; callback.msgSize = msgSize; callback.batchSize = batchSize; callback.chunked = chunked; @@ -573,13 +571,12 @@ static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn return callback; } - static MessagePublishContext get(Producer producer, long lowestSequenceId, long highestSequenceId, Rate rateIn, - int msgSize, long batchSize, boolean chunked, long startTimeNs, boolean isMarker, Position position) { + static MessagePublishContext get(Producer producer, long lowestSequenceId, long highestSequenceId, int msgSize, + long batchSize, boolean chunked, long startTimeNs, boolean isMarker, Position position) { MessagePublishContext callback = RECYCLER.get(); callback.producer = producer; callback.sequenceId = lowestSequenceId; callback.highestSequenceId = highestSequenceId; - callback.rateIn = rateIn; callback.msgSize = msgSize; callback.batchSize = batchSize; callback.originalProducerName = null; @@ -628,7 +625,6 @@ public void recycle() { highestSequenceId = -1L; originalSequenceId = -1L; originalHighestSequenceId = -1L; - rateIn = null; msgSize = 0; ledgerId = -1L; entryId = -1L; @@ -733,25 +729,12 @@ public void topicMigrated(Optional clusterUrl) { } public void updateRates() { - msgIn.calculateRate(); - chunkedMessageRate.calculateRate(); - stats.msgRateIn = msgIn.getRate(); - stats.msgThroughputIn = msgIn.getValueRate(); - stats.averageMsgSize = msgIn.getAverageValue(); - stats.chunkedMessageRate = chunkedMessageRate.getRate(); - if (chunkedMessageRate.getCount() > 0 && this.topic instanceof PersistentTopic) { - ((PersistentTopic) this.topic).msgChunkPublished = true; - } - if (this.isNonPersistentTopic) { - msgDrop.calculateRate(); - ((NonPersistentPublisherStatsImpl) stats).msgDropRate = msgDrop.getValueRate(); + stats.calculateRates(); + if (stats.getMsgChunkIn().getCount() > 0 && topic instanceof PersistentTopic persistentTopic) { + persistentTopic.msgChunkPublished = true; } } - public void updateRates(int numOfMessages, long msgSizeInBytes) { - msgIn.recordMultipleEvents(numOfMessages, msgSizeInBytes); - } - public boolean isRemote() { return isRemote; } @@ -817,7 +800,7 @@ public void publishTxnMessage(TxnID txnID, long producerId, long sequenceId, lon return; } MessagePublishContext messagePublishContext = - MessagePublishContext.get(this, sequenceId, highSequenceId, msgIn, + MessagePublishContext.get(this, sequenceId, highSequenceId, headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker, null); if (brokerInterceptor != null) { brokerInterceptor @@ -871,4 +854,29 @@ public void incrementThrottleCount() { public void decrementThrottleCount() { cnx.decrementThrottleCount(); } + + public Attributes getOpenTelemetryAttributes() { + if (attributes != null) { + return attributes; + } + return ATTRIBUTES_FIELD_UPDATER.updateAndGet(this, old -> { + if (old != null) { + return old; + } + var topicName = TopicName.get(topic.getName()); + var builder = Attributes.builder() + .put(OpenTelemetryAttributes.PULSAR_PRODUCER_NAME, producerName) + .put(OpenTelemetryAttributes.PULSAR_PRODUCER_ID, producerId) + .put(OpenTelemetryAttributes.PULSAR_PRODUCER_ACCESS_MODE, + CaseFormat.UPPER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, accessMode.name())) + .put(OpenTelemetryAttributes.PULSAR_DOMAIN, topicName.getDomain().toString()) + .put(OpenTelemetryAttributes.PULSAR_TENANT, topicName.getTenant()) + .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, topicName.getNamespace()) + .put(OpenTelemetryAttributes.PULSAR_TOPIC, topicName.getPartitionedTopicName()); + if (topicName.isPartitioned()) { + builder.put(OpenTelemetryAttributes.PULSAR_PARTITION_INDEX, topicName.getPartitionIndex()); + } + return builder.build(); + }); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryProducerStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryProducerStats.java new file mode 100644 index 0000000000000..9c09804554c31 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryProducerStats.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats; + +import io.opentelemetry.api.metrics.BatchCallback; +import io.opentelemetry.api.metrics.ObservableLongMeasurement; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.service.Producer; +import org.apache.pulsar.common.policies.data.stats.NonPersistentPublisherStatsImpl; + +public class OpenTelemetryProducerStats implements AutoCloseable { + + // Replaces pulsar_producer_msg_rate_in + public static final String MESSAGE_IN_COUNTER = "pulsar.broker.producer.message.incoming.count"; + private final ObservableLongMeasurement messageInCounter; + + // Replaces pulsar_producer_msg_throughput_in + public static final String BYTES_IN_COUNTER = "pulsar.broker.producer.message.incoming.size"; + private final ObservableLongMeasurement bytesInCounter; + + public static final String MESSAGE_DROP_COUNTER = "pulsar.broker.producer.message.drop.count"; + private final ObservableLongMeasurement messageDropCounter; + + private final BatchCallback batchCallback; + + public OpenTelemetryProducerStats(PulsarService pulsar) { + var meter = pulsar.getOpenTelemetry().getMeter(); + + messageInCounter = meter + .counterBuilder(MESSAGE_IN_COUNTER) + .setUnit("{message}") + .setDescription("The total number of messages received from this producer.") + .buildObserver(); + + bytesInCounter = meter + .counterBuilder(BYTES_IN_COUNTER) + .setUnit("By") + .setDescription("The total number of messages bytes received from this producer.") + .buildObserver(); + + messageDropCounter = meter + .counterBuilder(MESSAGE_DROP_COUNTER) + .setUnit("{message}") + .setDescription("The total number of messages dropped from this producer.") + .buildObserver(); + + batchCallback = meter.batchCallback(() -> pulsar.getBrokerService() + .getTopics() + .values() + .stream() + .filter(future -> future.isDone() && !future.isCompletedExceptionally()) + .map(CompletableFuture::join) + .filter(Optional::isPresent) + .flatMap(topic -> topic.get().getProducers().values().stream()) + .forEach(this::recordMetricsForProducer), + messageInCounter, + bytesInCounter, + messageDropCounter); + } + + @Override + public void close() { + batchCallback.close(); + } + + private void recordMetricsForProducer(Producer producer) { + var attributes = producer.getOpenTelemetryAttributes(); + var stats = producer.getStats(); + + messageInCounter.record(stats.getMsgInCounter(), attributes); + bytesInCounter.record(stats.getBytesInCounter(), attributes); + + if (stats instanceof NonPersistentPublisherStatsImpl nonPersistentStats) { + messageDropCounter.record(nonPersistentStats.getMsgDropCount(), attributes); + } + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryProducerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryProducerStatsTest.java new file mode 100644 index 0000000000000..e273ac4446141 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryProducerStatsTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats; + +import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue; +import static org.assertj.core.api.Assertions.assertThat; +import io.opentelemetry.api.common.Attributes; +import lombok.Cleanup; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.broker.testcontext.PulsarTestContext; +import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class OpenTelemetryProducerStatsTest extends BrokerTestBase { + + @BeforeMethod(alwaysRun = true) + @Override + protected void setup() throws Exception { + super.baseSetup(); + } + + @AfterMethod(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Override + protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder builder) { + super.customizeMainPulsarTestContextBuilder(builder); + builder.enableOpenTelemetry(true); + } + + + @Test(timeOut = 30_000) + public void testMessagingMetrics() throws Exception { + var topicName = BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/testProducerMessagingMetrics"); + admin.topics().createNonPartitionedTopic(topicName); + + var messageCount = 5; + var producerName = BrokerTestUtil.newUniqueName("testProducerName"); + + @Cleanup + var producer = pulsarClient.newProducer() + .producerName(producerName) + .topic(topicName) + .create(); + for (int i = 0; i < messageCount; i++) { + producer.send(String.format("msg-%d", i).getBytes()); + } + + var attributes = Attributes.builder() + .put(OpenTelemetryAttributes.PULSAR_DOMAIN, "persistent") + .put(OpenTelemetryAttributes.PULSAR_TENANT, "prop") + .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, "prop/ns-abc") + .put(OpenTelemetryAttributes.PULSAR_TOPIC, topicName) + .put(OpenTelemetryAttributes.PULSAR_PRODUCER_NAME, producerName) + .put(OpenTelemetryAttributes.PULSAR_PRODUCER_ID, 0) + .put(OpenTelemetryAttributes.PULSAR_PRODUCER_ACCESS_MODE, "shared") + .build(); + + var metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); + + assertMetricLongSumValue(metrics, OpenTelemetryProducerStats.MESSAGE_IN_COUNTER, attributes, + actual -> assertThat(actual).isPositive()); + assertMetricLongSumValue(metrics, OpenTelemetryProducerStats.BYTES_IN_COUNTER, attributes, + actual -> assertThat(actual).isPositive()); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java index 336728f279eda..e99802a5bc5c4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java @@ -192,6 +192,7 @@ public void testMultipleBrokerLookup() throws Exception { // Disable collecting topic stats during this test, as it deadlocks on access to map BrokerService.topics. pulsar2.getOpenTelemetryTopicStats().close(); pulsar2.getOpenTelemetryConsumerStats().close(); + pulsar2.getOpenTelemetryProducerStats().close(); var metricReader = pulsarTestContext.getOpenTelemetryMetricReader(); var lookupRequestSemaphoreField = BrokerService.class.getDeclaredField("lookupRequestSemaphore"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java index 4f64c4271fe89..e5c992ec6f858 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.client.api; +import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue; +import static org.assertj.core.api.Assertions.assertThat; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; @@ -27,6 +29,7 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.google.common.collect.Sets; +import io.opentelemetry.api.common.Attributes; import java.net.URL; import java.util.HashSet; import java.util.Optional; @@ -50,6 +53,8 @@ import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.nonpersistent.NonPersistentReplicator; import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; +import org.apache.pulsar.broker.stats.OpenTelemetryProducerStats; +import org.apache.pulsar.broker.testcontext.PulsarTestContext; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.client.impl.MessageIdImpl; @@ -65,6 +70,7 @@ import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.TopicType; +import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes; import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; import org.apache.pulsar.zookeeper.ZookeeperServerTest; import org.awaitility.Awaitility; @@ -105,6 +111,12 @@ protected void cleanup() throws Exception { super.internalCleanup(); } + @Override + protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder pulsarTestContextBuilder) { + super.customizeMainPulsarTestContextBuilder(pulsarTestContextBuilder); + pulsarTestContextBuilder.enableOpenTelemetry(true); + } + @Test(timeOut = 90000 /* 1.5mn */) public void testNonPersistentPartitionsAreNotAutoCreatedWhenThePartitionedTopicDoesNotExist() throws Exception { final boolean defaultAllowAutoTopicCreation = conf.isAllowAutoTopicCreation(); @@ -357,9 +369,12 @@ public void testProducerRateLimit() throws Exception { @Cleanup("shutdownNow") ExecutorService executor = Executors.newFixedThreadPool(5); AtomicBoolean failed = new AtomicBoolean(false); + @Cleanup Consumer consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("subscriber-1") .subscribe(); - Producer producer = pulsarClient.newProducer().topic(topic).create(); + var producerName = BrokerTestUtil.newUniqueName("testProducerRateLimit"); + @Cleanup + Producer producer = pulsarClient.newProducer().topic(topic).producerName(producerName).create(); byte[] msgData = "testData".getBytes(); final int totalProduceMessages = 10; CountDownLatch latch = new CountDownLatch(totalProduceMessages); @@ -392,7 +407,19 @@ public void testProducerRateLimit() throws Exception { // but as message should be dropped at broker: broker should not receive the message assertNotEquals(messageSet.size(), totalProduceMessages); - producer.close(); + // Verify the corresponding metric is updated + var attributes = Attributes.builder() + .put(OpenTelemetryAttributes.PULSAR_PRODUCER_NAME, producerName) + .put(OpenTelemetryAttributes.PULSAR_PRODUCER_ID, 0) + .put(OpenTelemetryAttributes.PULSAR_PRODUCER_ACCESS_MODE, "shared") + .put(OpenTelemetryAttributes.PULSAR_DOMAIN, "non-persistent") + .put(OpenTelemetryAttributes.PULSAR_TENANT, "my-property") + .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, "my-property/my-ns") + .put(OpenTelemetryAttributes.PULSAR_TOPIC, topic) + .build(); + var metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); + assertMetricLongSumValue(metrics, OpenTelemetryProducerStats.MESSAGE_DROP_COUNTER, attributes, + value -> assertThat(value).isPositive()); } finally { conf.setMaxConcurrentNonPersistentMessagePerConnection(defaultNonPersistentMessageRate); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentPublisherStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentPublisherStatsImpl.java index adf3f92ae71fc..d62e9b8dbbeae 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentPublisherStatsImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentPublisherStatsImpl.java @@ -18,10 +18,12 @@ */ package org.apache.pulsar.common.policies.data.stats; +import com.fasterxml.jackson.annotation.JsonIgnore; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.Objects; import lombok.Getter; import org.apache.pulsar.common.policies.data.NonPersistentPublisherStats; +import org.apache.pulsar.common.stats.Rate; /** * Non-persistent publisher statistics. @@ -35,10 +37,28 @@ public class NonPersistentPublisherStatsImpl extends PublisherStatsImpl implemen @Getter public double msgDropRate; + @JsonIgnore + private final Rate msgDrop = new Rate(); + public NonPersistentPublisherStatsImpl add(NonPersistentPublisherStatsImpl stats) { Objects.requireNonNull(stats); super.add(stats); this.msgDropRate += stats.msgDropRate; return this; } + + public void calculateRates() { + super.calculateRates(); + msgDrop.calculateRate(); + msgDropRate = msgDrop.getRate(); + } + + public void recordMsgDrop(long numMessages) { + msgDrop.recordEvent(numMessages); + } + + @JsonIgnore + public long getMsgDropCount() { + return msgDrop.getTotalCount(); + } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/PublisherStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/PublisherStatsImpl.java index 304361bb2daec..3f9067eba0b25 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/PublisherStatsImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/PublisherStatsImpl.java @@ -23,6 +23,7 @@ import lombok.Data; import org.apache.pulsar.client.api.ProducerAccessMode; import org.apache.pulsar.common.policies.data.PublisherStats; +import org.apache.pulsar.common.stats.Rate; /** * Statistics about a publisher. @@ -64,6 +65,11 @@ public class PublisherStatsImpl implements PublisherStats { /** Metadata (key/value strings) associated with this publisher. */ public Map metadata; + @JsonIgnore + private final Rate msgIn = new Rate(); + @JsonIgnore + private final Rate msgChunkIn = new Rate(); + public PublisherStatsImpl add(PublisherStatsImpl stats) { if (stats == null) { throw new IllegalArgumentException("stats can't be null"); @@ -107,4 +113,37 @@ public String getClientVersion() { public void setClientVersion(String clientVersion) { this.clientVersion = clientVersion; } + + public void calculateRates() { + msgIn.calculateRate(); + msgChunkIn.calculateRate(); + + msgRateIn = msgIn.getRate(); + msgThroughputIn = msgIn.getValueRate(); + averageMsgSize = msgIn.getAverageValue(); + chunkedMessageRate = msgChunkIn.getRate(); + } + + public void recordMsgIn(long messageCount, long byteCount) { + msgIn.recordMultipleEvents(messageCount, byteCount); + } + + @JsonIgnore + public long getMsgInCounter() { + return msgIn.getTotalCount(); + } + + @JsonIgnore + public long getBytesInCounter() { + return msgIn.getTotalValue(); + } + + public void recordChunkedMsgIn() { + msgChunkIn.recordEvent(); + } + + @JsonIgnore + public long getChunkedMsgInCounter() { + return msgChunkIn.getTotalCount(); + } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/Rate.java b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/Rate.java index 886e31ab71216..936962d8ee544 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/Rate.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/Rate.java @@ -28,6 +28,7 @@ public class Rate { private final LongAdder valueAdder = new LongAdder(); private final LongAdder countAdder = new LongAdder(); private final LongAdder totalCountAdder = new LongAdder(); + private final LongAdder totalValueAdder = new LongAdder(); // Computed stats private long count = 0L; @@ -43,12 +44,14 @@ public void recordEvent() { public void recordEvent(long value) { valueAdder.add(value); + totalValueAdder.add(value); countAdder.increment(); totalCountAdder.increment(); } public void recordMultipleEvents(long events, long totalValue) { valueAdder.add(totalValue); + totalValueAdder.add(totalValue); countAdder.add(events); totalCountAdder.add(events); } @@ -88,4 +91,8 @@ public double getValueRate() { public long getTotalCount() { return this.totalCountAdder.longValue(); } + + public long getTotalValue() { + return this.totalValueAdder.sum(); + } } diff --git a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java index 9783f0e754f63..004741b6dfb55 100644 --- a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java +++ b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java @@ -87,6 +87,21 @@ public interface OpenTelemetryAttributes { */ AttributeKey PULSAR_CONSUMER_CONNECTED_SINCE = AttributeKey.longKey("pulsar.consumer.connected_since"); + /** + * The name of the Pulsar producer. + */ + AttributeKey PULSAR_PRODUCER_NAME = AttributeKey.stringKey("pulsar.producer.name"); + + /** + * The ID of the Pulsar producer. + */ + AttributeKey PULSAR_PRODUCER_ID = AttributeKey.longKey("pulsar.producer.id"); + + /** + * The access mode of the Pulsar producer. + */ + AttributeKey PULSAR_PRODUCER_ACCESS_MODE = AttributeKey.stringKey("pulsar.producer.access_mode"); + /** * The address of the Pulsar client. */