This repository has been archived by the owner on Aug 5, 2022. It is now read-only.

fix issues of async sgd
fzou1 committed Mar 8, 2018
1 parent 751d129 commit 318da3d
Showing 9 changed files with 160 additions and 80 deletions.
1 change: 1 addition & 0 deletions include/caffe/multinode/multi_solver.hpp
@@ -73,6 +73,7 @@ class MultiSolver {
protected:
virtual ~Callback() {
}

virtual void on_backward_finished(int layer_id) = 0;
virtual void on_delwt_wait(int layer_id) = 0;
virtual void apply_updates(int layer_id) = 0;
180 changes: 121 additions & 59 deletions include/caffe/multinode/multi_sync.hpp
Expand Up @@ -239,6 +239,9 @@ namespace caffe {
}

void apply_updates(int layer_id) {
if (mn::use_param_server())
return;

std::vector<int> &param_ids = layer_param_ids[layer_id];
for (int i = 0; i < param_ids.size(); ++i) {
solver->root_solver()->ApplyUpdate(param_ids[i]);
@@ -274,25 +277,44 @@
}
}

void launch_reduce(int layer_id, int param_id) {
void launch_reduce(int layer_id) {
mn::Distribution& distrib = layers[layer_id]->GetDistribution();
Dtype* send_buff = NULL;
Dtype* recv_buff = NULL;
size_t buf_size = net_params[param_id]->count();
if (CAN_USE_PRV_DIFF(net_params[param_id])) {
send_buff = (Dtype*)net_params[param_id]->prv_diff();
recv_buff = net_params[param_id]->mutable_prv_diff();
}
else {
send_buff = (Dtype*)net_params[param_id]->cpu_diff();
recv_buff = net_params[param_id]->mutable_cpu_diff();
}
reduce_req_vec[param_id] =
distrib.reduce_async<Dtype,MLSL::ReductionType::RT_SUM,MLSL::GroupType::GT_DATA>(
send_buff, recv_buff, buf_size);
if (reduce_req_vec[param_id] != NULL && distrib.is_root(MLSL::GroupType::GT_DATA)) {
AsyncTask req_task(layer_id, param_id, NULL);
reduce_req_list.push_back(req_task);
boost::shared_ptr<Layer<Dtype>> &layer = layers[layer_id];
std::vector<int> &param_ids = layer_param_ids[layer_id];
// TODO: descending is faster?
for (int i = param_ids.size() - 1; i >= 0; --i) {
if (!layer->ParamNeedReduce(i)) continue;
int param_id = param_ids[i];

if (mn::get_nodes_count() > 1) {
Dtype* send_buff = NULL;
Dtype* recv_buff = NULL;
size_t buf_size = net_params[param_id]->count();
if (CAN_USE_PRV_DIFF(net_params[param_id])) {
send_buff = (Dtype*)net_params[param_id]->prv_diff();
recv_buff = net_params[param_id]->mutable_prv_diff();
}
else {
send_buff = (Dtype*)net_params[param_id]->cpu_diff();
recv_buff = net_params[param_id]->mutable_cpu_diff();
}
reduce_req_vec[param_id] =
distrib.reduce_async<Dtype,MLSL::ReductionType::RT_SUM,MLSL::GroupType::GT_DATA>(
send_buff, recv_buff, buf_size);
bool complete = false;
DLOG(INFO) << "Launch reduce with param id " << param_id;
// progress communication ASAP
MLSL::Environment::GetEnv().Test(reduce_req_vec[param_id], &complete);
if (complete) {
// reset req to indicate no need to do Wait
reduce_req_vec[param_id] = NULL;
}
}

if (distrib.is_root(MLSL::GroupType::GT_DATA)) {
AsyncTask req_task(layer_id, param_id, NULL);
reduce_req_list.push_back(req_task);
}
}
}

@@ -306,13 +328,17 @@ namespace caffe {
else {
MLSL::Environment::GetEnv().Test(reduce_req_vec[iter->param_id], &complete);
}

if (complete) {
// reset req to indicate no need to do Wait
reduce_req_vec[iter->param_id] = NULL;

int param_id = iter->param_id;
DLOG(INFO) << "Finish reduce on param id " << param_id
<< " value " << net_params[param_id]->sumsq_diff();

void* send_buff;
void* recv_buff;
int param_id = iter->param_id;
size_t buf_size = net_params[param_id]->count();

if (CAN_USE_PRV_DIFF(net_params[param_id] ) ) {
@@ -327,6 +353,7 @@ namespace caffe {
else {
recv_buff = (void*)net_params[param_id]->mutable_cpu_data();
}

mn::Distribution &distrib = layers[iter->layer_id]->GetDistribution();
int server_mpi_rank = mn::param_to_server_rank(iter->layer_id, iter->param_id);
mn::TaskRequest task(
@@ -336,25 +363,35 @@ namespace caffe {
int tag = task.GetTag();
MPI_Request send_req;
int recv_flag = 1;
// recv from PS
MPI_Irecv(recv_buff, buf_size, mn::DtypeToMPIDtype<Dtype>(),
server_mpi_rank, tag, MPI_COMM_WORLD, &irecv_req_vec[param_id]);
MPI_Test(&irecv_req_vec[param_id], &recv_flag, MPI_STATUS_IGNORE);
CHECK(!recv_flag);
DLOG(INFO) << "Receive data of param id " << param_id
<< " from PS (rank " << server_mpi_rank << " and tag " << tag
<< ")";
// Send to PS
MPI_Isend(send_buff, buf_size, mn::DtypeToMPIDtype<Dtype>(),
server_mpi_rank, tag, MPI_COMM_WORLD, &send_req);
// TODO: why do we have to wait here?
MPI_Wait(&send_req, MPI_STATUS_IGNORE);

DLOG(INFO) << "Send to PS of param id " << param_id
<< " to PS (rank " << server_mpi_rank << " and tag " << tag
<< ")";

irecv_req_list.push_back(task);
iter = reduce_req_list.erase(iter);
}
else iter++;
}
}

void launch_param_broadcast(int layer_id, int param_id) {
void launch_param_broadcast(int param_id) {
if (mn::get_nodes_count() <= 1)
return;

Dtype* buff;
if (CAN_USE_PRV_DATA(net_params[param_id])) {
if (distrib_bcast->is_root(MLSL::GroupType::GT_DATA))
@@ -369,6 +406,7 @@ namespace caffe {
buff = net_params[param_id]->mutable_cpu_data();
}
size_t buf_size = net_params[param_id]->count();
DLOG(INFO) << "Launch broadcast request with param id " << param_id;
broadcast_req_vec[param_id] =
distrib_bcast->bcast_async<Dtype,MLSL::GroupType::GT_DATA>(buff, buf_size);
}
@@ -384,6 +422,8 @@ namespace caffe {
MPI_Test(&irecv_req_vec[param_id], &flag, MPI_STATUS_IGNORE);
}
if (flag) {
DLOG(INFO) << " Root node received model param id " << param_id
<< " value " << net_params[param_id]->sumsq_data();
irecv_req_vec[param_id] = MPI_REQUEST_NULL;
irecv_done[param_id] = true;
iter = irecv_req_list.erase(iter);
@@ -400,7 +440,7 @@ namespace caffe {
int param_id = layer_param_ids[i][j];
if (!broadcast_launched[param_id]) {
if (irecv_done[param_id]) {
launch_param_broadcast(i, param_id);
launch_param_broadcast(param_id);
broadcast_launched[param_id] = true;
} else return;
}
@@ -410,61 +450,83 @@ namespace caffe {

void on_backward_finished(int layer_id) {
boost::shared_ptr<Layer<Dtype>> &layer = layers[layer_id];
if (layer->layerOp == nullptr) {
return;
}

if (mn::use_param_server()) {
std::vector<int> &param_ids = layer_param_ids[layer_id];
// TODO: descending is faster?
for (int i = param_ids.size() - 1; i >= 0; --i) {
if (!layer->ParamNeedReduce(i)) continue;
launch_reduce(layer_id, param_ids[i]);
mn::Distribution &distrib = layer->GetDistribution();
if (distrib.is_root(MLSL::GroupType::GT_DATA)) {
check_and_launch_comm_to_ps();
check_and_launch_broadcast();
} else {
launch_param_broadcast(layer_id, param_ids[i]);
launch_reduce(layer_id);

mn::Distribution &distrib = layer->GetDistribution();
if (distrib.is_root(MLSL::GroupType::GT_DATA)) {
check_and_launch_comm_to_ps();
check_and_launch_broadcast();
} else {
std::vector<int> &param_ids = layer_param_ids[layer_id];
for (int i=param_ids.size() - 1; i >= 0; --i) {
launch_param_broadcast(param_ids[i]);
}
}
} else {
if (layer->layerOp == nullptr) {
return;
}
launch_allreduce(layer_id);
}
}



void delwt_wait_ps(int layer_id) {
mn::Distribution &distrib = layers[layer_id]->GetDistribution();

std::vector<int> &param_ids = layer_param_ids[layer_id];
if (distrib.is_root(MLSL::GroupType::GT_DATA)) {
std::vector<int> &param_ids = layer_param_ids[layer_id];
// TODO: can we start comm with ps earlier? Per-layer data would be inconsistent then.
check_and_launch_comm_to_ps();
check_and_launch_broadcast();
for (int i = param_ids.size() - 1; i >= 0; i--) {
int param_id = param_ids[i];
// wait for reduce
if (reduce_req_vec[param_id] != NULL) {
MLSL::Environment::GetEnv().Wait(reduce_req_vec[param_id]);
}
reduce_req_vec[param_id] = NULL;
// wait for new param from param server
if (irecv_req_vec[param_id] != MPI_REQUEST_NULL) {
MPI_Wait(&irecv_req_vec[param_id], MPI_STATUS_IGNORE);
// the req is set to MPI_REQUEST_NULL, indicating the request has already finished
irecv_req_vec[param_id] = MPI_REQUEST_NULL;
}
irecv_done[param_id] = false;
// wait for the completion of broadcast
if (broadcast_req_vec[param_id] != NULL) {
MLSL::Environment::GetEnv().Wait(broadcast_req_vec[param_id]);
broadcast_req_vec[param_id] = NULL;
bool bcast_launched = false;
while (!bcast_launched) {
// TODO: can we start comm with ps earlier? Per-layer data would be inconsistent then.
check_and_launch_comm_to_ps();
check_and_launch_broadcast();
bcast_launched = true;
for (int i = param_ids.size() - 1; i >= 0; i--) {
bcast_launched = bcast_launched & broadcast_launched[param_ids[i]];
}
broadcast_launched[param_id] = false;
}
}

#ifdef FW_OVERLAP_OPT
solver->set_layer_finished_flag(layer_id, true);
#endif

for (int i = param_ids.size() - 1; i >= 0; i--) {
int param_id = param_ids[i];
DLOG(INFO) << " Wait reduce layer id " << layer_id << " param id " << param_id;
// wait for reduce
if (reduce_req_vec[param_id] != NULL) {
MLSL::Environment::GetEnv().Wait(reduce_req_vec[param_id]);
}
reduce_req_vec[param_id] = NULL;

DLOG(INFO) << "Worker (iter " << solver->root_solver()->iter() << "): param id=" << param_id << " weight=" << net_params[param_id]->sumsq_diff();

DLOG(INFO) << " Wait recv layer id " << layer_id << " param id " << param_id;

// wait for new param from param server
if (irecv_req_vec[param_id] != MPI_REQUEST_NULL) {
MPI_Wait(&irecv_req_vec[param_id], MPI_STATUS_IGNORE);
// the req is set to MPI_REQUEST_NULL, indicating the request has already finished
irecv_req_vec[param_id] = MPI_REQUEST_NULL;
}
irecv_done[param_id] = false;

DLOG(INFO) << " Wait broadcast layer id " << layer_id << " param id " << param_id;

// wait for the completion of broadcast
if (broadcast_req_vec[param_id] != NULL) {
MLSL::Environment::GetEnv().Wait(broadcast_req_vec[param_id]);
broadcast_req_vec[param_id] = NULL;
}
broadcast_launched[param_id] = false;

DLOG(INFO) << "Worker (iter " << solver->root_solver()->iter() << "): param id=" << param_id << " data=" << net_params[param_id]->sumsq_data();
}
}

void delwt_wait_no_ps(int layer_id) {
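Note on the worker-side exchange above: when a layer's reduce completes, the worker posts the non-blocking receive for the server's updated weights (MPI_Irecv into irecv_req_vec) before issuing the gradient send (MPI_Isend), so the parameter server's reply always finds a matching receive outstanding; only the send is completed in place. A minimal sketch of that ordering in plain MPI follows; the buffer names, the float element type, and the helper name are hypothetical and not taken from this commit.

// Illustrative sketch only -- not code from this commit.
#include <mpi.h>
#include <vector>

// Post the receive for the server's updated weights first, then issue and
// complete the gradient send. The caller tests/waits on the returned receive
// request later, as multi_sync.hpp does with irecv_req_vec.
MPI_Request exchange_with_server(std::vector<float>& weights,
                                 std::vector<float>& grads,
                                 int server_rank, int tag) {
  MPI_Request recv_req, send_req;
  MPI_Irecv(weights.data(), static_cast<int>(weights.size()), MPI_FLOAT,
            server_rank, tag, MPI_COMM_WORLD, &recv_req);
  MPI_Isend(grads.data(), static_cast<int>(grads.size()), MPI_FLOAT,
            server_rank, tag, MPI_COMM_WORLD, &send_req);
  MPI_Wait(&send_req, MPI_STATUS_IGNORE);
  return recv_req;
}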
1 change: 0 additions & 1 deletion python/caffe/_caffe.cpp
@@ -374,7 +374,6 @@ void Solver_add_callback(Solver<Dtype> * solver, bp::object on_start,
}

#ifdef USE_MLSL

template<typename Dtype>
void MultiSolverBackward(MultiSolver<Dtype> * solver)
{
17 changes: 10 additions & 7 deletions src/caffe/multinode/async_param_server.cpp
@@ -50,7 +50,6 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

namespace caffe {
namespace mn {

using std::make_pair;

template <typename Dtype>
@@ -63,16 +62,16 @@ namespace caffe {
int mpi_rank = get_node_rank();
shared_ptr<Net<Dtype>> net = solver_->net();
const vector<Blob<Dtype> *> &net_params = net->learnable_params();

const std::vector<bool>& layer_need_backward{ net->layer_need_backward() };

for (int i = 0; i < get_num_groups(); i++) {
int root_rank = get_group_root_rank(i);
//iterate over layers and skip the ones without params
for (int j = 0; j < net->layers().size(); j++) {
shared_ptr<Layer<Dtype>> layer = net->layers()[j];
//skip layers w/o parameters
if ((layer->layerOp == nullptr) || !(layer->layerOp->HasParameterSets())) {
continue;
}
if (!layer_need_backward[j])
continue;

const MultinodeLayerParameter & mn_layer_param = layer->layer_param().multinode();
int model_parts = mn_layer_param.model_parts();
int mn_num_nodes = mn_layer_param.num_nodes();
@@ -158,14 +157,18 @@ namespace caffe {
// apply update
int blob_wise_iter = async_iter_[make_pair(task.param_id_, task.part_id_) ];
solver_->set_iter(blob_wise_iter);

// TODO: support partial param updates per model part
solver_->ApplyUpdate(task.param_id_);

DLOG(INFO) << "PS (iter " << blob_wise_iter << "): param id=" << task.param_id_ << " weight=" << net_params[task.param_id_]->sumsq_diff();
DLOG(INFO) << "PS (iter " << blob_wise_iter << "): param id=" << task.param_id_ << " data=" << net_params[task.param_id_]->sumsq_data();

//clean up
solver_->net()->ClearParamDiffs(task.param_id_);
async_iter_[ make_pair(task.param_id_, task.part_id_) ] += 1;
update_cnt_ += 1;

// copy model(data) in solver to mpi buffer
mpi_buf = send_buf_[make_pair(root_rank, task.param_id_)].first;
caffe_copy(count / task.num_parts_,
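Note on the update path above: the server keeps a separate iteration counter per (param_id, part_id) pair in async_iter_, feeds it to solver_->set_iter() before ApplyUpdate(), and then increments it, so each blob advances on its own asynchronous schedule rather than on one global step. A simplified, hypothetical sketch of that bookkeeping (names are not from the commit):

// Illustrative sketch only -- not code from this commit.
#include <map>
#include <utility>

std::map<std::pair<int, int>, int> async_iter;  // (param_id, part_id) -> iteration

// Return the iteration to use for this update, then advance the counter,
// mirroring the set_iter() / ApplyUpdate() / increment sequence above.
int next_blob_iteration(int param_id, int part_id) {
  int iter = async_iter[std::make_pair(param_id, part_id)];
  async_iter[std::make_pair(param_id, part_id)] = iter + 1;
  return iter;
}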
9 changes: 5 additions & 4 deletions src/caffe/multinode/mlsl.cpp
@@ -59,16 +59,17 @@ namespace caffe {
distrib_map =
new std::map<std::pair<int,int>, boost::shared_ptr<Distribution>>();
if (use_param_server()) {
std::string config_str;
if (is_param_server()) {
// this is for paramter servers
MLSL::Environment::GetEnv().Configure("color=0");
// this is for parameter servers
config_str = "color=0";
}
else {
// this is for workers
int group_id = get_group_id();
std::string config_str = "color=" + std::to_string(group_id + 1);
MLSL::Environment::GetEnv().Configure(config_str.c_str());
config_str = "color=" + std::to_string(group_id + 1);
}
MLSL::Environment::GetEnv().Configure(config_str.c_str());
}
#ifdef ENABLE_WEIGHT_GRAD_COMPRESSION
get_weight_grad_compress_info();
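Note on the configuration change above: a single Configure() call now receives a color string, "color=0" for parameter servers and "color=<group_id+1>" for workers, which partitions the processes into disjoint communication groups. Conceptually the color plays the same role as the color argument of MPI_Comm_split; the sketch below is a hypothetical illustration of that idea, not code from the commit.

// Illustrative sketch only -- not code from this commit.
#include <mpi.h>

// Split ranks by role: parameter servers share color 0, and each worker
// group shares color group_id + 1, giving disjoint sub-communicators.
MPI_Comm split_by_role(bool is_param_server, int group_id) {
  int color = is_param_server ? 0 : group_id + 1;
  int rank = 0;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm sub;
  MPI_Comm_split(MPI_COMM_WORLD, color, /*key=*/rank, &sub);
  return sub;
}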
