diff --git a/include/caffe/multinode/multi_solver.hpp b/include/caffe/multinode/multi_solver.hpp
index a601e648a..c7de0d0cc 100644
--- a/include/caffe/multinode/multi_solver.hpp
+++ b/include/caffe/multinode/multi_solver.hpp
@@ -73,6 +73,7 @@ class MultiSolver {
  protected:
   virtual ~Callback() {
   }
+  virtual void on_backward_finished(int layer_id) = 0;
   virtual void on_delwt_wait(int layer_id) = 0;
   virtual void apply_updates(int layer_id) = 0;
diff --git a/include/caffe/multinode/multi_sync.hpp b/include/caffe/multinode/multi_sync.hpp
index 74dbfb541..79d88b328 100644
--- a/include/caffe/multinode/multi_sync.hpp
+++ b/include/caffe/multinode/multi_sync.hpp
@@ -239,6 +239,9 @@ namespace caffe {
     }
 
     void apply_updates(int layer_id) {
+      if (mn::use_param_server())
+        return;
+
       std::vector<int> &param_ids = layer_param_ids[layer_id];
       for (int i = 0; i < param_ids.size(); ++i) {
         solver->root_solver()->ApplyUpdate(param_ids[i]);
@@ -274,25 +277,44 @@ namespace caffe {
       }
     }
 
-    void launch_reduce(int layer_id, int param_id) {
+    void launch_reduce(int layer_id) {
       mn::Distribution& distrib = layers[layer_id]->GetDistribution();
-      Dtype* send_buff = NULL;
-      Dtype* recv_buff = NULL;
-      size_t buf_size = net_params[param_id]->count();
-      if (CAN_USE_PRV_DIFF(net_params[param_id])) {
-        send_buff = (Dtype*)net_params[param_id]->prv_diff();
-        recv_buff = net_params[param_id]->mutable_prv_diff();
-      }
-      else {
-        send_buff = (Dtype*)net_params[param_id]->cpu_diff();
-        recv_buff = net_params[param_id]->mutable_cpu_diff();
-      }
-      reduce_req_vec[param_id] =
-        distrib.reduce_async(
-          send_buff, recv_buff, buf_size);
-      if (reduce_req_vec[param_id] != NULL && distrib.is_root(MLSL::GroupType::GT_DATA)) {
-        AsyncTask req_task(layer_id, param_id, NULL);
-        reduce_req_list.push_back(req_task);
+      boost::shared_ptr<Layer<Dtype>> &layer = layers[layer_id];
+      std::vector<int> &param_ids = layer_param_ids[layer_id];
+      // TODO: descending is faster?
+      for (int i = param_ids.size() - 1; i >= 0; --i) {
+        if (!layer->ParamNeedReduce(i)) continue;
+        int param_id = param_ids[i];
+
+        if (mn::get_nodes_count() > 1) {
+          Dtype* send_buff = NULL;
+          Dtype* recv_buff = NULL;
+          size_t buf_size = net_params[param_id]->count();
+          if (CAN_USE_PRV_DIFF(net_params[param_id])) {
+            send_buff = (Dtype*)net_params[param_id]->prv_diff();
+            recv_buff = net_params[param_id]->mutable_prv_diff();
+          }
+          else {
+            send_buff = (Dtype*)net_params[param_id]->cpu_diff();
+            recv_buff = net_params[param_id]->mutable_cpu_diff();
+          }
+          reduce_req_vec[param_id] =
+            distrib.reduce_async(
+              send_buff, recv_buff, buf_size);
+          bool complete = false;
+          DLOG(INFO) << "Launch reduce with param id " << param_id;
+          // progress communication ASAP
+          MLSL::Environment::GetEnv().Test(reduce_req_vec[param_id], &complete);
+          if (complete) {
+            // reset req to indicate no need to do Wait
+            reduce_req_vec[param_id] = NULL;
+          }
+        }
+
+        if (distrib.is_root(MLSL::GroupType::GT_DATA)) {
+          AsyncTask req_task(layer_id, param_id, NULL);
+          reduce_req_list.push_back(req_task);
+        }
       }
     }
@@ -306,13 +328,17 @@ namespace caffe {
         else {
           MLSL::Environment::GetEnv().Test(reduce_req_vec[iter->param_id], &complete);
         }
+
         if (complete) {
           // reset req to indicate no need to do Wait
           reduce_req_vec[iter->param_id] = NULL;
+          int param_id = iter->param_id;
+          DLOG(INFO) << "Finish reduce on param id " << param_id
+                     << " value " << net_params[param_id]->sumsq_diff();
+
           void* send_buff;
           void* recv_buff;
-          int param_id = iter->param_id;
           size_t buf_size = net_params[param_id]->count();
           if (CAN_USE_PRV_DIFF(net_params[param_id] ) ) {
@@ -327,6 +353,7 @@ namespace caffe {
           else {
             recv_buff = (void*)net_params[param_id]->mutable_cpu_data();
           }
+          mn::Distribution &distrib = layers[iter->layer_id]->GetDistribution();
           int server_mpi_rank = mn::param_to_server_rank(iter->layer_id, iter->param_id);
           mn::TaskRequest task(
@@ -336,17 +363,24 @@ namespace caffe {
           int tag = task.GetTag();
           MPI_Request send_req;
           int recv_flag = 1;
-          // recv from PS
+          // recv from PS
           MPI_Irecv(recv_buff, buf_size, mn::DtypeToMPIDtype(),
                     server_mpi_rank, tag, MPI_COMM_WORLD, &irecv_req_vec[param_id]);
           MPI_Test(&irecv_req_vec[param_id], &recv_flag, MPI_STATUS_IGNORE);
           CHECK(!recv_flag);
+          DLOG(INFO) << "Receive data of param id " << param_id
+                     << " from PS (rank " << server_mpi_rank << " and tag " << tag
+                     << ")";
           // Send to PS
           MPI_Isend(send_buff, buf_size, mn::DtypeToMPIDtype(),
                     server_mpi_rank, tag, MPI_COMM_WORLD, &send_req);
           // TODO: why do we have to wait here?
           MPI_Wait(&send_req, MPI_STATUS_IGNORE);
+          DLOG(INFO) << "Send to PS of param id " << param_id
+                     << " to PS (rank " << server_mpi_rank << " and tag " << tag
+                     << ")";
+
           irecv_req_list.push_back(task);
           iter = reduce_req_list.erase(iter);
         }
@@ -354,7 +388,10 @@ namespace caffe {
       }
     }
 
-    void launch_param_broadcast(int layer_id, int param_id) {
+    void launch_param_broadcast(int param_id) {
+      if (mn::get_nodes_count() <= 1)
+        return;
+
       Dtype* buff;
       if (CAN_USE_PRV_DATA(net_params[param_id])) {
         if (distrib_bcast->is_root(MLSL::GroupType::GT_DATA))
@@ -369,6 +406,7 @@ namespace caffe {
         buff = net_params[param_id]->mutable_cpu_data();
       }
       size_t buf_size = net_params[param_id]->count();
+      DLOG(INFO) << "Launch broadcast request with param id " << param_id;
       broadcast_req_vec[param_id] =
         distrib_bcast->bcast_async(buff, buf_size);
     }
@@ -384,6 +422,8 @@ namespace caffe {
           MPI_Test(&irecv_req_vec[param_id], &flag, MPI_STATUS_IGNORE);
         }
         if (flag) {
+          DLOG(INFO) << " Root node received model param id " << param_id
+                     << " value " << net_params[param_id]->sumsq_data();
           irecv_req_vec[param_id] = MPI_REQUEST_NULL;
           irecv_done[param_id] = true;
           iter = irecv_req_list.erase(iter);
@@ -400,7 +440,7 @@ namespace caffe {
           int param_id = layer_param_ids[i][j];
           if (!broadcast_launched[param_id]) {
             if (irecv_done[param_id]) {
-              launch_param_broadcast(i, param_id);
+              launch_param_broadcast(param_id);
               broadcast_launched[param_id] = true;
             } else return;
           }
@@ -410,61 +450,83 @@ namespace caffe {
 
     void on_backward_finished(int layer_id) {
       boost::shared_ptr<Layer<Dtype>> &layer = layers[layer_id];
-      if (layer->layerOp == nullptr) {
-        return;
-      }
       if (mn::use_param_server()) {
-        std::vector<int> &param_ids = layer_param_ids[layer_id];
-        // TODO: descending is faster?
-        for (int i = param_ids.size() - 1; i >= 0; --i) {
-          if (!layer->ParamNeedReduce(i)) continue;
-          launch_reduce(layer_id, param_ids[i]);
-          mn::Distribution &distrib = layer->GetDistribution();
-          if (distrib.is_root(MLSL::GroupType::GT_DATA)) {
-            check_and_launch_comm_to_ps();
-            check_and_launch_broadcast();
-          } else {
-            launch_param_broadcast(layer_id, param_ids[i]);
+        launch_reduce(layer_id);
+
+        mn::Distribution &distrib = layer->GetDistribution();
+        if (distrib.is_root(MLSL::GroupType::GT_DATA)) {
+          check_and_launch_comm_to_ps();
+          check_and_launch_broadcast();
+        } else {
+          std::vector<int> &param_ids = layer_param_ids[layer_id];
+          for (int i=param_ids.size() - 1; i >= 0; --i) {
+            launch_param_broadcast(param_ids[i]);
           }
         }
       } else {
+        if (layer->layerOp == nullptr) {
+          return;
+        }
         launch_allreduce(layer_id);
       }
     }
+
+
     void delwt_wait_ps(int layer_id) {
       mn::Distribution &distrib = layers[layer_id]->GetDistribution();
+
+      std::vector<int> &param_ids = layer_param_ids[layer_id];
       if (distrib.is_root(MLSL::GroupType::GT_DATA)) {
-        std::vector<int> &param_ids = layer_param_ids[layer_id];
-        // TODO: can we start comm with ps earlier? Per-layer data would be inconsistent then.
-        check_and_launch_comm_to_ps();
-        check_and_launch_broadcast();
-        for (int i = param_ids.size() - 1; i >= 0; i--) {
-          int param_id = param_ids[i];
-          // wait for reduce
-          if (reduce_req_vec[param_id] != NULL) {
-            MLSL::Environment::GetEnv().Wait(reduce_req_vec[param_id]);
-          }
-          reduce_req_vec[param_id] = NULL;
-          // wait for new param from param server
-          if (irecv_req_vec[param_id] != MPI_REQUEST_NULL) {
-            MPI_Wait(&irecv_req_vec[param_id], MPI_STATUS_IGNORE);
-            // the req is set to MPI_Request_NULL indicating the request is already finished
-            irecv_req_vec[param_id] = MPI_REQUEST_NULL;
-          }
-          irecv_done[param_id] = false;
-          // wait for the completion of broadcast
-          if (broadcast_req_vec[param_id] != NULL) {
-            MLSL::Environment::GetEnv().Wait(broadcast_req_vec[param_id]);
-            broadcast_req_vec[param_id] = NULL;
+        bool bcast_launched = false;
+        while (!bcast_launched) {
+          // TODO: can we start comm with ps earlier? Per-layer data would be inconsistent then.
+          check_and_launch_comm_to_ps();
+          check_and_launch_broadcast();
+          bcast_launched = true;
+          for (int i = param_ids.size() - 1; i >= 0; i--) {
+            bcast_launched = bcast_launched & broadcast_launched[param_ids[i]];
           }
-          broadcast_launched[param_id] = false;
         }
       }
+
 #ifdef FW_OVERLAP_OPT
       solver->set_layer_finished_flag(layer_id, true);
 #endif
+
+      for (int i = param_ids.size() - 1; i >= 0; i--) {
+        int param_id = param_ids[i];
+        DLOG(INFO) << " Wait reduce layer id " << layer_id << " param id " << param_id;
+        // wait for reduce
+        if (reduce_req_vec[param_id] != NULL) {
+          MLSL::Environment::GetEnv().Wait(reduce_req_vec[param_id]);
+        }
+        reduce_req_vec[param_id] = NULL;
+
+        DLOG(INFO) << "Worker (iter " << solver->root_solver()->iter() << "): param id=" << param_id << " weight=" << net_params[param_id]->sumsq_diff();
+
+        DLOG(INFO) << " Wait recv layer id " << layer_id << " param id " << param_id;
+
+        // wait for new param from param server
+        if (irecv_req_vec[param_id] != MPI_REQUEST_NULL) {
+          MPI_Wait(&irecv_req_vec[param_id], MPI_STATUS_IGNORE);
+          // the req is set to MPI_Request_NULL indicating the request is already finished
+          irecv_req_vec[param_id] = MPI_REQUEST_NULL;
+        }
+        irecv_done[param_id] = false;
+
+        DLOG(INFO) << " Wait broadcast layer id " << layer_id << " param id " << param_id;
+
+        // wait for the completion of broadcast
+        if (broadcast_req_vec[param_id] != NULL) {
+          MLSL::Environment::GetEnv().Wait(broadcast_req_vec[param_id]);
+          broadcast_req_vec[param_id] = NULL;
+        }
+        broadcast_launched[param_id] = false;
+
+        DLOG(INFO) << "Worker (iter " << solver->root_solver()->iter() << "): param id=" << param_id << " data=" << net_params[param_id]->sumsq_data();
+      }
     }
 
     void delwt_wait_no_ps(int layer_id) {
diff --git a/python/caffe/_caffe.cpp b/python/caffe/_caffe.cpp
index cea302f4b..2de7611ee 100644
--- a/python/caffe/_caffe.cpp
+++ b/python/caffe/_caffe.cpp
@@ -374,7 +374,6 @@ void Solver_add_callback(Solver<Dtype> * solver, bp::object on_start,
 }
 
 #ifdef USE_MLSL
-
 template <typename Dtype>
 void MultiSolverBackward(MultiSolver<Dtype> * solver) {
diff --git a/src/caffe/multinode/async_param_server.cpp b/src/caffe/multinode/async_param_server.cpp
index e2c012616..36ae76099 100644
--- a/src/caffe/multinode/async_param_server.cpp
+++ b/src/caffe/multinode/async_param_server.cpp
@@ -50,7 +50,6 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
 namespace caffe {
   namespace mn {
 
-    using std::make_pair;
 
     template <typename Dtype>
@@ -63,16 +62,16 @@ namespace caffe {
       int mpi_rank = get_node_rank();
       shared_ptr<Net<Dtype>> net = solver_->net();
      const vector<Blob<Dtype> *> &net_params = net->learnable_params();
-
+      const std::vector<bool>& layer_need_backward{ net->layer_need_backward() };
+
      for (int i = 0; i < get_num_groups(); i++) {
        int root_rank = get_group_root_rank(i);
        //iterate over layers and skip the ones without params
        for (int j = 0; j < net->layers().size(); j++) {
          shared_ptr<Layer<Dtype>> layer = net->layers()[j];
-          //skip layers w/o parameters
-          if ((layer->layerOp == nullptr) || !(layer->layerOp->HasParameterSets())) {
-            continue;
-          }
+          if (!layer_need_backward[j])
+            continue;
+
          const MultinodeLayerParameter & mn_layer_param = layer->layer_param().multinode();
          int model_parts = mn_layer_param.model_parts();
          int mn_num_nodes = mn_layer_param.num_nodes();
@@ -158,14 +157,18 @@ namespace caffe {
          // apply update
          int blob_wise_iter = async_iter_[make_pair(task.param_id_, task.part_id_) ];
          solver_->set_iter(blob_wise_iter);
+          // TODO: supports partial param update per model parts
          solver_->ApplyUpdate(task.param_id_);
+          DLOG(INFO) << "PS (iter " << blob_wise_iter << "): param id=" << task.param_id_ << " weight=" << net_params[task.param_id_]->sumsq_diff();
+          DLOG(INFO) << "PS (iter " << blob_wise_iter << "): param id=" << task.param_id_ << " data=" << net_params[task.param_id_]->sumsq_data();
+
          //clean up
          solver_->net()->ClearParamDiffs(task.param_id_);
          async_iter_[ make_pair(task.param_id_, task.part_id_) ] += 1;
          update_cnt_ += 1;
-
+
          // copy model(data) in solver to mpi buffer
          mpi_buf = send_buf_[make_pair(root_rank, task.param_id_)].first;
          caffe_copy(count / task.num_parts_,
diff --git a/src/caffe/multinode/mlsl.cpp b/src/caffe/multinode/mlsl.cpp
index 90b6456a6..69b3f0a66 100644
--- a/src/caffe/multinode/mlsl.cpp
+++ b/src/caffe/multinode/mlsl.cpp
@@ -59,16 +59,17 @@ namespace caffe {
      distrib_map = new std::map<std::pair<int, int>, boost::shared_ptr<Distribution>>();
 
      if (use_param_server()) {
+        std::string config_str;
        if (is_param_server()) {
-          // this is for paramter servers
-          MLSL::Environment::GetEnv().Configure("color=0");
+          // this is for parameter servers
+          config_str = "color=0";
        } else {
          // this is for workers
          int group_id = get_group_id();
-          std::string config_str = "color=" + std::to_string(group_id + 1);
-          MLSL::Environment::GetEnv().Configure(config_str.c_str());
+          config_str = "color=" + std::to_string(group_id + 1);
        }
+        MLSL::Environment::GetEnv().Configure(config_str.c_str());
      }
 #ifdef ENABLE_WEIGHT_GRAD_COMPRESSION
      get_weight_grad_compress_info();
diff --git a/src/caffe/multinode/multi_solver.cpp b/src/caffe/multinode/multi_solver.cpp
index 558176fcf..96705d9e2 100644
--- a/src/caffe/multinode/multi_solver.cpp
+++ b/src/caffe/multinode/multi_solver.cpp
@@ -108,7 +108,7 @@ inline bool MultiSolver<Dtype>::IsSkipSyncGradient(int layer_id) {
   if (!layer_need_backward[layer_id] ||
       ((layers[layer_id]->layerOp != nullptr) &&
        !layers[layer_id]->layerOp->HasParameterSets())) {
-    DLOG(INFO) << "No need for synchronizing gradients for layer # " << layer_id;
+    DLOG(INFO) << "No need for synchronizing gradients for layer #" << layer_id;
     return true;
   }
   return false;
 }
@@ -140,13 +140,13 @@ inline void MultiSolver<Dtype>::UpdateGradient(int layer_id) {
 }
 
 template <typename Dtype>
-Dtype MultiSolver<Dtype>::ForwardFromTo(int start, int end) {
+inline Dtype MultiSolver<Dtype>::ForwardFromTo(int start, int end) {
   Net<Dtype>& net = *root_solver_->net();
   return net.ForwardFromTo(start, end);
 }
 
 template <typename Dtype>
-void MultiSolver<Dtype>::BackwardFromTo(int start, int end, bool last) {
+inline void MultiSolver<Dtype>::BackwardFromTo(int start, int end, bool last) {
   Net<Dtype>& net = *root_solver_->net();
   const std::vector<bool>& layer_need_backward{ net.layer_need_backward() };
@@ -168,7 +168,7 @@ void MultiSolver<Dtype>::BackwardFromTo(int start, int end, bool last) {
 }
 
 template <typename Dtype>
-void MultiSolver<Dtype>::Backward(bool last) {
+inline void MultiSolver<Dtype>::Backward(bool last) {
   Net<Dtype>& net = *root_solver_->net();
   const std::vector<boost::shared_ptr<Layer<Dtype>>>& layers{ net.layers() };
@@ -230,7 +230,7 @@ void MultiSolver<Dtype>::WaitAndUpdateBeforeFwd(int layer_id) {
 }
 
 template <typename Dtype>
-Dtype MultiSolver<Dtype>::UpdateAndForward(bool first) {
+inline Dtype MultiSolver<Dtype>::UpdateAndForward(bool first) {
   Dtype loss = 0;
   Net<Dtype>& net = *root_solver_->net();
   const std::vector<boost::shared_ptr<Layer<Dtype>>>& layers{ net.layers() };
@@ -246,7 +246,7 @@ Dtype MultiSolver<Dtype>::UpdateAndForward(bool first) {
 }
 #endif
 
 template <typename Dtype>
-Dtype MultiSolver<Dtype>::Forward() {
+inline Dtype MultiSolver<Dtype>::Forward() {
   Dtype loss = 0;
   Net<Dtype>& net = *root_solver_->net();
   const std::vector<boost::shared_ptr<Layer<Dtype>>>& layers{ net.layers() };
@@ -258,7 +258,7 @@ Dtype MultiSolver<Dtype>::Forward() {
 }
 
 template <typename Dtype>
-void MultiSolver<Dtype>::WaitAndUpdate() {
+inline void MultiSolver<Dtype>::WaitAndUpdate() {
   Net<Dtype>& net = *root_solver_->net();
   const std::vector<boost::shared_ptr<Layer<Dtype>>>& layers{ net.layers() };
diff --git a/src/caffe/multinode/multi_sync.cpp b/src/caffe/multinode/multi_sync.cpp
index 217f29e2a..0f8f56811 100644
--- a/src/caffe/multinode/multi_sync.cpp
+++ b/src/caffe/multinode/multi_sync.cpp
@@ -50,8 +50,8 @@ MultiSync<Dtype>::MultiSync(shared_ptr<Solver<Dtype> > root_solver)
     reduce_req_vec(net_params.size(), NULL),
     irecv_req_vec(net_params.size(), MPI_REQUEST_NULL),
     broadcast_req_vec(net_params.size(), NULL),
-    irecv_done(net_params.size(), true),
-    broadcast_launched(net_params.size(), true),
+    irecv_done(net_params.size(), false),
+    broadcast_launched(net_params.size(), false),
     distrib_bcast(NULL) {
   root_solver->param().set_disabled_update(true);
diff --git a/src/caffe/net.cpp b/src/caffe/net.cpp
index 48df83495..89a741bd2 100644
--- a/src/caffe/net.cpp
+++ b/src/caffe/net.cpp
@@ -300,6 +300,10 @@ void Net<Dtype>::Init(const NetParameter& in_param) {
   // global batch size must be a multiple of data_parts
   int global_batch_size = mn::train::get_global_minibatch_size();
   int fake_batch_size = mn::get_distrib()->get_data_parts();
+  if (mn::use_param_server() && mn::is_param_server()) {
+    fake_batch_size = mn::nServer;
+  }
+
   if (global_batch_size == 0) {
     LOG(WARNING) << "SetMinibatchSize " << fake_batch_size;
     mn::train::set_global_minibatch_size(fake_batch_size);
diff --git a/tools/caffe.cpp b/tools/caffe.cpp
index 5edfaa88a..f22af5e11 100644
--- a/tools/caffe.cpp
+++ b/tools/caffe.cpp
@@ -730,8 +730,18 @@ int main(int argc, char** argv) {
   caffe::GlobalInit(&argc, &argv);
 #ifdef USE_MLSL
   caffe::mn::nGroup = FLAGS_n_group;
+  if (caffe::mn::nGroup <= 0) {
+    LOG(ERROR) << "Invalid number of group: " << caffe::mn::nGroup;
+    return 1;
+  }
+
   caffe::mn::nServer = FLAGS_n_server;
   caffe::mn::init(&argc, &argv);
+  if (caffe::mn::get_group_size() <= 0) {
+    LOG(ERROR) << "Invalid group size: " << caffe::mn::get_group_size();
+    return 1;
+  }
+
   CHECK_EQ(caffe::mn::get_world_size(),
            caffe::mn::nGroup * caffe::mn::get_group_size() + caffe::mn::nServer);
   if (caffe::mn::nGroup > 1) {