Skip to content

Commit

Permalink
TEZ-3331: Add operation specific HDFS counters for Tez UI (#379) (Las…
Browse files Browse the repository at this point in the history
…zlo Bodor reviewed by Ayush Saxena)
  • Loading branch information
abstractdog authored Dec 13, 2024
1 parent d84fdca commit 1f7465f
Show file tree
Hide file tree
Showing 7 changed files with 288 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,68 @@
package org.apache.tez.common.counters;

import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.fs.StorageStatistics.CommonStatisticNames;

/**
* FileSystemCounter is an enum for defining which filesystem/storage statistics are exposed in Tez.
* Constants declared with an argument carry an operation name, available through {@link #getOpName()}.
*/
@Private
public enum FileSystemCounter {
BYTES_READ,
BYTES_WRITTEN,
READ_OPS,
LARGE_READ_OPS,
WRITE_OPS,
HDFS_BYTES_READ,
HDFS_BYTES_WRITTEN,
FILE_BYTES_READ,
FILE_BYTES_WRITTEN
BYTES_READ("bytesRead"),
BYTES_WRITTEN("bytesWritten"),
READ_OPS("readOps"),
LARGE_READ_OPS("largeReadOps"),
WRITE_OPS("writeOps"),

// Additional counters from HADOOP-13305; their operation names are taken from the
// CommonStatisticNames constants instead of being spelled out as string literals here.
OP_APPEND(CommonStatisticNames.OP_APPEND),
OP_COPY_FROM_LOCAL_FILE(CommonStatisticNames.OP_COPY_FROM_LOCAL_FILE),
OP_CREATE(CommonStatisticNames.OP_CREATE),
OP_CREATE_NON_RECURSIVE(CommonStatisticNames.OP_CREATE_NON_RECURSIVE),
OP_DELETE(CommonStatisticNames.OP_DELETE),
OP_EXISTS(CommonStatisticNames.OP_EXISTS),
OP_GET_CONTENT_SUMMARY(CommonStatisticNames.OP_GET_CONTENT_SUMMARY),
OP_GET_DELEGATION_TOKEN(CommonStatisticNames.OP_GET_DELEGATION_TOKEN),
OP_GET_FILE_CHECKSUM(CommonStatisticNames.OP_GET_FILE_CHECKSUM),
OP_GET_FILE_STATUS(CommonStatisticNames.OP_GET_FILE_STATUS),
OP_GET_STATUS(CommonStatisticNames.OP_GET_STATUS),
OP_GLOB_STATUS(CommonStatisticNames.OP_GLOB_STATUS),
OP_IS_FILE(CommonStatisticNames.OP_IS_FILE),
OP_IS_DIRECTORY(CommonStatisticNames.OP_IS_DIRECTORY),
OP_LIST_FILES(CommonStatisticNames.OP_LIST_FILES),
OP_LIST_LOCATED_STATUS(CommonStatisticNames.OP_LIST_LOCATED_STATUS),
OP_LIST_STATUS(CommonStatisticNames.OP_LIST_STATUS),
OP_MKDIRS(CommonStatisticNames.OP_MKDIRS),
OP_MODIFY_ACL_ENTRIES(CommonStatisticNames.OP_MODIFY_ACL_ENTRIES),
OP_OPEN(CommonStatisticNames.OP_OPEN),
OP_REMOVE_ACL(CommonStatisticNames.OP_REMOVE_ACL),
OP_REMOVE_ACL_ENTRIES(CommonStatisticNames.OP_REMOVE_ACL_ENTRIES),
OP_REMOVE_DEFAULT_ACL(CommonStatisticNames.OP_REMOVE_DEFAULT_ACL),
OP_RENAME(CommonStatisticNames.OP_RENAME),
OP_SET_ACL(CommonStatisticNames.OP_SET_ACL),
OP_SET_OWNER(CommonStatisticNames.OP_SET_OWNER),
OP_SET_PERMISSION(CommonStatisticNames.OP_SET_PERMISSION),
OP_SET_TIMES(CommonStatisticNames.OP_SET_TIMES),
OP_TRUNCATE(CommonStatisticNames.OP_TRUNCATE),

// The counters below are not needed in production, because the scheme_countername expansion is
// handled by the FileSystemCounterGroup; they remain only because some analyzers still depend on them.
@Deprecated
HDFS_BYTES_READ("hdfsBytesRead"),
@Deprecated
HDFS_BYTES_WRITTEN("hdfsBytesWritten"),
@Deprecated
FILE_BYTES_READ("fileBytesRead"),
@Deprecated
FILE_BYTES_WRITTEN("fileBytesWritten");

/** Operation name supplied when the constant is declared; set once in the constructor. */
private final String opName;

/**
* Creates a counter constant bound to the given operation name.
*
* @param opName the operation name to associate with this constant
*/
FileSystemCounter(String opName) {
this.opName = opName;
}

/**
* Returns the operation name this constant was declared with.
*
* @return the operation name passed to the constructor
*/
public String getOpName() {
return opName;
}
}
17 changes: 17 additions & 0 deletions tez-runtime-internals/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,23 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
Expand All @@ -17,9 +17,7 @@

