Skip to content

Commit

Permalink
Simplify the memory pool arbitration process management (facebookincu…
Browse files Browse the repository at this point in the history
…bator#10705)

Summary:
Move memory arbitration context setting from shared arbitrator to memory pool and pass the
root memory pool to memory arbitrator directly to simplify the the arbitrator backend implementation.
The memory pool handles the memory arbitration context setting and grow the capacity with the
memory arbitrator directly.

Pull Request resolved: facebookincubator#10705

Reviewed By: tanjialiang

Differential Revision: D61063144

Pulled By: xiaoxmeng

fbshipit-source-id: 3baca78d69a8584f04070b851f27c9210a2144c0
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Aug 12, 2024
1 parent af14051 commit 5f32870
Show file tree
Hide file tree
Showing 11 changed files with 164 additions and 137 deletions.
11 changes: 0 additions & 11 deletions velox/common/memory/Memory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,17 +131,13 @@ MemoryManager::MemoryManager(const MemoryManagerOptions& options)
debugEnabled_(options.debugEnabled),
coreOnAllocationFailureEnabled_(options.coreOnAllocationFailureEnabled),
poolDestructionCb_([&](MemoryPool* pool) { dropPool(pool); }),
poolGrowCb_([&](MemoryPool* pool, uint64_t targetBytes) {
return growPool(pool, targetBytes);
}),
sysRoot_{std::make_shared<MemoryPoolImpl>(
this,
std::string(kSysRootName),
MemoryPool::Kind::kAggregate,
nullptr,
nullptr,
nullptr,
nullptr,
// NOTE: the default root memory pool has no capacity limit, and it is
// used for system usage in production such as disk spilling.
MemoryPool::Options{
Expand Down Expand Up @@ -268,7 +264,6 @@ std::shared_ptr<MemoryPool> MemoryManager::addRootPool(
MemoryPool::Kind::kAggregate,
nullptr,
std::move(reclaimer),
poolGrowCb_,
poolDestructionCb_,
options);
pools_.emplace(poolName, pool);
Expand All @@ -290,12 +285,6 @@ std::shared_ptr<MemoryPool> MemoryManager::addLeafPool(
return sysRoot_->addLeafChild(poolName, threadSafe, nullptr);
}

bool MemoryManager::growPool(MemoryPool* pool, uint64_t incrementBytes) {
VELOX_CHECK_NOT_NULL(pool);
VELOX_CHECK_NE(pool->capacity(), kMaxMemory);
return arbitrator_->growCapacity(pool, incrementBytes);
}

uint64_t MemoryManager::shrinkPools(
uint64_t targetBytes,
bool allowSpill,
Expand Down
6 changes: 0 additions & 6 deletions velox/common/memory/Memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -307,10 +307,6 @@ class MemoryManager {
private:
void dropPool(MemoryPool* pool);

// Invoked to grow a memory pool's free capacity with at least
// 'incrementBytes'. The function returns true on success, otherwise false.
bool growPool(MemoryPool* pool, uint64_t incrementBytes);

// Returns the shared references to all the alive memory pools in 'pools_'.
std::vector<std::shared_ptr<MemoryPool>> getAlivePools() const;

Expand All @@ -328,8 +324,6 @@ class MemoryManager {
// tracked by 'pools_'. It is invoked on the root pool destruction and removes
// the pool from 'pools_'.
const MemoryPoolImpl::DestructionCallback poolDestructionCb_;
// Callback invoked by the root memory pool to request memory capacity growth.
const MemoryPoolImpl::GrowCapacityCallback poolGrowCb_;

const std::shared_ptr<MemoryPool> sysRoot_;
const std::shared_ptr<MemoryPool> spillPool_;
Expand Down
28 changes: 13 additions & 15 deletions velox/common/memory/MemoryArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,16 @@ const MemoryArbitrationContext* memoryArbitrationContext() {
return arbitrationCtx;
}

ScopedMemoryPoolArbitrationCtx::ScopedMemoryPoolArbitrationCtx(MemoryPool* pool)
: pool_(pool) {
VELOX_CHECK_NOT_NULL(pool_);
pool_->enterArbitration();
}

ScopedMemoryPoolArbitrationCtx::~ScopedMemoryPoolArbitrationCtx() {
pool_->leaveArbitration();
}

bool underMemoryArbitration() {
return memoryArbitrationContext() != nullptr;
}
Expand All @@ -515,21 +525,9 @@ void testingRunArbitration(
MemoryPool* pool,
uint64_t targetBytes,
bool allowSpill) {
pool->enterArbitration();
// Seraliazes the testing arbitration injection to make sure that the previous
// op has left arbitration section before starting the next one. This is
// guaranteed by the production code for operation triggered arbitration.
static std::mutex lock;
{
std::lock_guard<std::mutex> l(lock);
static_cast<MemoryPoolImpl*>(pool)->testingManager()->shrinkPools(
targetBytes, allowSpill);
pool->leaveArbitration();
}
// This function is simulating an operator triggered arbitration which
// would check if the query has been aborted after finish arbitration by the
// memory pool capacity grow path.
static_cast<MemoryPoolImpl*>(pool)->testingCheckIfAborted();
ScopedMemoryPoolArbitrationCtx arbitrationCtx{pool};
static_cast<MemoryPoolImpl*>(pool)->testingManager()->shrinkPools(
targetBytes, allowSpill);
}

ScopedReclaimedBytesRecorder::ScopedReclaimedBytesRecorder(
Expand Down
19 changes: 15 additions & 4 deletions velox/common/memory/MemoryArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -411,10 +411,10 @@ class ScopedMemoryArbitrationContext {
public:
explicit ScopedMemoryArbitrationContext(const MemoryPool* requestor);

// Can be used to restore a previously captured MemoryArbitrationContext.
// contextToRestore can be nullptr if there was no context at the time it was
// captured, in which case arbitrationCtx is unchanged upon
// contruction/destruction of this object.
/// Can be used to restore a previously captured MemoryArbitrationContext.
/// contextToRestore can be nullptr if there was no context at the time it was
/// captured, in which case arbitrationCtx is unchanged upon
/// contruction/destruction of this object.
explicit ScopedMemoryArbitrationContext(
const MemoryArbitrationContext* contextToRestore);

Expand All @@ -425,6 +425,17 @@ class ScopedMemoryArbitrationContext {
MemoryArbitrationContext currentArbitrationCtx_;
};

/// Object used to setup arbitration context for a memory pool.
class ScopedMemoryPoolArbitrationCtx {
public:
explicit ScopedMemoryPoolArbitrationCtx(MemoryPool* pool);

~ScopedMemoryPoolArbitrationCtx();

private:
MemoryPool* const pool_;
};

/// Returns the memory arbitration context set by a per-thread local variable if
/// the running thread is under memory arbitration processing.
const MemoryArbitrationContext* memoryArbitrationContext();
Expand Down
19 changes: 12 additions & 7 deletions velox/common/memory/MemoryPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,13 +413,12 @@ MemoryPoolImpl::MemoryPoolImpl(
Kind kind,
std::shared_ptr<MemoryPool> parent,
std::unique_ptr<MemoryReclaimer> reclaimer,
GrowCapacityCallback growCapacityCb,
DestructionCallback destructionCb,
const Options& options)
: MemoryPool{name, kind, parent, options},
manager_{memoryManager},
allocator_{manager_->allocator()},
growCapacityCb_(std::move(growCapacityCb)),
arbitrator_{manager_->arbitrator()},
destructionCb_(std::move(destructionCb)),
debugPoolNameRegex_(debugEnabled_ ? *(debugPoolNameRegex().rlock()) : ""),
reclaimer_(std::move(reclaimer)),
Expand All @@ -428,8 +427,8 @@ MemoryPoolImpl::MemoryPoolImpl(
capacity_(parent_ != nullptr ? kMaxMemory : 0) {
VELOX_CHECK(options.threadSafe || isLeaf());
VELOX_CHECK(
isRoot() || (destructionCb_ == nullptr && growCapacityCb_ == nullptr),
"Only root memory pool allows to set destruction and capacity grow callbacks: {}",
isRoot() || destructionCb_ == nullptr,
"Only root memory pool allows to set destruction callbacks: {}",
name_);
}

Expand Down Expand Up @@ -733,7 +732,6 @@ std::shared_ptr<MemoryPool> MemoryPoolImpl::genChild(
parent,
std::move(reclaimer),
nullptr,
nullptr,
Options{
.alignment = alignment_,
.trackUsage = trackUsage_,
Expand Down Expand Up @@ -842,8 +840,7 @@ bool MemoryPoolImpl::incrementReservationThreadSafe(

VELOX_CHECK_NULL(parent_);

++numCapacityGrowths_;
if (growCapacityCb_(requestor, size)) {
if (growCapacity(requestor, size)) {
TestValue::adjust(
"facebook::velox::memory::MemoryPoolImpl::incrementReservationThreadSafe::AfterGrowCallback",
this);
Expand All @@ -865,6 +862,14 @@ bool MemoryPoolImpl::incrementReservationThreadSafe(
treeMemoryUsage()));
}

bool MemoryPoolImpl::growCapacity(MemoryPool* requestor, uint64_t size) {
VELOX_CHECK(requestor->isLeaf());
++numCapacityGrowths_;

ScopedMemoryPoolArbitrationCtx arbitrationCtx(requestor);
return arbitrator_->growCapacity(this, size);
}

bool MemoryPoolImpl::maybeIncrementReservation(uint64_t size) {
std::lock_guard<std::mutex> l(mutex_);
if (isRoot()) {
Expand Down
42 changes: 21 additions & 21 deletions velox/common/memory/MemoryPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -372,16 +372,6 @@ class MemoryPool : public std::enable_shared_from_this<MemoryPool> {
/// Returns the memory reclaimer of this memory pool if not null.
virtual MemoryReclaimer* reclaimer() const = 0;

/// Invoked by the memory arbitrator to enter memory arbitration processing.
/// It is a noop if 'reclaimer' is not set, otherwise invoke the reclaimer's
/// corresponding method.
virtual void enterArbitration() = 0;

/// Invoked by the memory arbitrator to leave memory arbitration processing.
/// It is a noop if 'reclaimer' is not set, otherwise invoke the reclaimer's
/// corresponding method.
virtual void leaveArbitration() noexcept = 0;

/// Function estimates the number of reclaimable bytes and returns in
/// 'reclaimableBytes'. If the 'reclaimer' is not set, the function returns
/// std::nullopt. Otherwise, it will invoke the corresponding method of the
Expand Down Expand Up @@ -499,6 +489,16 @@ class MemoryPool : public std::enable_shared_from_this<MemoryPool> {
protected:
static constexpr uint64_t kMB = 1 << 20;

/// Invoked by the memory arbitrator to enter memory arbitration processing.
/// It is a noop if 'reclaimer' is not set, otherwise invoke the reclaimer's
/// corresponding method.
virtual void enterArbitration() = 0;

/// Invoked by the memory arbitrator to leave memory arbitration processing.
/// It is a noop if 'reclaimer' is not set, otherwise invoke the reclaimer's
/// corresponding method.
virtual void leaveArbitration() noexcept = 0;

/// Invoked to free up to the specified amount of free memory by reducing
/// this memory pool's capacity without actually freeing any used memory. The
/// function returns the actually freed memory capacity in bytes. If
Expand Down Expand Up @@ -557,6 +557,7 @@ class MemoryPool : public std::enable_shared_from_this<MemoryPool> {
friend class velox::exec::ParallelMemoryReclaimer;
friend class MemoryManager;
friend class MemoryArbitrator;
friend class ScopedMemoryPoolArbitrationCtx;

VELOX_FRIEND_TEST(MemoryPoolTest, shrinkAndGrowAPIs);
VELOX_FRIEND_TEST(MemoryPoolTest, grow);
Expand All @@ -573,19 +574,13 @@ class MemoryPoolImpl : public MemoryPool {
/// The callback invoked on the root memory pool destruction. It is set by
/// memory manager to removes the pool from 'MemoryManager::pools_'.
using DestructionCallback = std::function<void(MemoryPool*)>;
/// The callback invoked when the used memory reservation of the root memory
/// pool exceed its capacity. It is set by memory manager to grow the memory
/// pool capacity. The callback returns true if the capacity growth succeeds,
/// otherwise false.
using GrowCapacityCallback = std::function<bool(MemoryPool*, uint64_t)>;

MemoryPoolImpl(
MemoryManager* manager,
const std::string& name,
Kind kind,
std::shared_ptr<MemoryPool> parent,
std::unique_ptr<MemoryReclaimer> reclaimer,
GrowCapacityCallback growCapacityCb,
DestructionCallback destructionCb,
const Options& options = Options{});

Expand Down Expand Up @@ -651,10 +646,6 @@ class MemoryPoolImpl : public MemoryPool {

MemoryReclaimer* reclaimer() const override;

void enterArbitration() override;

void leaveArbitration() noexcept override;

std::optional<uint64_t> reclaimableBytes() const override;

uint64_t reclaim(
Expand Down Expand Up @@ -731,6 +722,10 @@ class MemoryPoolImpl : public MemoryPool {
}

private:
void enterArbitration() override;

void leaveArbitration() noexcept override;

uint64_t shrink(uint64_t targetBytes = 0) override;

bool grow(uint64_t growBytes, uint64_t reservationBytes = 0) override;
Expand Down Expand Up @@ -872,6 +867,11 @@ class MemoryPoolImpl : public MemoryPool {

void releaseThreadSafe(uint64_t size, bool releaseOnly);

// Invoked to grow capacity of the root memory pool from the memory
// arbitrator. 'requestor' is the leaf memory pool that triggers the memory
// capacity growth. 'size' is the memory capacity growth in bytes.
bool growCapacity(MemoryPool* requestor, uint64_t size);

FOLLY_ALWAYS_INLINE void releaseNonThreadSafe(
uint64_t size,
bool releaseOnly) {
Expand Down Expand Up @@ -999,7 +999,7 @@ class MemoryPoolImpl : public MemoryPool {

MemoryManager* const manager_;
MemoryAllocator* const allocator_;
const GrowCapacityCallback growCapacityCb_;
MemoryArbitrator* const arbitrator_;
const DestructionCallback destructionCb_;

// Regex for filtering on 'name_' when debug mode is enabled. This allows us
Expand Down
Loading

0 comments on commit 5f32870

Please sign in to comment.