
Merge branch 'release_1.1.1' of ssh://git-ccr-1.devtools.intel.com:29418/dl_framework-intel_caffe into release_1.1.1
fzou1 committed Mar 9, 2018
2 parents b65c473 + 5fbc80f commit 2a30fe7
Showing 19 changed files with 204 additions and 6,799 deletions.
1 change: 1 addition & 0 deletions include/caffe/multinode/multi_solver.hpp
@@ -73,6 +73,7 @@ class MultiSolver {
protected:
virtual ~Callback() {
}

virtual void on_backward_finished(int layer_id) = 0;
virtual void on_delwt_wait(int layer_id) = 0;
virtual void apply_updates(int layer_id) = 0;
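The hunk above only adds a blank line inside MultiSolver::Callback, but the three pure-virtual hooks it shows are what the multi-node sync code in the next file implements. A minimal, hypothetical sketch of how a per-layer training loop could drive such hooks (names and call order are illustrative, not the actual MultiSolver implementation):

// Editor's sketch (not part of the commit): one possible way a per-layer
// training loop could drive the Callback hooks declared above.
#include <cstdio>

struct Callback {
  virtual ~Callback() {}
  virtual void on_backward_finished(int layer_id) = 0;
  virtual void on_delwt_wait(int layer_id) = 0;
  virtual void apply_updates(int layer_id) = 0;
};

struct LoggingCallback : Callback {
  void on_backward_finished(int id) override { std::printf("backward done, layer %d\n", id); }
  void on_delwt_wait(int id) override { std::printf("wait for gradients, layer %d\n", id); }
  void apply_updates(int id) override { std::printf("apply updates, layer %d\n", id); }
};

int main() {
  LoggingCallback cb;
  const int num_layers = 3;
  // The backward pass visits layers from last to first; communication for a
  // layer can start as soon as its backward step has finished.
  for (int l = num_layers - 1; l >= 0; --l) cb.on_backward_finished(l);
  // Before the next forward pass, wait for the reduced gradients and update the weights.
  for (int l = 0; l < num_layers; ++l) { cb.on_delwt_wait(l); cb.apply_updates(l); }
  return 0;
}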
180 changes: 121 additions & 59 deletions include/caffe/multinode/multi_sync.hpp
@@ -239,6 +239,9 @@ namespace caffe {
}

void apply_updates(int layer_id) {
if (mn::use_param_server())
return;

std::vector<int> &param_ids = layer_param_ids[layer_id];
for (int i = 0; i < param_ids.size(); ++i) {
solver->root_solver()->ApplyUpdate(param_ids[i]);
@@ -274,25 +277,44 @@
}
}

void launch_reduce(int layer_id, int param_id) {
void launch_reduce(int layer_id) {
mn::Distribution& distrib = layers[layer_id]->GetDistribution();
Dtype* send_buff = NULL;
Dtype* recv_buff = NULL;
size_t buf_size = net_params[param_id]->count();
if (CAN_USE_PRV_DIFF(net_params[param_id])) {
send_buff = (Dtype*)net_params[param_id]->prv_diff();
recv_buff = net_params[param_id]->mutable_prv_diff();
}
else {
send_buff = (Dtype*)net_params[param_id]->cpu_diff();
recv_buff = net_params[param_id]->mutable_cpu_diff();
}
reduce_req_vec[param_id] =
distrib.reduce_async<Dtype,MLSL::ReductionType::RT_SUM,MLSL::GroupType::GT_DATA>(
send_buff, recv_buff, buf_size);
if (reduce_req_vec[param_id] != NULL && distrib.is_root(MLSL::GroupType::GT_DATA)) {
AsyncTask req_task(layer_id, param_id, NULL);
reduce_req_list.push_back(req_task);
boost::shared_ptr<Layer<Dtype>> &layer = layers[layer_id];
std::vector<int> &param_ids = layer_param_ids[layer_id];
// TODO: descending is faster?
for (int i = param_ids.size() - 1; i >= 0; --i) {
if (!layer->ParamNeedReduce(i)) continue;
int param_id = param_ids[i];

if (mn::get_nodes_count() > 1) {
Dtype* send_buff = NULL;
Dtype* recv_buff = NULL;
size_t buf_size = net_params[param_id]->count();
if (CAN_USE_PRV_DIFF(net_params[param_id])) {
send_buff = (Dtype*)net_params[param_id]->prv_diff();
recv_buff = net_params[param_id]->mutable_prv_diff();
}
else {
send_buff = (Dtype*)net_params[param_id]->cpu_diff();
recv_buff = net_params[param_id]->mutable_cpu_diff();
}
reduce_req_vec[param_id] =
distrib.reduce_async<Dtype,MLSL::ReductionType::RT_SUM,MLSL::GroupType::GT_DATA>(
send_buff, recv_buff, buf_size);
bool complete = false;
DLOG(INFO) << "Launch reduce with param id " << param_id;
// progress communication ASAP
MLSL::Environment::GetEnv().Test(reduce_req_vec[param_id], &complete);
if (complete) {
// reset req to indicate no need to do Wait
reduce_req_vec[param_id] = NULL;
}
}

if (distrib.is_root(MLSL::GroupType::GT_DATA)) {
AsyncTask req_task(layer_id, param_id, NULL);
reduce_req_list.push_back(req_task);
}
}
}
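The rewritten launch_reduce takes only the layer id, loops over that layer's parameters, starts a non-blocking sum reduction for each one, and immediately calls Test so the communication can progress while backward continues; requests that complete right away are cleared so no Wait is issued for them later. A rough MPI-only sketch of the same launch-then-poll pattern (the actual code uses MLSL's reduce_async and Environment::Test rather than MPI_Ireduce):

// Editor's sketch (not part of the commit): launch a non-blocking gradient
// reduction, poll once to progress it, and fall back to Wait if it is still
// pending. Build against an MPI-3 implementation and run under mpirun.
#include <mpi.h>
#include <cstdio>
#include <vector>

int main(int argc, char** argv) {
  MPI_Init(&argc, &argv);
  int rank = 0;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);

  std::vector<float> grad(1024, 1.0f);  // this rank's gradient buffer
  std::vector<float> sum(1024, 0.0f);   // reduced result lands on the root

  MPI_Request req;
  MPI_Ireduce(grad.data(), sum.data(), (int)grad.size(), MPI_FLOAT, MPI_SUM,
              /*root=*/0, MPI_COMM_WORLD, &req);

  // Poll right away so the progress engine can advance the transfer; if it
  // already finished, no Wait is needed later (mirrors reduce_req_vec = NULL).
  int complete = 0;
  MPI_Test(&req, &complete, MPI_STATUS_IGNORE);
  if (!complete) {
    // ... more backward computation could overlap here ...
    MPI_Wait(&req, MPI_STATUS_IGNORE);
  }
  if (rank == 0) std::printf("reduced[0] = %f\n", sum[0]);
  MPI_Finalize();
  return 0;
}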

@@ -306,13 +328,17 @@ namespace caffe {
else {
MLSL::Environment::GetEnv().Test(reduce_req_vec[iter->param_id], &complete);
}

if (complete) {
// reset req to indicate no need to do Wait
reduce_req_vec[iter->param_id] = NULL;

int param_id = iter->param_id;
DLOG(INFO) << "Finish reduce on param id " << param_id
<< " value " << net_params[param_id]->sumsq_diff();

void* send_buff;
void* recv_buff;
int param_id = iter->param_id;
size_t buf_size = net_params[param_id]->count();

if (CAN_USE_PRV_DIFF(net_params[param_id] ) ) {
@@ -327,6 +353,7 @@ namespace caffe {
else {
recv_buff = (void*)net_params[param_id]->mutable_cpu_data();
}

mn::Distribution &distrib = layers[iter->layer_id]->GetDistribution();
int server_mpi_rank = mn::param_to_server_rank(iter->layer_id, iter->param_id);
mn::TaskRequest task(
@@ -336,25 +363,35 @@ namespace caffe {
int tag = task.GetTag();
MPI_Request send_req;
int recv_flag = 1;
// recv from PS
MPI_Irecv(recv_buff, buf_size, mn::DtypeToMPIDtype<Dtype>(),
server_mpi_rank, tag, MPI_COMM_WORLD, &irecv_req_vec[param_id]);
MPI_Test(&irecv_req_vec[param_id], &recv_flag, MPI_STATUS_IGNORE);
CHECK(!recv_flag);
DLOG(INFO) << "Receive data of param id " << param_id
<< " from PS (rank " << server_mpi_rank << " and tag " << tag
<< ")";
// Send to PS
MPI_Isend(send_buff, buf_size, mn::DtypeToMPIDtype<Dtype>(),
server_mpi_rank, tag, MPI_COMM_WORLD, &send_req);
// TODO: why do we have to wait here?
MPI_Wait(&send_req, MPI_STATUS_IGNORE);

DLOG(INFO) << "Send to PS of param id " << param_id
<< " to PS (rank " << server_mpi_rank << " and tag " << tag
<< ")";

irecv_req_list.push_back(task);
iter = reduce_req_list.erase(iter);
}
else iter++;
}
}
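Once a reduction has finished, the data-group root exchanges that parameter with its parameter server: it posts the non-blocking receive for the updated weights first, then sends the reduced gradient, matching both by the rank returned by mn::param_to_server_rank and a per-task tag. A toy two-rank sketch of that exchange; the fixed tag, the rank layout, and the trivial "server" are placeholders, not the real parameter-server protocol:

// Editor's sketch (not part of the commit): rank 1 plays the worker, rank 0 a
// toy parameter server that echoes updated weights back with the same tag.
#include <mpi.h>
#include <cstdio>
#include <vector>

int main(int argc, char** argv) {
  MPI_Init(&argc, &argv);
  int rank = 0, size = 0;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);
  const int n = 8, tag = 42, server = 0, worker = 1;  // placeholder values

  if (size >= 2 && rank == worker) {
    std::vector<float> grad(n, 0.5f), weights(n, 0.0f);
    MPI_Request recv_req;
    // Post the receive for the updated weights before sending, so the reply
    // can be matched as soon as the server answers (same order as the diff).
    MPI_Irecv(weights.data(), n, MPI_FLOAT, server, tag, MPI_COMM_WORLD, &recv_req);
    // Ship the reduced gradient to the server.
    MPI_Send(grad.data(), n, MPI_FLOAT, server, tag, MPI_COMM_WORLD);
    // Later (delwt_wait_ps in the diff) the worker completes the receive.
    MPI_Wait(&recv_req, MPI_STATUS_IGNORE);
    std::printf("worker got weights[0] = %f\n", weights[0]);
  } else if (size >= 2 && rank == server) {
    std::vector<float> grad(n), weights(n, 1.0f);
    // Receive the gradient, apply an update (omitted here), send weights back.
    MPI_Recv(grad.data(), n, MPI_FLOAT, worker, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    MPI_Send(weights.data(), n, MPI_FLOAT, worker, tag, MPI_COMM_WORLD);
  }
  MPI_Finalize();
  return 0;
}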

void launch_param_broadcast(int layer_id, int param_id) {
void launch_param_broadcast(int param_id) {
if (mn::get_nodes_count() <= 1)
return;

Dtype* buff;
if (CAN_USE_PRV_DATA(net_params[param_id])) {
if (distrib_bcast->is_root(MLSL::GroupType::GT_DATA))
@@ -369,6 +406,7 @@ namespace caffe {
buff = net_params[param_id]->mutable_cpu_data();
}
size_t buf_size = net_params[param_id]->count();
DLOG(INFO) << "Launch broadcast request with param id " << param_id;
broadcast_req_vec[param_id] =
distrib_bcast->bcast_async<Dtype,MLSL::GroupType::GT_DATA>(buff, buf_size);
}
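launch_param_broadcast is now keyed only by the parameter id and becomes a no-op on a single node; it posts an asynchronous broadcast of the (possibly server-updated) weights from the data-group root to the other workers. An MPI-only analogue with the same single-node early-out (the diff uses MLSL's bcast_async):

// Editor's sketch (not part of the commit): non-blocking broadcast of a
// weight buffer from rank 0, skipped entirely when running on one rank.
#include <mpi.h>
#include <cstdio>
#include <vector>

int main(int argc, char** argv) {
  MPI_Init(&argc, &argv);
  int rank = 0, size = 0;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  std::vector<float> weights(1024, rank == 0 ? 3.14f : 0.0f);
  if (size > 1) {                        // mirrors the nodes_count() <= 1 early return
    MPI_Request req;
    MPI_Ibcast(weights.data(), (int)weights.size(), MPI_FLOAT,
               /*root=*/0, MPI_COMM_WORLD, &req);
    // ... work on other layers can overlap here ...
    MPI_Wait(&req, MPI_STATUS_IGNORE);   // the diff waits later, in delwt_wait_ps
  }
  std::printf("rank %d weights[0] = %f\n", rank, weights[0]);
  MPI_Finalize();
  return 0;
}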
@@ -384,6 +422,8 @@ namespace caffe {
MPI_Test(&irecv_req_vec[param_id], &flag, MPI_STATUS_IGNORE);
}
if (flag) {
DLOG(INFO) << " Root node received model param id " << param_id
<< " value " << net_params[param_id]->sumsq_data();
irecv_req_vec[param_id] = MPI_REQUEST_NULL;
irecv_done[param_id] = true;
iter = irecv_req_list.erase(iter);
@@ -400,7 +440,7 @@ namespace caffe {
int param_id = layer_param_ids[i][j];
if (!broadcast_launched[param_id]) {
if (irecv_done[param_id]) {
launch_param_broadcast(i, param_id);
launch_param_broadcast(param_id);
broadcast_launched[param_id] = true;
} else return;
}
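check_and_launch_broadcast only launches the broadcast for a parameter once its updated value has arrived from the parameter server (irecv_done), and it returns at the first parameter that is not ready, so the root posts the broadcasts in a fixed order (the loop bounds sit outside the visible hunk, so the exact ordering is an inference). A small standalone sketch of that gating logic:

// Editor's sketch (not part of the commit): launch broadcasts in a fixed
// order and stop at the first parameter whose PS reply has not arrived yet.
#include <cstdio>
#include <vector>

struct BcastState {
  std::vector<bool> irecv_done;          // updated weights arrived from the PS
  std::vector<bool> broadcast_launched;  // broadcast for this param already posted
};

void check_and_launch_broadcast_sketch(BcastState& s) {
  for (size_t p = 0; p < s.irecv_done.size(); ++p) {
    if (s.broadcast_launched[p]) continue;   // already on its way
    if (!s.irecv_done[p]) return;            // keep the order: stop at the first gap
    std::printf("launch broadcast for param %zu\n", p);
    s.broadcast_launched[p] = true;
  }
}

int main() {
  BcastState s{{true, false, true}, {false, false, false}};
  check_and_launch_broadcast_sketch(s);  // launches param 0 only, stops at param 1
  s.irecv_done[1] = true;
  check_and_launch_broadcast_sketch(s);  // now launches params 1 and 2
  return 0;
}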
@@ -410,61 +450,83 @@ namespace caffe {

void on_backward_finished(int layer_id) {
boost::shared_ptr<Layer<Dtype>> &layer = layers[layer_id];
if (layer->layerOp == nullptr) {
return;
}

if (mn::use_param_server()) {
std::vector<int> &param_ids = layer_param_ids[layer_id];
// TODO: descending is faster?
for (int i = param_ids.size() - 1; i >= 0; --i) {
if (!layer->ParamNeedReduce(i)) continue;
launch_reduce(layer_id, param_ids[i]);
mn::Distribution &distrib = layer->GetDistribution();
if (distrib.is_root(MLSL::GroupType::GT_DATA)) {
check_and_launch_comm_to_ps();
check_and_launch_broadcast();
} else {
launch_param_broadcast(layer_id, param_ids[i]);
launch_reduce(layer_id);

mn::Distribution &distrib = layer->GetDistribution();
if (distrib.is_root(MLSL::GroupType::GT_DATA)) {
check_and_launch_comm_to_ps();
check_and_launch_broadcast();
} else {
std::vector<int> &param_ids = layer_param_ids[layer_id];
for (int i=param_ids.size() - 1; i >= 0; --i) {
launch_param_broadcast(param_ids[i]);
}
}
} else {
if (layer->layerOp == nullptr) {
return;
}
launch_allreduce(layer_id);
}
}
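on_backward_finished now branches on mn::use_param_server(): with a parameter server it launches the per-layer reductions, the data-group root alone talks to the server and forwards the results, and non-root ranks just join the broadcasts; without one it falls back to the original allreduce path, and the layerOp == nullptr early return now guards only that path. A compact control-flow sketch with stubbed-out helpers (the stubs are placeholders for the member functions shown above):

// Editor's sketch (not part of the commit): the dispatch shape of the new
// on_backward_finished, with printf stubs standing in for the real methods.
#include <cstdio>

static void launch_reduce(int l)            { std::printf("reduce layer %d\n", l); }
static void check_and_launch_comm_to_ps()   { std::printf("talk to the param server\n"); }
static void check_and_launch_broadcast()    { std::printf("broadcast ready params\n"); }
static void launch_param_broadcasts(int l)  { std::printf("broadcast layer %d\n", l); }
static void launch_allreduce(int l)         { std::printf("allreduce layer %d\n", l); }

void on_backward_finished_sketch(int layer_id, bool use_param_server,
                                 bool is_data_group_root, bool has_layer_op) {
  if (use_param_server) {
    launch_reduce(layer_id);               // start gradient reductions for all params
    if (is_data_group_root) {
      check_and_launch_comm_to_ps();       // root: push finished reductions to the PS
      check_and_launch_broadcast();        // root: forward received weights to peers
    } else {
      launch_param_broadcasts(layer_id);   // non-root: join the broadcasts
    }
  } else {
    if (!has_layer_op) return;             // early return now guards only this path
    launch_allreduce(layer_id);            // original allreduce path
  }
}

int main() {
  on_backward_finished_sketch(7, true, true, true);    // PS path, data-group root
  on_backward_finished_sketch(7, true, false, true);   // PS path, non-root worker
  on_backward_finished_sketch(7, false, false, true);  // no PS: plain allreduce
  return 0;
}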



void delwt_wait_ps(int layer_id) {
mn::Distribution &distrib = layers[layer_id]->GetDistribution();

std::vector<int> &param_ids = layer_param_ids[layer_id];
if (distrib.is_root(MLSL::GroupType::GT_DATA)) {
std::vector<int> &param_ids = layer_param_ids[layer_id];
// TODO: can we start comm with ps earlier? Per-layer data would be inconsistent then.
check_and_launch_comm_to_ps();
check_and_launch_broadcast();
for (int i = param_ids.size() - 1; i >= 0; i--) {
int param_id = param_ids[i];
// wait for reduce
if (reduce_req_vec[param_id] != NULL) {
MLSL::Environment::GetEnv().Wait(reduce_req_vec[param_id]);
}
reduce_req_vec[param_id] = NULL;
// wait for new param from param server
if (irecv_req_vec[param_id] != MPI_REQUEST_NULL) {
MPI_Wait(&irecv_req_vec[param_id], MPI_STATUS_IGNORE);
// the req is set to MPI_REQUEST_NULL, indicating the request has already finished
irecv_req_vec[param_id] = MPI_REQUEST_NULL;
}
irecv_done[param_id] = false;
// wait for the completion of broadcast
if (broadcast_req_vec[param_id] != NULL) {
MLSL::Environment::GetEnv().Wait(broadcast_req_vec[param_id]);
broadcast_req_vec[param_id] = NULL;
bool bcast_launched = false;
while (!bcast_launched) {
// TODO: can we start comm with ps earlier? Per-layer data would be inconsistent then.
check_and_launch_comm_to_ps();
check_and_launch_broadcast();
bcast_launched = true;
for (int i = param_ids.size() - 1; i >= 0; i--) {
bcast_launched = bcast_launched & broadcast_launched[param_ids[i]];
}
broadcast_launched[param_id] = false;
}
}

#ifdef FW_OVERLAP_OPT
solver->set_layer_finished_flag(layer_id, true);
#endif

for (int i = param_ids.size() - 1; i >= 0; i--) {
int param_id = param_ids[i];
DLOG(INFO) << " Wait reduce layer id " << layer_id << " param id " << param_id;
// wait for reduce
if (reduce_req_vec[param_id] != NULL) {
MLSL::Environment::GetEnv().Wait(reduce_req_vec[param_id]);
}
reduce_req_vec[param_id] = NULL;

DLOG(INFO) << "Worker (iter " << solver->root_solver()->iter() << "): param id=" << param_id << " weight=" << net_params[param_id]->sumsq_diff();

DLOG(INFO) << " Wait recv layer id " << layer_id << " param id " << param_id;

// wait for new param from param server
if (irecv_req_vec[param_id] != MPI_REQUEST_NULL) {
MPI_Wait(&irecv_req_vec[param_id], MPI_STATUS_IGNORE);
// the req is set to MPI_REQUEST_NULL, indicating the request has already finished
irecv_req_vec[param_id] = MPI_REQUEST_NULL;
}
irecv_done[param_id] = false;

DLOG(INFO) << " Wait broadcast layer id " << layer_id << " param id " << param_id;

// wait for the completion of broadcast
if (broadcast_req_vec[param_id] != NULL) {
MLSL::Environment::GetEnv().Wait(broadcast_req_vec[param_id]);
broadcast_req_vec[param_id] = NULL;
}
broadcast_launched[param_id] = false;

DLOG(INFO) << "Worker (iter " << solver->root_solver()->iter() << "): param id=" << param_id << " data=" << net_params[param_id]->sumsq_data();
}
}
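As the hunk reads, delwt_wait_ps first has the data-group root keep polling check_and_launch_comm_to_ps / check_and_launch_broadcast until a broadcast has been launched for every parameter of the layer, and then drains the outstanding reduce, parameter-server receive, and broadcast requests one parameter at a time, resetting the bookkeeping flags. A sketch of that shape with placeholder wait helpers (where exactly the drain loop sits relative to the is_root block is partly hidden by the collapsed context):

// Editor's sketch (not part of the commit): poll until every broadcast for
// the layer has been launched, then drain the per-parameter requests.
#include <cstdio>
#include <vector>

static void check_and_launch_comm_to_ps()  {}
static void check_and_launch_broadcast()   {}
static void wait_reduce(int p)    { std::printf("wait reduce, param %d\n", p); }
static void wait_ps_recv(int p)   { std::printf("wait PS reply, param %d\n", p); }
static void wait_broadcast(int p) { std::printf("wait broadcast, param %d\n", p); }

void delwt_wait_ps_sketch(bool is_data_group_root, const std::vector<int>& param_ids,
                          std::vector<bool>& broadcast_launched) {
  if (is_data_group_root) {
    bool all_launched = false;
    while (!all_launched) {                 // keep polling until every bcast is posted
      check_and_launch_comm_to_ps();
      check_and_launch_broadcast();
      all_launched = true;
      for (int p : param_ids) all_launched = all_launched && broadcast_launched[p];
    }
  }
  for (int p : param_ids) {                 // drain requests before the next forward pass
    wait_reduce(p);                         // pending reduce request, if any
    wait_ps_recv(p);                        // MPI_Wait on the receive from the server
    wait_broadcast(p);                      // pending broadcast request
    broadcast_launched[p] = false;          // reset bookkeeping for the next iteration
  }
}

int main() {
  std::vector<int> param_ids{0, 1};
  std::vector<bool> launched{true, true};   // pretend both broadcasts were already posted
  delwt_wait_ps_sketch(true, param_ids, launched);
  return 0;
}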

void delwt_wait_no_ps(int layer_id) {
@@ -1,6 +1,6 @@
net: "models/intel_optimized_models/multinode/default_resnet50_16nodes/train_val.prototxt"
test_iter: 1000
test_interval: 625
test_interval: 563
test_initialization: false
display: 40
base_lr: 0.8
@@ -9,11 +9,11 @@ stepvalue:18750
stepvalue:37500
stepvalue:50000
gamma: 0.1
max_iter: 62556 # 56300
max_iter: 56300
warmup_iter: 3125 # 1281167 / 2048 * 5 epochs
warmup_start_lr: 0.1
momentum: 0.9
weight_decay: 0.0001
snapshot: 6250
snapshot: 5630
snapshot_prefix: "default_resnet_50_16_nodes"
solver_mode: CPU
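The new 16-node schedule looks like a straight re-derivation from the ImageNet-1k training-set size and the global batch already implied by the warmup comment (1281167 / 2048): max_iter lands at roughly 90 epochs, and test_interval and snapshot appear to be max_iter/100 and max_iter/10. A quick arithmetic check under those assumptions:

// Editor's sketch (not part of the commit): sanity-check the 16-node solver
// numbers, assuming ImageNet-1k (1,281,167 images) and a global batch of 2048.
#include <cstdio>

int main() {
  const double images = 1281167.0;
  const double global_batch = 2048.0;
  const double iters_per_epoch = images / global_batch;              // ~625.6
  std::printf("90 epochs ~= %.0f iters (max_iter: 56300)\n", 90.0 * iters_per_epoch);
  std::printf("5 epochs  ~= %.0f iters (warmup_iter: 3125)\n", 5.0 * iters_per_epoch);
  std::printf("max_iter / 100 = %d (test_interval: 563)\n", 56300 / 100);
  std::printf("max_iter / 10  = %d (snapshot: 5630)\n", 56300 / 10);
  return 0;
}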
@@ -1,6 +1,6 @@
net: "models/intel_optimized_models/multinode/resnet_50_64_nodes_8k_batch/train_val.prototxt"
net: "models/intel_optimized_models/multinode/default_resnet50_64nodes/train_val.prototxt"
test_iter: 1000
test_interval: 156
test_interval: 140
test_initialization: false
display: 40
base_lr: 3.2
@@ -14,6 +14,6 @@ warmup_iter: 780 # 1281167 / 8192 * 5 epochs
warmup_start_lr: 0.1
momentum: 0.9
weight_decay: 0.0001
snapshot: 1560
snapshot_prefix: "models/intel_optimized_models/multinode/resnet_50_64_nodes_8k_batch/resnet_50_64_nodes_8k"
snapshot: 1408
snapshot_prefix: "default_resnet_50_64_nodes"
solver_mode: CPU
@@ -783,7 +783,7 @@ layer {
num_output: 128
kernel_size: 1
pad: 0
stride: 1
stride: 2
bias_term: false
weight_filler {
type: "msra"
@@ -841,7 +841,7 @@ layer {
num_output: 128
kernel_size: 3
pad: 1
stride: 2
stride: 1
bias_term: false
weight_filler {
type: "msra"
@@ -1569,7 +1569,7 @@ layer {
num_output: 256
kernel_size: 1
pad: 0
stride: 1
stride: 2
bias_term: false
weight_filler {
type: "msra"
@@ -1627,7 +1627,7 @@ layer {
num_output: 256
kernel_size: 3
pad: 1
stride: 2
stride: 1
bias_term: false
weight_filler {
type: "msra"
@@ -2727,7 +2727,7 @@ layer {
num_output: 512
kernel_size: 1
pad: 0
stride: 1
stride: 2
bias_term: false
weight_filler {
type: "msra"
@@ -2785,7 +2785,7 @@ layer {
num_output: 512
kernel_size: 3
pad: 1
stride: 2
stride: 1
bias_term: false
weight_filler {
type: "msra"
@@ -3302,6 +3302,9 @@ layer {
top: "loss"
name: "prob"
type: "SoftmaxWithLoss"
include {
phase: TRAIN
}
}
layer {
name: "loss3/top-1"
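In the train_val.prototxt hunks above, the stride-2 downsampling moves from each 3x3 convolution to the 1x1 convolution that precedes it (in what appear to be the downsampling bottleneck blocks with 128, 256 and 512 outputs), and the SoftmaxWithLoss layer gains an include { phase: TRAIN } block so the loss is computed only during training. Either stride placement halves the feature map, which Caffe's convolution output-size formula confirms; a quick check for a hypothetical 28x28 input:

// Editor's sketch (not part of the commit): a 1x1/stride-2/pad-0 conv and a
// 3x3/stride-2/pad-1 conv shrink the feature map by the same factor, so moving
// the stride between the two convolutions keeps the block's output shape.
#include <cstdio>

int conv_out(int in, int kernel, int pad, int stride) {
  return (in + 2 * pad - kernel) / stride + 1;  // Caffe convolution output size
}

int main() {
  const int in = 28;  // e.g. a 28x28 input feature map
  std::printf("1x1, stride 2, pad 0: %d -> %d\n", in, conv_out(in, 1, 0, 2));  // 28 -> 14
  std::printf("3x3, stride 2, pad 1: %d -> %d\n", in, conv_out(in, 3, 1, 2));  // 28 -> 14
  return 0;
}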