Skip to content

Commit

Permalink
Merge branch 'development' into bug-140-dart-memleaks
Browse files Browse the repository at this point in the history
  • Loading branch information
devreal committed Nov 23, 2016
2 parents 390ce12 + 42c48e6 commit 58b4656
Show file tree
Hide file tree
Showing 9 changed files with 323 additions and 23 deletions.
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,5 @@ after_script:
after_failure:
- cat ./dash-ci.err
- cat ./build-ci/*/*/build.log
after_success:
- bash <(curl -s https://codecov.io/bash)
77 changes: 77 additions & 0 deletions dart-if/v3.2/include/dash/dart/if/dart_communication.h
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,83 @@ dart_ret_t dart_put_blocking(
size_t nelem,
dart_datatype_t dtype);

/**
* DART Equivalent to MPI send.
*
* \param sendbuf Buffer containing the data to be sent by the unit.
* \param nelem Number of values sent to the specified unit.
* \param dtype The data type of values in \c sendbuf.
* \param tag Message tag for the distinction between different messages.
* \param unit Unit the message is sent to.
*
* \return \c DART_OK on success, any other of \ref dart_ret_t otherwise.
*
* \threadsafe_none
* \ingroup DartCommunication
*/
dart_ret_t dart_send(
void * sendbuf,
size_t nelem,
dart_datatype_t dtype,
int tag,
dart_unit_t unit);

/**
* DART Equivalent to MPI recv.
*
* \param recvbuf Buffer for the incoming data.
* \param nelem Number of values received by the unit
* \param dtype The data type of values in \c recvbuf.
* \param tag Message tag for the distinction between different messages.
* \param unit Unit sending the message.
*
* \return \c DART_OK on success, any other of \ref dart_ret_t otherwise.
*
* \threadsafe_none
* \ingroup DartCommunication
*/
dart_ret_t dart_recv(
void * recvbuf,
size_t nelem,
dart_datatype_t dtype,
int tag,
dart_unit_t unit);

/**
* DART Equivalent to MPI recv.
*
* \param sendbuf Buffer containing the data to be sent by the
* source unit.
* \param send_nelem Number of values sentby the source unit.
* \param send_dtype The data type of values in \c sendbuf.
* \param dest Unitthe message is sent to.
* \param send_tag Message tag for the distinction between different
* messages of the source unit.
* \param recvbuf Buffer for the incoming data.
* \param recv_nelem Number of values received by the destination unit.
* \param recv_dtype The data type of values in \c recvbuf.
* \param src Unit sending the message.
* \param recv_tag Message tag for the distinction between different
* messages of the destination unit.
*
* \return \c DART_OK on success, any other of \ref dart_ret_t otherwise.
*
* \threadsafe_none
* \ingroup DartCommunication
*/
dart_ret_t dart_sendrecv(
void * sendbuf,
size_t send_nelem,
dart_datatype_t send_dtype,
int send_tag,
dart_unit_t dest,
void * recvbuf,
size_t recv_nelem,
dart_datatype_t recv_dtype,
int recv_tag,
dart_unit_t src);


/** \} */

/** \cond DART_HIDDEN_SYMBOLS */
Expand Down
101 changes: 101 additions & 0 deletions dart-impl/mpi/src/dart_communication.c
Original file line number Diff line number Diff line change
Expand Up @@ -1792,3 +1792,104 @@ dart_ret_t dart_reduce(
}
return DART_OK;
}

dart_ret_t dart_send(
void * sendbuf,
size_t nelem,
dart_datatype_t dtype,
int tag,
dart_unit_t unit)
{
MPI_Comm comm;
MPI_Datatype mpi_dtype = dart_mpi_datatype(dtype);
dart_team_t team = DART_TEAM_ALL;
uint16_t index;
int result = dart_adapt_teamlist_convert(team, &index);
if(result == -1) {
return DART_ERR_INVAL;
}
comm = dart_team_data[index].comm;
// dart_unit = MPI rank in comm_world
if(MPI_Send(
sendbuf,
nelem,
mpi_dtype,
unit,
tag,
comm) != MPI_SUCCESS) {
return DART_ERR_INVAL;
}
return DART_OK;
}

dart_ret_t dart_recv(
void * recvbuf,
size_t nelem,
dart_datatype_t dtype,
int tag,
dart_unit_t unit)
{
MPI_Comm comm;
MPI_Datatype mpi_dtype = dart_mpi_datatype(dtype);
dart_team_t team = DART_TEAM_ALL;
uint16_t index;
int result = dart_adapt_teamlist_convert(team, &index);
if(result == -1) {
return DART_ERR_INVAL;
}
comm = dart_team_data[index].comm;
// dart_unit = MPI rank in comm_world
if(MPI_Recv(
recvbuf,
nelem,
mpi_dtype,
unit,
tag,
comm,
MPI_STATUS_IGNORE) != MPI_SUCCESS) {
return DART_ERR_INVAL;
}
return DART_OK;
}

