Skip to content

Commit

Permalink
allocate combined buffers only as needed
Browse files (browse the repository at this point in the history)
  • Loading branch information
lroberts36 committed Nov 19, 2024
1 parent 0c3131e commit 08d9c13
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 21 deletions.
44 changes: 29 additions & 15 deletions src/bvals/comms/coalesced_buffers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,32 +36,42 @@ namespace parthenon {
// Sets up the two CommBuffer objects owned by this CoalescedBuffer:
// coalesced_comm_buffer (element type buf_t) and sparse_status_buffer
// (std::vector<int>), both between send_rank and recv_rank on comm_.
//
// NOTE(review): this listing comes from a diff view without +/- markers (the
// hunk header reads -36,32 +36,42), so statements from before and after the
// commit appear back to back. Confirm against the checked-in file which of
// them are actually live.
void CoalescedBuffer::AllocateCoalescedBuffer() {
// If this object is the sending side, the local rank sends and other_rank
// receives; otherwise the roles are swapped.
int send_rank = sender ? Globals::my_rank : other_rank;
int recv_rank = sender ? other_rank : Globals::my_rank;
// NOTE(review): the next statement pair looks like the pre-commit version
// (construct the CommBuffer, then immediately size it to current_size + 1).
// The first constructor argument, 2 * partition, is presumably the message
// tag -- TODO confirm against the CommBuffer constructor.
coalesced_comm_buffer = CommBuffer<buf_t>(2 * partition, send_rank, recv_rank, comm_);
coalesced_comm_buffer.ConstructBuffer("combined send buffer",
current_size + 1); // Actually allocate the thing
// NOTE(review): this assignment looks like the post-commit replacement. It
// passes a factory lambda that returns a buf_t labelled "Combined Buffer"
// holding 2 * size elements, plus a trailing `true` whose meaning is not
// visible from here -- TODO confirm against the CommBuffer constructor.
coalesced_comm_buffer = CommBuffer<buf_t>(
2 * partition, send_rank, recv_rank, comm_,
[](int size) { return buf_t("Combined Buffer", 2 * size); }, true);

// The status buffer uses 2 * partition + 1 as its first argument, i.e. the
// value right after the one used for the combined buffer above.
sparse_status_buffer =
CommBuffer<std::vector<int>>(2 * partition + 1, send_rank, recv_rank, comm_);
sparse_status_buffer.ConstructBuffer(current_size + 1);
// Disabled sanity check, kept commented out as in the original:
// PARTHENON_REQUIRE(current_size > 0, "Are we bigger than zero?");
// Point the BndId objects to the combined buffer
// (every BndId stored in coalesced_info_buf for each uid in all_vars gets
// its coalesced_buf set to coalesced_comm_buffer.buffer()).
for (auto uid : all_vars) {
for (auto &[bnd_id, pvbbuf] : coalesced_info_buf.at(uid)) {
bnd_id.coalesced_buf = coalesced_comm_buffer.buffer();
}
}
}

//----------------------------------------------------------------------------------------
ParArray1D<BndId> &CoalescedBuffer::GetBndIdsOnDevice(const std::set<Uid_t> &vars) {
ParArray1D<BndId> &CoalescedBuffer::GetBndIdsOnDevice(const std::set<Uid_t> &vars,
int *pcomb_size) {
const auto &var_set = vars.size() == 0 ? all_vars : vars;
auto &bnd_ids_device = bnd_ids_device_map[var_set];
auto &bnd_ids_host = bnd_ids_host_map[var_set];

int nbnd_id{0};
for (auto uid : var_set)
int comb_size{0};
for (auto uid : var_set) {
nbnd_id += coalesced_info_buf.at(uid).size();
for (auto &[bnd_id, pvbbuf] : coalesced_info_buf.at(uid)) {
auto buf_state = pvbbuf->GetState();
if ((buf_state == BufferState::sending) || (buf_state == BufferState::received))
comb_size += bnd_id.size();
}
}
if (pcomb_size != nullptr) *pcomb_size = comb_size;

bool updated = false;
if (comb_size > coalesced_comm_buffer.buffer().size()) {
PARTHENON_REQUIRE(
sender, "Something bad is going on if we are doing this on a receiving buffer.");
coalesced_comm_buffer.ConstructBuffer("combined send buffer", 2 * comb_size);
updated = true;
}

if (nbnd_id != bnd_ids_device.size()) {
bnd_ids_device = ParArray1D<BndId>("bnd_id", nbnd_id);
bnd_ids_host = Kokkos::create_mirror_view(bnd_ids_device);
Expand All @@ -79,14 +89,17 @@ ParArray1D<BndId> &CoalescedBuffer::GetBndIdsOnDevice(const std::set<Uid_t> &var

const bool alloc =
(buf_state == BufferState::sending) || (buf_state == BufferState::received);

// Test if this boundary has changed
if (!bid_h.SameBVChannel(bnd_id) || (bid_h.buf_allocated != alloc) ||
(bid_h.start_idx() != c_buf_idx) ||
!UsingSameResource(bid_h.buf, pvbbuf->buffer())) {
!UsingSameResource(bid_h.buf, pvbbuf->buffer()) ||
bid_h.coalesced_buf.data() != coalesced_comm_buffer.buffer().data()) {
updated = true;
bid_h = bnd_id;
bid_h.buf_allocated = alloc;
bid_h.start_idx() = c_buf_idx;
bid_h.coalesced_buf = coalesced_comm_buffer.buffer();
if (bid_h.buf_allocated) bid_h.buf = pvbbuf->buffer();
}
if (bid_h.buf_allocated) c_buf_idx += bid_h.size();
Expand All @@ -101,7 +114,8 @@ ParArray1D<BndId> &CoalescedBuffer::GetBndIdsOnDevice(const std::set<Uid_t> &var
void CoalescedBuffer::PackAndSend(const std::set<Uid_t> &vars) {
PARTHENON_REQUIRE(coalesced_comm_buffer.IsAvailableForWrite(),
"Trying to write to a buffer that is in use.");
auto &bids = GetBndIdsOnDevice(vars);
int comb_size;
auto &bids = GetBndIdsOnDevice(vars, &comb_size);
Kokkos::parallel_for(
PARTHENON_AUTO_LABEL,
Kokkos::TeamPolicy<>(parthenon::DevExecSpace(), bids.size(), Kokkos::AUTO),
Expand All @@ -118,7 +132,7 @@ void CoalescedBuffer::PackAndSend(const std::set<Uid_t> &vars) {
#ifdef MPI_PARALLEL
Kokkos::fence();
#endif
coalesced_comm_buffer.Send();
coalesced_comm_buffer.Send(false, comb_size);

// Send the sparse null info as well
if (bids.size() != sparse_status_buffer.buffer().size()) {
Expand Down
4 changes: 3 additions & 1 deletion src/bvals/comms/coalesced_buffers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <memory>
#include <set>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

Expand Down Expand Up @@ -91,7 +92,8 @@ struct CoalescedBuffer {
coalesced_comm_buffer.IsAvailableForWrite();
}

ParArray1D<BndId> &GetBndIdsOnDevice(const std::set<Uid_t> &vars);
ParArray1D<BndId> &GetBndIdsOnDevice(const std::set<Uid_t> &vars,
int *pcomb_size = nullptr);

void PackAndSend(const std::set<Uid_t> &vars);

Expand Down
11 changes: 6 additions & 5 deletions src/utils/communication_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ class CommBuffer {

BufferState GetState() { return *state_; }

void Send(bool local = false) noexcept;
void Send(bool local = false, int size = -1) noexcept;
void SendNull(bool local = false) noexcept;

bool IsAvailableForWrite();
Expand Down Expand Up @@ -235,7 +235,7 @@ CommBuffer<T> &CommBuffer<T>::operator=(const CommBuffer<U> &in) {
}

template <class T>
void CommBuffer<T>::Send(bool local) noexcept {
void CommBuffer<T>::Send(bool local, int size) noexcept {
if (!active_) {
SendNull(local);
return;
Expand All @@ -251,10 +251,11 @@ void CommBuffer<T>::Send(bool local) noexcept {
PARTHENON_REQUIRE(
buf_.size() > 0,
"Trying to send zero size buffer, which will be interpreted as sending_null.");
if (size < 0) size = buf_.size();
PARTHENON_REQUIRE(size <= buf_.size(), "Asking to send too much");
PARTHENON_MPI_CHECK(MPI_Wait(my_request_.get(), MPI_STATUS_IGNORE));
PARTHENON_MPI_CHECK(MPI_Isend(buf_.data(), buf_.size(),
MPITypeMap<buf_base_t>::type(), recv_rank_, tag_, comm_,
my_request_.get()));
PARTHENON_MPI_CHECK(MPI_Isend(buf_.data(), size, MPITypeMap<buf_base_t>::type(),
recv_rank_, tag_, comm_, my_request_.get()));
#endif
}
if (*comm_type_ == BuffCommType::receiver) {
Expand Down

0 comments on commit 08d9c13

Please sign in to comment.