Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
271ff36
Add support for region level Scan Metrics
Mar 30, 2025
32a7e8e
Make TestScanMetricsByRegion work for sync client
Mar 30, 2025
a91e6fe
Fix checkstyle issues
Mar 31, 2025
c88b6d7
Fix spot bug
Mar 31, 2025
5babeaa
Apply spotless fixes
Mar 31, 2025
bab2be1
Minor nit
Mar 31, 2025
8fc1be9
Extend ScanMetrics to capture region level metrics
Apr 24, 2025
fdc8c5f
Refactor ScanMetrics by region usage
Apr 25, 2025
666e191
Do non-test changes to refactor ScanMetrics by region
Apr 25, 2025
37e2c24
Add test coverage for new ScanMetricsByRegion design
Apr 28, 2025
07c5628
Add test covrage for non-snapshot scanners
Apr 29, 2025
0457771
Complete adding test coverage
Apr 29, 2025
4c840bb
Run spotless:apply
Apr 29, 2025
e1f0470
Remove extra changes
Apr 29, 2025
4fa7f79
Merge remote-tracking branch 'apache/master' into granular-scan-metrics
Apr 29, 2025
9f04f29
Remover unwanted changes
Apr 29, 2025
87c6cf9
Run spotless apply
Apr 29, 2025
0c65103
Fix spotbugs check
Apr 30, 2025
dfb41dd
Add test coverage for RIT scenario
May 3, 2025
011bae7
Do spotless:apply
May 3, 2025
0cdc421
Empty commit to trigger build
May 4, 2025
392dca3
Fix style checks
May 9, 2025
9a9bd25
Fix spotless
May 9, 2025
54189d8
Add getters for ScanMetrics to fix spotbugs
May 11, 2025
bca08ef
Make API backward compatible
Jun 5, 2025
2c7788c
Spotless:apply
Jun 5, 2025
4ded690
Address the remaining comments not invovling redesign
Jun 5, 2025
83081cb
Address the remaining comments not invovling redesign
Jun 5, 2025
746029e
Spotless:apply
Jun 5, 2025
4d2fdf8
Add UT in a separate class
Jun 6, 2025
286d7c9
Address Viraj's comments
Jun 26, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.client;

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.yetus.audience.InterfaceAudience;