dart_ret_t dart_sendrecv(
void * sendbuf,
size_t send_nelem,
dart_datatype_t send_dtype,
int send_tag,
dart_unit_t dest,
void * recvbuf,
size_t recv_nelem,
dart_datatype_t recv_dtype,
int recv_tag,
dart_unit_t src)
{
MPI_Comm comm;
MPI_Datatype mpi_send_dtype = dart_mpi_datatype(send_dtype);
MPI_Datatype mpi_recv_dtype = dart_mpi_datatype(recv_dtype);
dart_team_t team = DART_TEAM_ALL;
uint16_t index;
int result = dart_adapt_teamlist_convert(team, &index);
if(result == -1) {
return DART_ERR_INVAL;
}
comm = dart_team_data[index].comm;
if(MPI_Sendrecv(
sendbuf,
send_nelem,
mpi_send_dtype,
dest,
send_tag,
recvbuf,
recv_nelem,
mpi_recv_dtype,
src,
recv_tag,
comm,
MPI_STATUS_IGNORE) != MPI_SUCCESS) {
return DART_ERR_INVAL;
}
return DART_OK;
}


27 changes: 25 additions & 2 deletions dash/include/dash/algorithm/Copy.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
#include <memory>
#include <future>

#ifndef DASH__ALGORITHM__COPY__USE_WAIT
// #ifndef DASH__ALGORITHM__COPY__USE_WAIT
#define DASH__ALGORITHM__COPY__USE_FLUSH
#endif
// #define DASH__ALGORITHM__COPY__USE_WAIT
// #endif

