Skip to content

Commit

Permalink
feat: Support to disable dynamic filter reordering (#11943)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #11943

Add config to disable dynamic filter reordering to see if it can help reduce io in favor of small query execution
if io wait could take significant amount when having local ssd. The static reordering solely based on the filter kind.
This should only be used in specific case as dynamic filter reordering is an important scan optimization.

Reviewed By: Yuhta, oerling

Differential Revision: D67542368
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Dec 24, 2024
1 parent 8ebd3a8 commit 97249d1
Show file tree
Hide file tree
Showing 15 changed files with 354 additions and 88 deletions.
2 changes: 1 addition & 1 deletion velox/connectors/Connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ class ConnectorQueryCtx {
const config::ConfigBase* const sessionProperties_;
const common::SpillConfig* const spillConfig_;
const common::PrefixSortConfig prefixSortConfig_;
std::unique_ptr<core::ExpressionEvaluator> expressionEvaluator_;
const std::unique_ptr<core::ExpressionEvaluator> expressionEvaluator_;
cache::AsyncDataCache* cache_;
const std::string scanId_;
const std::string queryId_;
Expand Down
13 changes: 11 additions & 2 deletions velox/connectors/hive/HiveConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,10 @@ bool HiveConfig::ignoreMissingFiles(const config::ConfigBase* session) const {
return session->get<bool>(kIgnoreMissingFilesSession, false);
}

int64_t HiveConfig::maxCoalescedBytes() const {
return config_->get<int64_t>(kMaxCoalescedBytes, 128 << 20); // 128MB
int64_t HiveConfig::maxCoalescedBytes(const config::ConfigBase* session) const {
return session->get<int64_t>(
kMaxCoalescedBytesSession,
config_->get<int64_t>(kMaxCoalescedBytes, 128 << 20)); // 128MB
}

int32_t HiveConfig::maxCoalescedDistanceBytes(
Expand Down Expand Up @@ -283,6 +285,13 @@ uint8_t HiveConfig::readTimestampUnit(const config::ConfigBase* session) const {
return unit;
}

bool HiveConfig::readStatsBasedFilterReorderDisabled(
const config::ConfigBase* session) const {
return session->get<bool>(
kReadStatsBasedFilterReorderDisabledSession,
config_->get<bool>(kReadStatsBasedFilterReorderDisabled, false));
}

bool HiveConfig::cacheNoRetention(const config::ConfigBase* session) const {
return session->get<bool>(
kCacheNoRetentionSession,
Expand Down
13 changes: 12 additions & 1 deletion velox/connectors/hive/HiveConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ class HiveConfig {

/// The max coalesce bytes for a request.
static constexpr const char* kMaxCoalescedBytes = "max-coalesced-bytes";
static constexpr const char* kMaxCoalescedBytesSession =
"max-coalesced-bytes";

/// The max merge distance to combine read requests.
/// Note: The session property name differs from the constant name for
Expand Down Expand Up @@ -206,6 +208,11 @@ class HiveConfig {
static constexpr const char* kReadTimestampUnitSession =
"hive.reader.timestamp_unit";

static constexpr const char* kReadStatsBasedFilterReorderDisabled =
"hive.reader.stats_based_filter_reorder_disabaled";
static constexpr const char* kReadStatsBasedFilterReorderDisabledSession =
"hive.reader.stats_based_filter_reorder_disabaled";

static constexpr const char* kCacheNoRetention = "cache.no_retention";
static constexpr const char* kCacheNoRetentionSession = "cache.no_retention";
static constexpr const char* kLocalDataPath = "hive_local_data_path";
Expand Down Expand Up @@ -239,7 +246,7 @@ class HiveConfig {

bool ignoreMissingFiles(const config::ConfigBase* session) const;

int64_t maxCoalescedBytes() const;
int64_t maxCoalescedBytes(const config::ConfigBase* session) const;

int32_t maxCoalescedDistanceBytes(const config::ConfigBase* session) const;

Expand Down Expand Up @@ -294,6 +301,10 @@ class HiveConfig {
// Returns the timestamp unit used when reading timestamps from files.
uint8_t readTimestampUnit(const config::ConfigBase* session) const;

/// Returns true if the stats based filter reorder for read is disabled.
bool readStatsBasedFilterReorderDisabled(
const config::ConfigBase* session) const;

/// Returns true to evict out a query scanned data out of in-memory cache
/// right after the access, and also skip staging to the ssd cache. This helps
/// to prevent the cache space pollution from the one-time table scan by large
Expand Down
40 changes: 34 additions & 6 deletions velox/connectors/hive/HiveConnectorUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,28 @@
namespace facebook::velox::connector::hive {
namespace {

struct SubfieldFilter {
std::string name;
const common::Subfield* subfield;
const common::Filter* filter;
};

// Returns the sub-field filters in order of the subfield name.
std::vector<SubfieldFilter> orderSubfieldFilters(
const SubfieldFilters& filters) {
std::vector<SubfieldFilter> ordered;
ordered.reserve(filters.size());
for (const auto& [subfield, filter] : filters) {
ordered.emplace_back(
SubfieldFilter{subfield.toString(), &subfield, filter.get()});
}
std::sort(
ordered.begin(), ordered.end(), [](const auto& lhs, const auto& rhs) {
return lhs.name < rhs.name;
});
return ordered;
}

struct SubfieldSpec {
const common::Subfield* subfield;
bool filterOnly;
Expand Down Expand Up @@ -369,6 +391,7 @@ std::shared_ptr<common::ScanSpec> makeScanSpec(
const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
infoColumns,
const SpecialColumnNames& specialColumns,
bool disableStatsBasedFilterReorder,
memory::MemoryPool* pool) {
auto spec = std::make_shared<common::ScanSpec>("root");
folly::F14FastMap<std::string, std::vector<const common::Subfield*>>
Expand Down Expand Up @@ -441,8 +464,9 @@ std::shared_ptr<common::ScanSpec> makeScanSpec(
}
}

for (auto& pair : filters) {
const auto name = pair.first.toString();
// Ensure the initial filter ordering is the same across splits and drivers.
const auto orderedFilters = orderSubfieldFilters(filters);
for (const auto& filterEntry : orderedFilters) {
// SelectiveColumnReader doesn't support constant columns with filters,
// hence, we can't have a filter for a $path or $bucket column.
//
Expand All @@ -451,13 +475,16 @@ std::shared_ptr<common::ScanSpec> makeScanSpec(
// to be removed.
// TODO Remove this check when Presto is fixed to not specify a filter
// on $path and $bucket column.
if (isSynthesizedColumn(name, infoColumns)) {
if (isSynthesizedColumn(filterEntry.name, infoColumns)) {
continue;
}
auto fieldSpec = spec->getOrCreateChild(pair.first);
fieldSpec->addFilter(*pair.second);
auto fieldSpec = spec->getOrCreateChild(*filterEntry.subfield);
fieldSpec->addFilter(*filterEntry.filter);
}

if (disableStatsBasedFilterReorder) {
spec->disableStatsBasedFilterReorder();
}
return spec;
}

Expand Down Expand Up @@ -557,7 +584,8 @@ void configureReaderOptions(
const std::unordered_map<std::string, std::string>& tableParameters) {
auto sessionProperties = connectorQueryCtx->sessionProperties();
readerOptions.setLoadQuantum(hiveConfig->loadQuantum());
readerOptions.setMaxCoalesceBytes(hiveConfig->maxCoalescedBytes());
readerOptions.setMaxCoalesceBytes(
hiveConfig->maxCoalescedBytes(sessionProperties));
readerOptions.setMaxCoalesceDistance(
hiveConfig->maxCoalescedDistanceBytes(sessionProperties));
readerOptions.setFileColumnNamesReadAsLowerCase(
Expand Down
1 change: 1 addition & 0 deletions velox/connectors/hive/HiveConnectorUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ std::shared_ptr<common::ScanSpec> makeScanSpec(
const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
infoColumns,
const SpecialColumnNames& specialColumns,
bool disableStatsBasedFilterReorder,
memory::MemoryPool* pool);

void configureReaderOptions(
Expand Down
4 changes: 4 additions & 0 deletions velox/connectors/hive/HiveDataSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ HiveDataSource::HiveDataSource(
partitionKeys_,
infoColumns_,
specialColumns_,
hiveConfig_->readStatsBasedFilterReorderDisabled(
connectorQueryCtx_->sessionProperties()),
pool_);
if (remainingFilter) {
metadataFilter_ = std::make_shared<common::MetadataFilter>(
Expand Down Expand Up @@ -260,6 +262,8 @@ std::unique_ptr<HivePartitionFunction> HiveDataSource::setupBucketConversion() {
partitionKeys_,
infoColumns_,
specialColumns_,
hiveConfig_->readStatsBasedFilterReorderDisabled(
connectorQueryCtx_->sessionProperties()),
pool_);
newScanSpec->moveAdaptationFrom(*scanSpec_);
scanSpec_ = std::move(newScanSpec);
Expand Down
70 changes: 36 additions & 34 deletions velox/connectors/hive/tests/HiveConfigTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,16 @@ TEST(HiveConfigTest, defaultConfig) {
ASSERT_EQ(hiveConfig.immutablePartitions(), false);
ASSERT_EQ(hiveConfig.gcsEndpoint(), "");
ASSERT_EQ(hiveConfig.gcsCredentialsPath(), "");
ASSERT_EQ(hiveConfig.isOrcUseColumnNames(emptySession.get()), false);
ASSERT_EQ(
hiveConfig.isFileColumnNamesReadAsLowerCase(emptySession.get()), false);
ASSERT_FALSE(hiveConfig.isOrcUseColumnNames(emptySession.get()));
ASSERT_FALSE(hiveConfig.isFileColumnNamesReadAsLowerCase(emptySession.get()));

ASSERT_EQ(hiveConfig.maxCoalescedBytes(), 128 << 20);
ASSERT_EQ(hiveConfig.maxCoalescedBytes(emptySession.get()), 128 << 20);
ASSERT_EQ(
hiveConfig.maxCoalescedDistanceBytes(emptySession.get()), 512 << 10);
ASSERT_FALSE(
hiveConfig.readStatsBasedFilterReorderDisabled(emptySession.get()));
ASSERT_EQ(hiveConfig.numCacheFileHandles(), 20'000);
ASSERT_EQ(hiveConfig.isFileHandleCacheEnabled(), true);
ASSERT_TRUE(hiveConfig.isFileHandleCacheEnabled());
ASSERT_EQ(
hiveConfig.orcWriterMaxStripeSize(emptySession.get()),
64L * 1024L * 1024L);
Expand All @@ -62,13 +63,13 @@ TEST(HiveConfigTest, defaultConfig) {
hiveConfig.sortWriterMaxOutputBytes(emptySession.get()), 10UL << 20);
ASSERT_EQ(
hiveConfig.sortWriterFinishTimeSliceLimitMs(emptySession.get()), 5'000);
ASSERT_EQ(hiveConfig.isPartitionPathAsLowerCase(emptySession.get()), true);
ASSERT_EQ(hiveConfig.allowNullPartitionKeys(emptySession.get()), true);
ASSERT_TRUE(hiveConfig.isPartitionPathAsLowerCase(emptySession.get()));
ASSERT_TRUE(hiveConfig.allowNullPartitionKeys(emptySession.get()));
ASSERT_EQ(hiveConfig.orcWriterMinCompressionSize(emptySession.get()), 1024);
ASSERT_EQ(
hiveConfig.orcWriterCompressionLevel(emptySession.get()), std::nullopt);
ASSERT_EQ(
hiveConfig.orcWriterLinearStripeSizeHeuristics(emptySession.get()), true);
ASSERT_TRUE(
hiveConfig.orcWriterLinearStripeSizeHeuristics(emptySession.get()));
ASSERT_FALSE(hiveConfig.cacheNoRetention(emptySession.get()));
}

Expand Down Expand Up @@ -96,7 +97,8 @@ TEST(HiveConfigTest, overrideConfig) {
{HiveConfig::kOrcWriterLinearStripeSizeHeuristics, "false"},
{HiveConfig::kOrcWriterMinCompressionSize, "512"},
{HiveConfig::kOrcWriterCompressionLevel, "1"},
{HiveConfig::kCacheNoRetention, "true"}};
{HiveConfig::kCacheNoRetention, "true"},
{HiveConfig::kReadStatsBasedFilterReorderDisabled, "true"}};
HiveConfig hiveConfig(
std::make_shared<config::ConfigBase>(std::move(configFromFile)));
auto emptySession = std::make_shared<config::ConfigBase>(
Expand All @@ -106,18 +108,17 @@ TEST(HiveConfigTest, overrideConfig) {
facebook::velox::connector::hive::HiveConfig::
InsertExistingPartitionsBehavior::kOverwrite);
ASSERT_EQ(hiveConfig.maxPartitionsPerWriters(emptySession.get()), 120);
ASSERT_EQ(hiveConfig.immutablePartitions(), true);
ASSERT_TRUE(hiveConfig.immutablePartitions());
ASSERT_EQ(hiveConfig.gcsEndpoint(), "hey");
ASSERT_EQ(hiveConfig.gcsCredentialsPath(), "hey");
ASSERT_EQ(hiveConfig.isOrcUseColumnNames(emptySession.get()), true);
ASSERT_EQ(
hiveConfig.isFileColumnNamesReadAsLowerCase(emptySession.get()), true);
ASSERT_EQ(hiveConfig.allowNullPartitionKeys(emptySession.get()), false);
ASSERT_EQ(hiveConfig.maxCoalescedBytes(), 100);
ASSERT_TRUE(hiveConfig.isOrcUseColumnNames(emptySession.get()));
ASSERT_TRUE(hiveConfig.isFileColumnNamesReadAsLowerCase(emptySession.get()));
ASSERT_FALSE(hiveConfig.allowNullPartitionKeys(emptySession.get()));
ASSERT_EQ(hiveConfig.maxCoalescedBytes(emptySession.get()), 100);
ASSERT_EQ(
hiveConfig.maxCoalescedDistanceBytes(emptySession.get()), 100 << 10);
ASSERT_EQ(hiveConfig.numCacheFileHandles(), 100);
ASSERT_EQ(hiveConfig.isFileHandleCacheEnabled(), false);
ASSERT_FALSE(hiveConfig.isFileHandleCacheEnabled());
ASSERT_EQ(
hiveConfig.orcWriterMaxStripeSize(emptySession.get()),
100L * 1024L * 1024L);
Expand All @@ -142,6 +143,8 @@ TEST(HiveConfigTest, overrideConfig) {
hiveConfig.orcWriterLinearStripeSizeHeuristics(emptySession.get()),
false);
ASSERT_TRUE(hiveConfig.cacheNoRetention(emptySession.get()));
ASSERT_TRUE(
hiveConfig.readStatsBasedFilterReorderDisabled(emptySession.get()));
}

TEST(HiveConfigTest, overrideSession) {
Expand All @@ -165,44 +168,43 @@ TEST(HiveConfigTest, overrideSession) {
{HiveConfig::kOrcWriterMinCompressionSizeSession, "512"},
{HiveConfig::kOrcWriterCompressionLevelSession, "1"},
{HiveConfig::kOrcWriterLinearStripeSizeHeuristicsSession, "false"},
{HiveConfig::kCacheNoRetentionSession, "true"}};
{HiveConfig::kCacheNoRetentionSession, "true"},
{HiveConfig::kReadStatsBasedFilterReorderDisabled, "true"}};
const auto session =
std::make_unique<config::ConfigBase>(std::move(sessionOverride));
ASSERT_EQ(
hiveConfig.insertExistingPartitionsBehavior(session.get()),
facebook::velox::connector::hive::HiveConfig::
InsertExistingPartitionsBehavior::kOverwrite);
ASSERT_EQ(hiveConfig.maxPartitionsPerWriters(session.get()), 128);
ASSERT_EQ(hiveConfig.immutablePartitions(), false);
ASSERT_FALSE(hiveConfig.immutablePartitions());
ASSERT_EQ(hiveConfig.gcsEndpoint(), "");
ASSERT_EQ(hiveConfig.gcsCredentialsPath(), "");
ASSERT_EQ(hiveConfig.isOrcUseColumnNames(session.get()), true);
ASSERT_EQ(hiveConfig.isFileColumnNamesReadAsLowerCase(session.get()), true);
ASSERT_TRUE(hiveConfig.isOrcUseColumnNames(session.get()));
ASSERT_TRUE(hiveConfig.isFileColumnNamesReadAsLowerCase(session.get()));

ASSERT_EQ(hiveConfig.maxCoalescedBytes(), 128 << 20);
ASSERT_EQ(hiveConfig.maxCoalescedBytes(session.get()), 128 << 20);
ASSERT_EQ(hiveConfig.maxCoalescedDistanceBytes(session.get()), 3 << 20);
ASSERT_EQ(hiveConfig.numCacheFileHandles(), 20'000);
ASSERT_EQ(hiveConfig.isFileHandleCacheEnabled(), true);
ASSERT_TRUE(hiveConfig.isFileHandleCacheEnabled());
ASSERT_EQ(
hiveConfig.orcWriterMaxStripeSize(session.get()), 22L * 1024L * 1024L);
ASSERT_EQ(
hiveConfig.orcWriterMaxDictionaryMemory(session.get()),
22L * 1024L * 1024L);
ASSERT_EQ(
hiveConfig.isOrcWriterIntegerDictionaryEncodingEnabled(session.get()),
false);
ASSERT_EQ(
hiveConfig.isOrcWriterStringDictionaryEncodingEnabled(session.get()),
false);
ASSERT_FALSE(
hiveConfig.isOrcWriterIntegerDictionaryEncodingEnabled(session.get()));
ASSERT_FALSE(
hiveConfig.isOrcWriterStringDictionaryEncodingEnabled(session.get()));
ASSERT_EQ(hiveConfig.sortWriterMaxOutputRows(session.get()), 20);
ASSERT_EQ(hiveConfig.sortWriterMaxOutputBytes(session.get()), 20UL << 20);
ASSERT_EQ(hiveConfig.sortWriterFinishTimeSliceLimitMs(session.get()), 300);
ASSERT_EQ(hiveConfig.isPartitionPathAsLowerCase(session.get()), false);
ASSERT_EQ(hiveConfig.allowNullPartitionKeys(session.get()), false);
ASSERT_EQ(hiveConfig.ignoreMissingFiles(session.get()), true);
ASSERT_EQ(
hiveConfig.orcWriterLinearStripeSizeHeuristics(session.get()), false);
ASSERT_FALSE(hiveConfig.isPartitionPathAsLowerCase(session.get()));
ASSERT_FALSE(hiveConfig.allowNullPartitionKeys(session.get()));
ASSERT_TRUE(hiveConfig.ignoreMissingFiles(session.get()));
ASSERT_FALSE(hiveConfig.orcWriterLinearStripeSizeHeuristics(session.get()));
ASSERT_EQ(hiveConfig.orcWriterMinCompressionSize(session.get()), 512);
ASSERT_EQ(hiveConfig.orcWriterCompressionLevel(session.get()), 1);
ASSERT_TRUE(hiveConfig.cacheNoRetention(session.get()));
ASSERT_TRUE(hiveConfig.readStatsBasedFilterReorderDisabled(session.get()));
}
Loading

0 comments on commit 97249d1

Please sign in to comment.