package org.apache.tez.runtime.metrics;

import java.util.List;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.StorageStatistics;
import org.apache.tez.common.counters.FileSystemCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
Expand All @@ -30,50 +28,22 @@
*/
public class FileSystemStatisticUpdater {

private List<FileSystem.Statistics> stats;
private TezCounter readBytesCounter, writeBytesCounter, readOpsCounter, largeReadOpsCounter,
writeOpsCounter;
private String scheme;
private TezCounters counters;
// Statistics source: queried by name with getLong(...) and for its scheme with getScheme().
private final StorageStatistics stats;
// Counter collection in which the per-scheme counters are looked up with findCounter(...).
private final TezCounters counters;

FileSystemStatisticUpdater(TezCounters counters, List<FileSystem.Statistics> stats, String scheme) {
this.stats = stats;
this.scheme = scheme;
// Only stores the two references; no counter is looked up or written at construction time.
FileSystemStatisticUpdater(TezCounters counters, StorageStatistics storageStatistics) {
this.stats = storageStatistics;
this.counters = counters;
}

void updateCounters() {
if (readBytesCounter == null) {
readBytesCounter = counters.findCounter(scheme, FileSystemCounter.BYTES_READ);
}
if (writeBytesCounter == null) {
writeBytesCounter = counters.findCounter(scheme, FileSystemCounter.BYTES_WRITTEN);
}
if (readOpsCounter == null) {
readOpsCounter = counters.findCounter(scheme, FileSystemCounter.READ_OPS);
}
if (largeReadOpsCounter == null) {
largeReadOpsCounter = counters.findCounter(scheme, FileSystemCounter.LARGE_READ_OPS);
}
if (writeOpsCounter == null) {
writeOpsCounter = counters.findCounter(scheme, FileSystemCounter.WRITE_OPS);
}
long readBytes = 0;
long writeBytes = 0;
long readOps = 0;
long largeReadOps = 0;
long writeOps = 0;
for (FileSystem.Statistics stat : stats) {
readBytes = readBytes + stat.getBytesRead();
writeBytes = writeBytes + stat.getBytesWritten();
readOps = readOps + stat.getReadOps();
largeReadOps = largeReadOps + stat.getLargeReadOps();
writeOps = writeOps + stat.getWriteOps();
// loop through FileSystemCounter enums as it is a smaller set
for (FileSystemCounter fsCounter : FileSystemCounter.values()) {
// Query the statistic by the counter's operation name; the boxed result is null-checked below.
Long val = stats.getLong(fsCounter.getOpName());
// Null or zero values are skipped, so findCounter(...) is only called for non-zero statistics.
if (val != null && val != 0) {
TezCounter counter = counters.findCounter(stats.getScheme(), fsCounter);
// Overwrites the counter with the statistic's current value (setValue, not an increment).
counter.setValue(val);
}
}
readBytesCounter.setValue(readBytes);
writeBytesCounter.setValue(writeBytes);
readOpsCounter.setValue(readOps);
largeReadOpsCounter.setValue(largeReadOps);
writeOpsCounter.setValue(writeOps);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,17 @@

package org.apache.tez.runtime.metrics;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.fs.GlobalStorageStatistics;
import org.apache.hadoop.fs.StorageStatistics;
import org.apache.tez.util.TezMxBeanResourceCalculator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
import org.apache.tez.common.GcTimeUpdater;
import org.apache.tez.common.counters.TaskCounter;
Expand All @@ -49,10 +49,9 @@ public class TaskCounterUpdater {
private final Configuration conf;

/**
* A Map where Key-> URIScheme and value->FileSystemStatisticUpdater
* A Map where Key -> URI scheme and value -> Map of (statistics name -> FileSystemStatisticUpdater)
*/
private Map<String, FileSystemStatisticUpdater> statisticUpdaters =
new HashMap<String, FileSystemStatisticUpdater>();
private final Map<String, Map<String, FileSystemStatisticUpdater>> statisticUpdaters = new HashMap<>();
protected final GcTimeUpdater gcUpdater;
private ResourceCalculatorProcessTree pTree;
private long initCpuCumulativeTime = 0;
Expand All @@ -67,34 +66,18 @@ public TaskCounterUpdater(TezCounters counters, Configuration conf, String pid)
recordInitialCpuStats();
}


public void updateCounters() {
// FileSystemStatistics are reset each time a new task is seen by the
// container.
// This doesn't remove the fileSystem, and does not clear all statistics -
// so there is a potential of an unused FileSystem showing up for a
// Container, and strange values for READ_OPS etc.
Map<String, List<FileSystem.Statistics>> map = new
HashMap<String, List<FileSystem.Statistics>>();
for(Statistics stat: FileSystem.getAllStatistics()) {
String uriScheme = stat.getScheme();
if (map.containsKey(uriScheme)) {
List<FileSystem.Statistics> list = map.get(uriScheme);
list.add(stat);
} else {
List<FileSystem.Statistics> list = new ArrayList<FileSystem.Statistics>();
list.add(stat);
map.put(uriScheme, list);
}
}
for (Map.Entry<String, List<FileSystem.Statistics>> entry: map.entrySet()) {
FileSystemStatisticUpdater updater = statisticUpdaters.get(entry.getKey());
if(updater==null) {//new FileSystem has been found in the cache
updater =
new FileSystemStatisticUpdater(tezCounters, entry.getValue(),
entry.getKey());
statisticUpdaters.put(entry.getKey(), updater);
}
GlobalStorageStatistics globalStorageStatistics = FileSystem.getGlobalStorageStatistics();
Iterator<StorageStatistics> iter = globalStorageStatistics.iterator();
while (iter.hasNext()) {
StorageStatistics stats = iter.next();
// Fetch or initialize the updater set for the scheme
Map<String, FileSystemStatisticUpdater> updaterSet = statisticUpdaters
.computeIfAbsent(stats.getScheme(), k -> new HashMap<>());
// Fetch or create the updater for the specific statistic
FileSystemStatisticUpdater updater = updaterSet
.computeIfAbsent(stats.getName(), k -> new FileSystemStatisticUpdater(tezCounters, stats));
updater.updateCounters();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tez.runtime.metrics;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.tez.common.counters.FileSystemCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Checks that {@link TaskCounterUpdater} copies file system statistics into {@link TezCounters},
 * using a {@link MiniDFSCluster} as the file system that generates the statistics.
 */
public class TestFileSystemStatisticUpdater {

  private static final Logger LOG = LoggerFactory.getLogger(
      TestFileSystemStatisticUpdater.class);

  private static MiniDFSCluster dfsCluster;

  private static final Configuration CONF = new Configuration();
  private static FileSystem remoteFs;

  private static final String TEST_ROOT_DIR = "target" + Path.SEPARATOR +
      TestFileSystemStatisticUpdater.class.getName() + "-tmpDir";

  /** Points the mini DFS cluster at a per-test-class directory under {@code target}. */
  @BeforeClass
  public static void beforeClass() throws Exception {
    CONF.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
  }

  /** Shuts down the cluster left running by the last test, if any. */
  @AfterClass
  public static void tearDown() {
    if (dfsCluster != null) {
      dfsCluster.shutdown();
      dfsCluster = null;
    }
  }

  /**
   * Clears the file system statistics and starts a fresh two-datanode cluster before each test.
   *
   * @throws IOException declared for callers; a cluster start-up failure is rethrown as a
   *     {@link RuntimeException} that carries the original exception as its cause
   */
  @Before
  public void setup() throws IOException {
    FileSystem.clearStatistics();
    try {
      // tear down the whole cluster before each test to completely get rid of file system statistics
      if (dfsCluster != null) {
        dfsCluster.shutdown();
      }
      dfsCluster = new MiniDFSCluster.Builder(CONF).numDataNodes(2).build();
      remoteFs = dfsCluster.getFileSystem();
    } catch (IOException io) {
      throw new RuntimeException("problem starting mini dfs cluster", io);
    }
  }

  /**
   * Writes two small files and checks the counters after each update, then clears the statistics
   * and checks that a further update leaves the already published counter value in place.
   */
  @Test
  public void basicTest() throws IOException {
    TezCounters counters = new TezCounters();
    TaskCounterUpdater updater = new TaskCounterUpdater(counters, CONF, "pid");

    DFSTestUtil.writeFile(remoteFs, new Path("/tmp/foo/abc.txt"), "xyz");

    updater.updateCounters();
    LOG.info("Counters (after first update): {}", counters);
    assertCounter(counters, FileSystemCounter.OP_MKDIRS, 0); // DFSTestUtil doesn't call separate mkdirs
    assertCounter(counters, FileSystemCounter.OP_CREATE, 1);
    assertCounter(counters, FileSystemCounter.BYTES_WRITTEN, 3); // "xyz"
    assertCounter(counters, FileSystemCounter.WRITE_OPS, 1);
    assertCounter(counters, FileSystemCounter.OP_GET_FILE_STATUS, 1); // DFSTestUtil calls fs.exists

    DFSTestUtil.writeFile(remoteFs, new Path("/tmp/foo/abc1.txt"), "xyz");

    updater.updateCounters();
    LOG.info("Counters (after second update): {}", counters);
    assertCounter(counters, FileSystemCounter.OP_CREATE, 2);
    assertCounter(counters, FileSystemCounter.BYTES_WRITTEN, 6); // "xyz" has been written twice
    assertCounter(counters, FileSystemCounter.WRITE_OPS, 2);
    assertCounter(counters, FileSystemCounter.OP_GET_FILE_STATUS, 2); // DFSTestUtil calls fs.exists again

    // Reset the underlying statistics so that the next update sees zeroed values.
    FileSystem.clearStatistics();
    updater.updateCounters();
    LOG.info("Counters (after third update): {}", counters);
    // The updater skips statistics whose value is zero, so the counter holds its value after
    // clearStatistics + updateCounters.
    assertCounter(counters, FileSystemCounter.OP_CREATE, 2);
  }

  /**
   * Asserts that the counter for the remote file system's scheme has the expected value.
   *
   * @param counters the counters populated by the updater under test
   * @param fsCounter the counter to look up
   * @param value the expected counter value
   */
  private void assertCounter(TezCounters counters, FileSystemCounter fsCounter, int value) {
    TezCounter counter = counters.findCounter(remoteFs.getScheme(), fsCounter);
    Assert.assertNotNull(counter);
    // Name the counter in the failure message so a mismatch is easy to attribute.
    Assert.assertEquals("Unexpected value for " + fsCounter, value, counter.getValue());
  }
}
Loading

0 comments on commit 1f7465f

Please sign in to comment.