namespace dash {

Expand Down Expand Up @@ -425,13 +426,24 @@ dash::Future<ValueType *> copy_async_impl(
num_elem_copied += num_copy_elem;
}
}
#ifdef DASH_ENABLE_TRACE_LOGGING
for (auto gptr : req_handles) {
DASH_LOG_TRACE("dash::copy_async_impl", " req_handle:", gptr);
}
#endif
dash::Future<ValueType *> result([=]() mutable {
// Wait for all get requests to complete:
ValueType * _out = out_first + num_elem_copied;
DASH_LOG_TRACE("dash::copy_async_impl [Future]()",
" wait for", req_handles.size(), "async get request");
DASH_LOG_TRACE("dash::copy_async_impl [Future]", " flush:", req_handles);
DASH_LOG_TRACE("dash::copy_async_impl [Future]", " _out:", _out);
#ifdef DASH_ENABLE_TRACE_LOGGING
for (auto gptr : req_handles) {
DASH_LOG_TRACE("dash::copy_async_impl [Future]", " req_handle:",
gptr);
}
#endif
#ifdef DASH__ALGORITHM__COPY__USE_FLUSH
for (auto gptr : req_handles) {
dart_flush_local_all(gptr);
Expand Down Expand Up @@ -552,13 +564,24 @@ dash::Future<GlobOutputIt> copy_async_impl(
}
#endif

#ifdef DASH_ENABLE_TRACE_LOGGING
for (auto gptr : req_handles) {
DASH_LOG_TRACE("dash::copy_async_impl", " req_handle:", gptr);
}
#endif
dash::Future<GlobOutputIt> result([=]() mutable {
// Wait for all get requests to complete:
GlobOutputIt _out = out_first + num_copy_elem;
DASH_LOG_TRACE("dash::copy_async_impl [Future]()",
" wait for", req_handles.size(), "async put request");
DASH_LOG_TRACE("dash::copy_async_impl [Future]", " flush:", req_handles);
DASH_LOG_TRACE("dash::copy_async_impl [Future]", " _out:", _out);
#ifdef DASH_ENABLE_TRACE_LOGGING
for (auto gptr : req_handles) {
DASH_LOG_TRACE("dash::copy_async_impl [Future]", " req_handle:",
gptr);
}
#endif
#ifdef DASH__ALGORITHM__COPY__USE_FLUSH
for (auto gptr : req_handles) {
dart_flush_all(gptr);
Expand Down
2 changes: 1 addition & 1 deletion dash/scripts/docker-testing/openmpi2/dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ ENV PATH=${PATH}:/opt/openmpi/bin
ENV MPI_EXEC_FLAGS='--allow-run-as-root'
ENV VERBOSE_CI='true'
# Workaround for issue #63
ENV GTEST_FILTER="-SharedTest.AtomicAdd:TransformTest.Array*:CopyTest.AsyncLocalToGlobPtr"
ENV GTEST_FILTER="-SharedTest.AtomicAdd:TransformTest.Array*"

# Set workdir to dash home
WORKDIR /opt/dash
8 changes: 5 additions & 3 deletions dash/test/CopyTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ TEST_F(CopyTest, BlockingLocalToGlobalBlock)
TEST_F(CopyTest, AsyncLocalToGlobPtr)
{
// Copy all elements contained in a single, continuous block.
const int num_elem_per_unit = 20;
const int num_elem_per_unit = 50;
size_t num_elem_total = _dash_size * num_elem_per_unit;

// Global target range:
Expand All @@ -396,16 +396,17 @@ TEST_F(CopyTest, AsyncLocalToGlobPtr)

// Assign initial values: [ 1000, 1001, 1002, ... 2000, 2001, ... ]
for (auto l = 0; l < num_elem_per_unit; ++l) {
array.local[l] = 1000000 + l;
array.local[l] = ((dash::myid() + 1) * 110000) + l;
local_range[l] = ((dash::myid() + 1) * 1000) + l;
}
array.barrier();

// Copy values from local range to remote global range.
// All units (u) copy into block (nblocks-1-u), so unit 0 copies into
// last block.
auto block_offset = _dash_size - 1 - dash::myid();
auto block_offset = (dash::myid() + 1) % dash::size();
auto global_offset = block_offset * num_elem_per_unit;

dash::GlobPtr<int> gptr_dest((array.begin() + global_offset).dart_gptr());
auto copy_fut = dash::copy_async(local_range,
local_range + num_elem_per_unit,
Expand All @@ -419,6 +420,7 @@ TEST_F(CopyTest, AsyncLocalToGlobPtr)
EXPECT_EQ_U(local_range[l],
static_cast<int>(array[global_offset + l]));
}
array.barrier();
}

TEST_F(CopyTest, BlockingGlobalToLocalSubBlock)
Expand Down
51 changes: 51 additions & 0 deletions dash/test/DARTCollectiveTest.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#include <libdash.h>
#include <gtest/gtest.h>
#include "TestBase.h"
#include "DARTCollectiveTest.h"

TEST_F(DARTCollectiveTest, Send_Recv) {
// we need an even amount of participating units
const int units = (_dash_size / 2) * 2;

std::vector<int> data(units);
for(int i = 0; i < units; ++i) {
data[i] = i;
}

// only use non-excess units
if(_dash_id < units) {
// every other unit sends data to the next unit
if(_dash_id % 2 == 0) {
dart_unit_t send_to = _dash_id + 1;
dart_send(&data[_dash_id], 1, DART_TYPE_INT, 0, send_to);
} else {
int recv;
dart_unit_t recv_from = _dash_id - 1;
dart_recv(&recv, 1, DART_TYPE_INT, 0, recv_from);
ASSERT_EQ(recv, data[recv_from]);
}
}
}

TEST_F(DARTCollectiveTest, Sendrecv) {
const int units = (_dash_size / 2) * 2;

std::vector<int> data(units);
for(int i = 0; i < units; ++i) {
data[i] = i;
}

if(_dash_id < units) {
int recv;
dart_unit_t partner;
if(_dash_id % 2 ==0) {
partner = _dash_id + 1;
} else {
partner = _dash_id - 1;
}
// each pair of units send data to each other
dart_sendrecv(&data[_dash_id], 1, DART_TYPE_INT, 0, partner,
&recv, 1, DART_TYPE_INT, 0, partner);
ASSERT_EQ(recv, data[partner]);
}
}
39 changes: 39 additions & 0 deletions dash/test/DARTCollectiveTest.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#ifndef DASH__TEST__DART_ONESIDED_TEST_H_
#define DASH__TEST__DART_ONESIDED_TEST_H_

#include <gtest/gtest.h>
#include <libdash.h>

/**
* Test fixture for onesided operations provided by DART.
*/
class DARTCollectiveTest : public ::testing::Test {
protected:
size_t _dash_id;
size_t _dash_size;

DARTCollectiveTest()
: _dash_id(0),
_dash_size(0) {
LOG_MESSAGE(">>> Test suite: DARTCollectiveTest");
}

virtual ~DARTCollectiveTest() {
LOG_MESSAGE("<<< Closing test suite: DARTCollectiveTest");
}

virtual void SetUp() {
_dash_id = dash::myid();
_dash_size = dash::size();
LOG_MESSAGE("===> Running test case with %d units ...",
_dash_size);
}

virtual void TearDown() {
dash::Team::All().barrier();
LOG_MESSAGE("<=== Finished test case with %d units",
_dash_size);
}
};

#endif // DASH__TEST__DART_ONESIDED_TEST_H_
Loading

0 comments on commit 58b4656

Please sign in to comment.