Expand All @@ -25,15 +27,33 @@
*/
@InterfaceAudience.Private
public abstract class AbstractClientScanner implements ResultScanner {
protected ScanMetrics scanMetrics;
protected ScanMetrics scanMetrics = null;
protected List<ScanMetrics> scanMetricsByRegion;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Over-all question: why do you always add this List<ScanMetrics> scanMetricsByRegion instead of using the existing instance ScanMetrics scanMetrics as an accumulator and fold in the new metrics as they arrive into this instance? You could initialize it once and have less null-checking going on. You do this in all the classes that interact with this new feature.


/**
* Check and initialize list for collecting scan metrics if application wants to collect scan
* metrics per region
*/
protected void initScanMetricsByRegion(Scan scan) {
// check if application wants to collect scan metrics
if (scan.isScanMetricsEnabled() && scan.isScanMetricsByRegionEnabled()) {
scanMetricsByRegion = new ArrayList<>();
}
}

/**
* Check and initialize if application wants to collect scan metrics
*/
protected void initScanMetrics(Scan scan) {
// check if application wants to collect scan metrics
if (scan.isScanMetricsEnabled()) {
scanMetrics = new ScanMetrics();
if (scanMetricsByRegion != null) {
scanMetrics = new ScanMetrics();
scanMetricsByRegion.add(scanMetrics);
} else if (scanMetrics == null) {
// Only initialize once
this.scanMetrics = new ScanMetrics();
}
}
}

Expand All @@ -44,6 +64,24 @@ protected void initScanMetrics(Scan scan) {
*/
@Override
public ScanMetrics getScanMetrics() {
return scanMetrics;
if (scanMetricsByRegion != null) {
if (scanMetricsByRegion.isEmpty()) {
return null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: imho, it's easier to read code that exits early, as opposed to deeply indented if/else blocks. Just my style preference.

} else if (scanMetricsByRegion.size() == 1) {
return scanMetricsByRegion.get(0);
}
ScanMetrics overallScanMetrics = new ScanMetrics();
for (ScanMetrics otherScanMetrics : scanMetricsByRegion) {
overallScanMetrics.combineMetrics(otherScanMetrics);
}
return overallScanMetrics;
} else {
return scanMetrics;
}
}

@Override
public List<ScanMetrics> getScanMetricsByRegion() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we have this method at all? Is it not sufficient to return the "overall" ScanMetics object?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh wait, it's because you haven't added the per-region scan metrics to the existing object. Why not?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's because you haven't added the per-region scan metrics to the existing object. Why not?

Actually I didn't consider that. But I like this idea of folding region level metrics into existing ScanMetrics object. Thanks

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm surprised that this method with this name returns a list and not a Map<encodedRegionName -> ScanMetrics>.

return scanMetricsByRegion;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -67,7 +69,7 @@ class AsyncClientScanner {
// AsyncScanSingleRegionRpcRetryingCaller will modify this scan object directly.
private final Scan scan;

private final ScanMetrics scanMetrics;
private ScanMetrics scanMetrics;

private final AdvancedScanResultConsumer consumer;

Expand All @@ -92,6 +94,7 @@ class AsyncClientScanner {
private final ScanResultCache resultCache;

private final Span span;
private List<ScanMetrics> scanMetricByRegion;

private final Map<String, byte[]> requestAttributes;

Expand Down Expand Up @@ -119,8 +122,15 @@ public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableN
this.resultCache = createScanResultCache(scan);
this.requestAttributes = requestAttributes;
if (scan.isScanMetricsEnabled()) {
this.scanMetrics = new ScanMetrics();
consumer.onScanMetricsCreated(scanMetrics);
if (scan.isScanMetricsByRegionEnabled()) {
// When scan metrics per region is enabled we will create scan metrics while calling
// openScanner.
this.scanMetricByRegion = new ArrayList<>();
consumer.onScanMetricsByRegionEnabled(scanMetricByRegion);
} else {
this.scanMetrics = new ScanMetrics();
consumer.onScanMetricsCreated(scanMetrics);
}
} else {
this.scanMetrics = null;
}
Expand Down Expand Up @@ -165,6 +175,10 @@ public OpenScannerResponse(HRegionLocation loc, boolean isRegionServerRemote, In
private CompletableFuture<OpenScannerResponse> callOpenScanner(HBaseRpcController controller,
HRegionLocation loc, ClientService.Interface stub) {
try (Scope ignored = span.makeCurrent()) {
if (this.scanMetrics != null && scan.isScanMetricsByRegionEnabled()) {
this.scanMetrics.setServerName(loc.getServerName());
this.scanMetrics.setEncodedRegionName(loc.getRegion().getEncodedName());
}
boolean isRegionServerRemote = isRemote(loc.getHostname());
incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
if (openScannerTries.getAndIncrement() > 1) {
Expand Down Expand Up @@ -250,6 +264,10 @@ private long getPrimaryTimeoutNs() {
}

private void openScanner() {
if (scan.isScanMetricsEnabled() && scan.isScanMetricsByRegionEnabled()) {
this.scanMetrics = new ScanMetrics();
this.scanMetricByRegion.add(this.scanMetrics);
}
incRegionCountMetrics(scanMetrics);
openScannerTries.set(1);
addListener(timelineConsistentRead(conn.getLocator(), tableName, scan, scan.getStartRow(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,11 @@ private void scan0(Scan scan, ScanResultConsumer consumer) {
span = scanner.getSpan();
try (Scope ignored = span.makeCurrent()) {
if (scan.isScanMetricsEnabled()) {
consumer.onScanMetricsCreated(scanner.getScanMetrics());
if (scan.isScanMetricsByRegionEnabled()) {
consumer.onScanMetricsByRegionEnabled(scanner.getScanMetricsByRegion());
} else {
consumer.onScanMetricsCreated(scanner.getScanMetrics());
}
}
for (Result result; (result = scanner.next()) != null;) {
if (!consumer.onNext(result)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncScanSingleRegionRpcRetryingCaller.ScanResumerImpl;
Expand Down Expand Up @@ -62,6 +63,7 @@ class AsyncTableResultScanner implements ResultScanner, AdvancedScanResultConsum

// Used to pass the span instance to the `AsyncTableImpl` from its underlying `rawAsyncTable`.
private Span span = null;
private List<ScanMetrics> scanMetricsByRegion;

public AsyncTableResultScanner(TableName tableName, Scan scan, long maxCacheSize) {
this.tableName = tableName;
Expand Down Expand Up @@ -211,7 +213,30 @@ synchronized boolean isSuspended() {

@Override
public ScanMetrics getScanMetrics() {
return scanMetrics;
if (scanMetricsByRegion != null) {
if (scanMetricsByRegion.isEmpty()) {
return null;
} else if (scanMetricsByRegion.size() == 1) {
return scanMetricsByRegion.get(0);
}
ScanMetrics overallScanMetrics = new ScanMetrics();
for (ScanMetrics otherScanMetrics : scanMetricsByRegion) {
overallScanMetrics.combineMetrics(otherScanMetrics);
}
return overallScanMetrics;
} else {
return scanMetrics;
}
}

@Override
public void onScanMetricsByRegionEnabled(List<ScanMetrics> scanMetricsByRegion) {
this.scanMetricsByRegion = scanMetricsByRegion;
}

@Override
public List<ScanMetrics> getScanMetricsByRegion() {
return scanMetricsByRegion;
}

int getCacheSize() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,8 @@ default Result[] next(int nbRows) throws IOException {

/** Returns the scan metrics, or {@code null} if we do not enable metrics. */
ScanMetrics getScanMetrics();

default List<ScanMetrics> getScanMetricsByRegion() {
throw new UnsupportedOperationException("Scan Metrics by region is not supported");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we return null (or empty list, whatever we decide on) instead of throwing? Wouldn't that be more consistent with the existing interfaces?

I'm also not clear on whether we need this method at all.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ public class Scan extends Query {
// define this attribute with the appropriate table name by calling
// scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName))
static public final String SCAN_ATTRIBUTES_TABLE_NAME = "scan.attributes.table.name";
static private final String SCAN_ATTRIBUTES_METRICS_BY_REGION_ENABLE =
"scan.attributes.metrics.byRegion.enable";

/**
* -1 means no caching specified and the value of {@link HConstants#HBASE_CLIENT_SCANNER_CACHING}
Expand Down Expand Up @@ -1030,4 +1032,15 @@ public boolean isNeedCursorResult() {
public static Scan createScanFromCursor(Cursor cursor) {
return new Scan().withStartRow(cursor.getRow());
}

public Scan setEnableScanMetricsByRegion(final boolean enable) {
setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_BY_REGION_ENABLE,
Bytes.toBytes(Boolean.valueOf(enable)));
return this;
}

public boolean isScanMetricsByRegionEnabled() {
byte[] attr = getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_BY_REGION_ENABLE);
return attr != null && Bytes.toBoolean(attr);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;

import java.util.List;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.yetus.audience.InterfaceAudience;

Expand All @@ -38,11 +39,22 @@ public interface ScanResultConsumerBase {
void onComplete();

/**
* If {@code scan.isScanMetricsEnabled()} returns true, then this method will be called prior to
* all other methods in this interface to give you the {@link ScanMetrics} instance for this scan
* operation. The {@link ScanMetrics} instance will be updated on-the-fly during the scan, you can
* store it somewhere to get the metrics at any time if you want.
* If {@code scan.isScanMetricsEnabled()} returns true and
* {@code scan.isScanMetricsByRegionEnabled()} returns false, then this method will be called
* prior to all other methods in this interface to give you the {@link ScanMetrics} instance for
* this scan operation. The {@link ScanMetrics} instance will be updated on-the-fly during the
* scan, you can store it somewhere to get the metrics at any time if you want.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So is this object underneath this method no longer maintained when the scan.isScanMetricsByRegionEnabled() returns true? The javadoc now implies to me that it is NOT maintained, which is a backward compatibility problem (and, imho, an ergonomics problem).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't get how this is backward incompatibility problem as current use cases using this method will have scanMetricsByRegionEnabled() set to false and for them ScanMetrics object is still tracked. For use cases which plan to use this new feature they need to rely on new object (list of ScanMetrics).
(But I think this issue will also get solved if I extend existing ScanMetrics to track region level metrics instead of maintaining a list of ScanMetrics). Thanks

*/
default void onScanMetricsCreated(ScanMetrics scanMetrics) {
}

/**
* If {@code scan.isScanMetricsEnabled()} and {@code scan.isScanMetricsByRegionEnabled()} both
* return true, then this method will be called prior to calling all the methods in this interface
* to give you the list for storing scan metric per region for this scan operation. The list will
* be get populated with per region scan metrics on-the-fly during the scan, you can store the
* provided list somewhere to get scan metrics by region later when you want.
*/
default void onScanMetricsByRegionEnabled(List<ScanMetrics> scanMetricsByRegion) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
Expand All @@ -33,7 +35,9 @@ public class ServerSideScanMetrics {
/**
* Hash to hold the String -&gt; Atomic Long mappings for each metric
*/
private final Map<String, AtomicLong> counters = new HashMap<>();
protected final Map<String, AtomicLong> counters = new HashMap<>();
private ServerName serverName;
private String encodedRegionName;

/**
* Create a new counter with the specified name
Expand Down Expand Up @@ -117,4 +121,64 @@ public Map<String, Long> getMetricsMap(boolean reset) {
// Build the immutable map so that people can't mess around with it.
return builder.build();
}

public ServerName getServerName() {
return this.serverName;
}

public void setServerName(ServerName serverName) {
if (this.serverName == null) {
this.serverName = serverName;
}
}

public void setEncodedRegionName(String encodedRegionName) {
if (this.encodedRegionName == null) {
this.encodedRegionName = encodedRegionName;
}
}

public String getEncodedRegionName() {
return this.encodedRegionName;
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("ServerName:");
sb.append(this.serverName);
sb.append(",EncodedRegion:");
sb.append(this.encodedRegionName);
sb.append(",[");
boolean isFirstMetric = true;
for (Map.Entry<String, AtomicLong> e : this.counters.entrySet()) {
if (isFirstMetric) {
isFirstMetric = false;
} else {
sb.append(",");
}
sb.append(e.getKey());
sb.append(":");
sb.append(e.getValue().get());
}
sb.append("]");
return sb.toString();
}

public void combineMetrics(ScanMetrics other) {
for (Map.Entry<String, AtomicLong> entry : other.counters.entrySet()) {
String counterName = entry.getKey();
AtomicLong counter = entry.getValue();
this.addToCounter(counterName, counter.get());
}
if (
this.encodedRegionName != null
&& !Objects.equals(this.encodedRegionName, other.getEncodedRegionName())
) {
this.encodedRegionName = null;
}
if (this.serverName != null && !Objects.equals(this.serverName, other.getServerName())) {
this.serverName = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious, how does setting null for encodedRegionName and serverName help? Are we using them to log something or return them with ScanMetrics?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

encodedRegionName and serverName are not part of ScanMetrics. When combining ScanMetrics if we observe that the ScanMetrics object in which all other ScanMetrics are getting combined has different serverName and encodedRegionName then we set these two fields to null as now we can't deterministically pick any one value. Plus, the assumption is someone getting combined ScanMetrics is not interested in region level details as we have separate getter for getting list of region level ScanMetrics.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that this is not immediately obvious. Please add a comment explaining your thought process.

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -87,9 +88,17 @@ public ClientSideRegionScanner(Configuration conf, FileSystem fs, Path rootDir,
values = new ArrayList<>();

if (scanMetrics == null) {
initScanMetricsByRegion(scan);
initScanMetrics(scan);
} else {
this.scanMetrics = scanMetrics;
if (scan.isScanMetricsByRegionEnabled()) {
this.scanMetricsByRegion = Collections.singletonList(this.scanMetrics);
}
}
if (this.scanMetrics != null && scan.isScanMetricsByRegionEnabled()) {
this.scanMetrics.setEncodedRegionName(region.getRegionInfo().getEncodedName());
// The server name will be null in scan metrics as this is a client side region scanner
}
region.startRegionOperation();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public TableSnapshotScanner(Configuration conf, Path rootDir, Path restoreDir,
openWithRestoringSnapshot();
}

initScanMetrics(scan);
initScanMetricsByRegion(scan);
}

private void openWithoutRestoringSnapshot() throws IOException {
Expand Down Expand Up @@ -184,6 +184,7 @@ public Result next() throws IOException {
}

RegionInfo hri = regions.get(currentRegion);
initScanMetrics(scan);
currentRegionScanner =
new ClientSideRegionScanner(conf, fs, restoreDir, htd, hri, scan, scanMetrics);
if (this.scanMetrics != null) {
Expand Down
Loading