Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Single file per rank #619

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 41 additions & 8 deletions src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <string>
#include <memory>
#include <cstdio> // sscanf
#include <fstream> // Include this for std::ifstream

// Athena headers
#include "athena.hpp"
Expand Down Expand Up @@ -231,10 +232,38 @@ int main(int argc, char *argv[]) {
ParameterInput* pinput = new ParameterInput;
IOWrapper infile, restartfile;
// read parameters from restart file
bool single_file_per_rank = false; // DBF: flag for single_file_per_rank for rst files
if (res_flag) {
restartfile.Open(restart_file.c_str(), IOWrapper::FileMode::read);
pinput->LoadFromFile(restartfile);
// Check if the path contains "rank_" directory
size_t rank_pos = restart_file.find("/rank_");
single_file_per_rank = (rank_pos != std::string::npos);

// If single_file_per_rank is true, modify the path for the current rank
if (single_file_per_rank) {
// Extract the base directory and file name
size_t last_slash = restart_file.rfind('/');
std::string base_dir = restart_file.substr(0, rank_pos);
std::string file_name = restart_file.substr(last_slash + 1);

// Construct the path for the current rank
char rank_dir[20];
std::snprintf(rank_dir, sizeof(rank_dir), "rank_%08d", global_variable::my_rank);
restart_file = base_dir + "/" + rank_dir + "/" + file_name;
}

// Now use restart_file for opening the file
std::ifstream file_check(restart_file);
if (!file_check.good()) {
std::cerr << "Error: Unable to open restart file: " << restart_file << std::endl;
// Handle the error (e.g., exit the program or use a default configuration)
}

// read parameters from restart file
restartfile.Open(restart_file.c_str(),IOWrapper::FileMode::read,single_file_per_rank);
pinput->LoadFromFile(restartfile, single_file_per_rank);
IOWrapperSizeT headeroffset = restartfile.GetPosition(single_file_per_rank);
}

// read parameters from input file. If both -r and -i are specified, this will
// override parameters from the restart file
if (iarg_flag) {
Expand All @@ -247,7 +276,7 @@ int main(int argc, char *argv[]) {
// Dump input parameters and quit if code was run with -n option.
if (narg_flag) {
if (global_variable::my_rank == 0) pinput->ParameterDump(std::cout);
if (res_flag) restartfile.Close();
if (res_flag) restartfile.Close(single_file_per_rank);
delete pinput;
Kokkos::finalize();
#if MPI_PARALLEL_ENABLED
Expand All @@ -265,13 +294,13 @@ int main(int argc, char *argv[]) {
if (!res_flag) {
pmesh->BuildTreeFromScratch(pinput);
} else {
pmesh->BuildTreeFromRestart(pinput, restartfile);
pmesh->BuildTreeFromRestart(pinput, restartfile, single_file_per_rank);
}

// If code was run with -m option, write mesh structure to file and quit.
if (marg_flag) {
if (global_variable::my_rank == 0) {pmesh->WriteMeshStructure();}
if (res_flag) {restartfile.Close();}
if (res_flag) {restartfile.Close(single_file_per_rank);}
delete pmesh;
delete pinput;
Kokkos::finalize();
Expand All @@ -292,10 +321,12 @@ int main(int argc, char *argv[]) {
pmesh->pgen = std::make_unique<ProblemGenerator>(pinput, pmesh);
} else {
// read ICs from restart file using ProblemGenerator constructor for restarts
pmesh->pgen = std::make_unique<ProblemGenerator>(pinput, pmesh, restartfile);
restartfile.Close();
pmesh->pgen = std::make_unique<ProblemGenerator>(pinput,
pmesh,
restartfile,
single_file_per_rank);
restartfile.Close(single_file_per_rank);
}

//--- Step 6. --------------------------------------------------------------------------
// Construct Driver and Outputs. Actual outputs (including initial conditions) are made
// in Driver.Initialize(). Add wall clock timer to Driver if necessary.
Expand All @@ -304,6 +335,8 @@ int main(int argc, char *argv[]) {
Driver* pdriver = new Driver(pinput, pmesh, wtlim, &timer);
Outputs* pout = new Outputs(pinput, pmesh);



//--- Step 7. --------------------------------------------------------------------------
// Execute Driver.
// 1. Initial conditions set in Driver::Initialize()
Expand Down
41 changes: 30 additions & 11 deletions src/mesh/build_tree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,8 @@ void Mesh::BuildTreeFromScratch(ParameterInput *pin) {
//! divides grid into MeshBlock(s) for restart runs, using parameters and data read from
//! restart file.

void Mesh::BuildTreeFromRestart(ParameterInput *pin, IOWrapper &resfile) {
void Mesh::BuildTreeFromRestart(ParameterInput *pin, IOWrapper &resfile,
bool single_file_per_rank) {
// At this point, the restartfile is already open and the ParameterInput (input file)
// data has already been read in main(). Thus the file pointer is set to after <par_end>
IOWrapperSizeT headeroffset = resfile.GetPosition();
Expand All @@ -327,18 +328,30 @@ void Mesh::BuildTreeFromRestart(ParameterInput *pin, IOWrapper &resfile) {
+ sizeof(RegionSize) + 2*sizeof(RegionIndcs);
char *headerdata = new char[headersize];

if (global_variable::my_rank == 0) { // the master process reads the header data
if (resfile.Read_bytes(headerdata, 1, headersize) != headersize) {
// the master process reads the header data if single_file_per_rank is false
if (global_variable::my_rank == 0 || single_file_per_rank) {
IOWrapperSizeT read_size = resfile.Read_bytes(headerdata, 1, headersize,
single_file_per_rank);
if (read_size != headersize) {
std::cout << "### FATAL ERROR in " << __FILE__ << " at line " << __LINE__
<< std::endl << "Header size read from restart file is incorrect, "
<< "restart file is broken." << std::endl;
<< "expected " << headersize << ", got " << read_size << std::endl;
exit(EXIT_FAILURE);
}
}

#if MPI_PARALLEL_ENABLED
// then broadcast the header data
MPI_Bcast(headerdata, headersize, MPI_CHAR, 0, MPI_COMM_WORLD);
if (!single_file_per_rank) {
int mpi_err = MPI_Bcast(headerdata, headersize, MPI_CHAR, 0, MPI_COMM_WORLD);
if (mpi_err != MPI_SUCCESS) {
char error_string[1024];
int length_of_error_string;
MPI_Error_string(mpi_err, error_string, &length_of_error_string);
std::cout << "MPI_Bcast failed with error: " << error_string << std::endl;
exit(EXIT_FAILURE);
}
}
#endif

// Now copy mesh data read from restart file into Mesh variables. Order of variables
Expand Down Expand Up @@ -391,8 +404,9 @@ void Mesh::BuildTreeFromRestart(ParameterInput *pin, IOWrapper &resfile) {
// allocate idlist buffer and read list of logical locations and cost
IOWrapperSizeT listsize = sizeof(LogicalLocation) + sizeof(float);
char *idlist = new char[listsize*nmb_total];
if (global_variable::my_rank == 0) { // only the master process reads the ID list
if (resfile.Read_bytes(idlist,listsize,nmb_total) !=
// only the master process reads the ID list
if (global_variable::my_rank == 0 || single_file_per_rank) {
if (resfile.Read_bytes(idlist,listsize,nmb_total,single_file_per_rank) !=
static_cast<unsigned int>(nmb_total)) {
std::cout << "### FATAL ERROR in " << __FILE__ << " at line " << __LINE__
<< std::endl << "Incorrect number of MeshBlocks in restart file; "
Expand All @@ -402,7 +416,9 @@ void Mesh::BuildTreeFromRestart(ParameterInput *pin, IOWrapper &resfile) {
}
#if MPI_PARALLEL_ENABLED
// then broadcast the ID list
MPI_Bcast(idlist, listsize*nmb_total, MPI_CHAR, 0, MPI_COMM_WORLD);
if (!single_file_per_rank) {
MPI_Bcast(idlist, listsize*nmb_total, MPI_CHAR, 0, MPI_COMM_WORLD);
}
#endif

// everyone sets the logical location and cost lists based on bradcasted data
Expand Down Expand Up @@ -439,11 +455,14 @@ void Mesh::BuildTreeFromRestart(ParameterInput *pin, IOWrapper &resfile) {

#ifdef MPI_PARALLEL_ENABLED
// check there is at least one MeshBlock per MPI rank
if (nmb_total < global_variable::nranks) {
std::cout << "### FATAL ERROR in " << __FILE__ << " at line " << __LINE__ << std::endl
if (!single_file_per_rank) {
if (nmb_total < global_variable::nranks) {
std::cout << "### FATAL ERROR in " << __FILE__ << " at line "
<< __LINE__ << std::endl
<< "Fewer MeshBlocks (nmb_total=" << nmb_total << ") than MPI ranks (nranks="
<< global_variable::nranks << ")" << std::endl;
std::exit(EXIT_FAILURE);
std::exit(EXIT_FAILURE);
}
}
#endif

Expand Down
3 changes: 2 additions & 1 deletion src/mesh/mesh.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ class Mesh {

// functions
void BuildTreeFromScratch(ParameterInput *pin);
void BuildTreeFromRestart(ParameterInput *pin, IOWrapper &resfile);
void BuildTreeFromRestart(ParameterInput *pin, IOWrapper &resfile,
bool single_file_per_rank=false);
void PrintMeshDiagnostics();
void WriteMeshStructure();
void NewTimeStep(const Real tlim);
Expand Down
82 changes: 56 additions & 26 deletions src/outputs/binary.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ MeshBinaryOutput::MeshBinaryOutput(ParameterInput *pin, Mesh *pm, OutputParamete
// set different stripe counts depending on whether mpiio is used in order to
// achieve the best performance and not to crash the filesystem
mkdir("bin",0775);
bool single_file_per_rank = op.single_file_per_rank;
if (single_file_per_rank) {
char rank_dir[20];
std::snprintf(rank_dir, sizeof(rank_dir), "bin/rank_%08d/", global_variable::my_rank);
mkdir(rank_dir, 0775);
}
}

//----------------------------------------------------------------------------------------
Expand All @@ -48,21 +54,28 @@ void MeshBinaryOutput::WriteOutputFile(Mesh *pm, ParameterInput *pin) {

// create filename: "bin/file_basename" + "." + "file_id" + "." + XXXXX + ".bin"
// where XXXXX = 5-digit file_number
std::string fname;
char number[6];
std::snprintf(number, sizeof(number), "%05d", out_params.file_number);

fname.assign("bin/");
fname.append(out_params.file_basename);
fname.append(".");
fname.append(out_params.file_id);
fname.append(".");
fname.append(number);
fname.append(".bin");
bool single_file_per_rank = out_params.single_file_per_rank;
std::string fname;
if (single_file_per_rank) {
// Generate a directory and filename for each rank
char rank_dir[20];
char number[7];
std::snprintf(number, sizeof(number), ".%05d", out_params.file_number);
std::snprintf(rank_dir, sizeof(rank_dir), "rank_%08d/", global_variable::my_rank);
fname = std::string("bin/") + std::string(rank_dir) + out_params.file_basename
+ "." + out_params.file_id + number + ".bin";
} else {
// Existing behavior: single restart file
char number[7];
std::snprintf(number, sizeof(number), ".%05d", out_params.file_number);
fname = std::string("bin/") + out_params.file_basename
+ "." + out_params.file_id + number + ".bin";
}

IOWrapper binfile;
std::size_t header_offset=0;
binfile.Open(fname.c_str(), IOWrapper::FileMode::write);
binfile.Open(fname.c_str(), IOWrapper::FileMode::write, single_file_per_rank);

// Basic parts of the format:
// 1. Size of the header
Expand All @@ -84,8 +97,9 @@ void MeshBinaryOutput::WriteOutputFile(Mesh *pm, ParameterInput *pin) {
msg << outvars[n].label.c_str() << " ";
}
msg << std::endl;
if (global_variable::my_rank == 0) {
binfile.Write_any_type(msg.str().c_str(),msg.str().size(),"byte");
if (global_variable::my_rank == 0 || single_file_per_rank) {
binfile.Write_any_type(msg.str().c_str(),msg.str().size(),"byte",
single_file_per_rank);
}
header_offset += msg.str().size();
}
Expand All @@ -96,9 +110,10 @@ void MeshBinaryOutput::WriteOutputFile(Mesh *pm, ParameterInput *pin) {
pin->ParameterDump(ost);
std::string sbuf=ost.str();
msg << " header offset=" << sbuf.size()*sizeof(char) << std::endl;
if (global_variable::my_rank == 0) {
binfile.Write_any_type(msg.str().c_str(),msg.str().size(),"byte");
binfile.Write_any_type(sbuf.c_str(),sbuf.size(),"byte");
if (global_variable::my_rank == 0 || single_file_per_rank) {
binfile.Write_any_type(msg.str().c_str(),msg.str().size(),"byte",
single_file_per_rank);
binfile.Write_any_type(sbuf.c_str(),sbuf.size(),"byte", single_file_per_rank);
}
header_offset += sbuf.size()*sizeof(char);
header_offset += msg.str().size();
Expand Down Expand Up @@ -219,20 +234,31 @@ void MeshBinaryOutput::WriteOutputFile(Mesh *pm, ParameterInput *pin) {
std::vector<int> rank_offset(global_variable::nranks, 0);
std::partial_sum(noutmbs.begin(),std::prev(noutmbs.end()),
std::next(rank_offset.begin()));
std::size_t myoffset=header_offset+data_size*rank_offset[global_variable::my_rank];
std::size_t myoffset = header_offset+data_size*rank_offset[global_variable::my_rank];

if (single_file_per_rank) {
myoffset = header_offset; // Reset offset for individual files
}

if (noutmbs_min > 0) {
binfile.Write_any_type_at_all(data,(data_size*nout_mbs),myoffset,"byte");
binfile.Write_any_type_at_all(data,(data_size*nout_mbs),myoffset,"byte",
single_file_per_rank);
} else {
if (nout_mbs > 0) {
binfile.Write_any_type_at(data,(data_size*nout_mbs),myoffset,"byte");
binfile.Write_any_type_at(data,(data_size*nout_mbs),myoffset,"byte",
single_file_per_rank);
}
}
} else {
// check if elements larger than 2^31
if (data_size*nb_mbs<=2147483648) {
// now write binary data in parallel
std::size_t myoffset=header_offset+data_size*ns_mbs;
binfile.Write_any_type_at_all(data,(data_size*nb_mbs),myoffset,"byte");
std::size_t myoffset = header_offset;
if (!single_file_per_rank) {
myoffset += data_size*ns_mbs;
}
binfile.Write_any_type_at_all(data,(data_size*nb_mbs),myoffset,"byte",
single_file_per_rank);
} else {
// write data over each MeshBlock sequentially and in parallel
// calculate max/min number of MeshBlocks across all ranks
Expand All @@ -244,19 +270,23 @@ void MeshBinaryOutput::WriteOutputFile(Mesh *pm, ParameterInput *pin) {
}
for (int m=0; m<noutmbs_max; ++m) {
char *pdata=&(data[m*data_size]);
std::size_t myoffset=header_offset+data_size*ns_mbs+data_size*m;
std::size_t myoffset = header_offset + data_size*m;
if (!single_file_per_rank) {
myoffset += data_size*ns_mbs;
}
// every rank has a MB to write, so write collectively
if (m < noutmbs_min) {
if (binfile.Write_any_type_at_all(pdata,(data_size),myoffset,"byte")
!= data_size) {
if (binfile.Write_any_type_at_all(pdata,(data_size),myoffset,"byte",
single_file_per_rank) != data_size) {
std::cout << "### FATAL ERROR in " << __FILE__ << " at line " << __LINE__
<< std::endl << "binary data not written correctly to binary file, "
<< "binary file is broken." << std::endl;
exit(EXIT_FAILURE);
}
// some ranks are finished writing, so use non-collective write
} else if (m < pm->nmb_thisrank) {
if (binfile.Write_any_type_at(pdata,(data_size),myoffset,"byte") != data_size) {
if (binfile.Write_any_type_at(pdata,(data_size),myoffset,"byte",
single_file_per_rank) != data_size) {
std::cout << "### FATAL ERROR in " << __FILE__ << " at line " << __LINE__
<< std::endl << "binary data not written correctly to binary file, "
<< "binary file is broken." << std::endl;
Expand All @@ -268,7 +298,7 @@ void MeshBinaryOutput::WriteOutputFile(Mesh *pm, ParameterInput *pin) {
}

// close the output file and clean up ptrs to data
binfile.Close();
binfile.Close(single_file_per_rank);
delete [] data;
delete [] single_data;

Expand Down
Loading