diff --git a/include/bout/cyclic_reduction.hxx b/include/bout/cyclic_reduction.hxx
index d4ef958e93..f7dbc71cbd 100644
--- a/include/bout/cyclic_reduction.hxx
+++ b/include/bout/cyclic_reduction.hxx
@@ -17,7 +17,7 @@ alpha = a3 / b1
  *
  **************************************************************************
- * Copyright 2010 B.D.Dudson, S.Farley, M.V.Umansky, X.Q.Xu
+ * Copyright 2010,2021 B.D.Dudson, S.Farley, M.V.Umansky, X.Q.Xu
  *
  * Contact: Ben Dudson, bd512@york.ac.uk
  *
@@ -38,8 +38,8 @@
  *
  ************************************************************************/
 
-#ifndef __CYCLIC_REDUCE_H__
-#define __CYCLIC_REDUCE_H__
+#ifndef CYCLIC_REDUCE_H_
+#define CYCLIC_REDUCE_H_
 
 #ifdef DIAGNOSE
 #undef DIAGNOSE
@@ -63,27 +63,40 @@ class CyclicReduce {
 public:
   CyclicReduce() = default;
 
-  CyclicReduce(MPI_Comm c, int size) : comm(c), N(size) {
-    MPI_Comm_size(c, &nprocs);
-    MPI_Comm_rank(c, &myproc);
+  CyclicReduce(MPI_Comm c, int size, int ngather = 0)
+      : comm(c), ngatherprocs(ngather), N(size) {
+    setup(c, size, ngather);
   }
 
   /// Set parameters
+  /// This can be called after creation, to reset the solver
+  ///
   /// @param[in] c The communicator of all processors involved in the solve
  /// @param[in] size The number of rows on this processor
-  void setup(MPI_Comm c, int size) {
+  /// @param[in] ngather The number of processors to gather onto. If 0, use all processors
+  void setup(MPI_Comm c, int size, int ngather = 0) {
     comm = c;
 
     int np, myp;
     MPI_Comm_size(c, &np);
     MPI_Comm_rank(c, &myp);
-    if ((size != N) || (np != nprocs) || (myp != myproc)) {
+
+    if (ngather == 0) {
+      ngather = np; // Use all processors
+    }
+
+    if ((size != N) || (np != nprocs) || (myp != myproc) || (ngather != ngatherprocs)) {
       Nsys = 0; // Need to re-size
     }
+
     N = size;
     periodic = false;
     nprocs = np;
     myproc = myp;
+
+    ngatherprocs = ngather;
+    ASSERT1(ngatherprocs > 0);
+    ASSERT1(ngatherprocs <= nprocs);
   }
 
   ~CyclicReduce() = default;
@@ -123,7 +136,7 @@ public:
     int nsys = std::get<0>(a.shape());
 
     // Make sure correct memory arrays allocated
-    allocMemory(nprocs, nsys, N);
+    allocMemory(nsys);
 
     // Fill coefficient array
     BOUT_OMP(parallel for)
@@ -197,13 +210,14 @@ public:
 
     ///////////////////////////////////////
     // Gather all interface equations onto single processor
-    // NOTE: Need to replace with divide-and-conquer at some point
+    // NOTE: Cyclic reduction would do this in multiple stages,
+    //       but here we just gather onto ngatherprocs processors
     //
-    // There are Nsys sets of equations to gather, and nprocs processors
+    // There are Nsys sets of equations to gather, and ngatherprocs processors
     // which can be used. Each processor therefore sends interface equations
     // to several different processors for gathering
     //
-    // e.g. 3 processors (nprocs=3), 4 sets of equations (Nsys=4):
+    // e.g. 3 processors (nprocs=3 and ngatherprocs=3), 4 sets of equations (Nsys=4):
     //
     // PE 0: [1a 2a 3a 4a] PE 1: [1b 2b 3b 4b] PE 2: [1c 2c 3c 4c]
     //
@@ -214,14 +228,16 @@ public:
     //
     // Here PE 0 would have myns=2, PE 1 and 2 would have myns=1
 
-    int ns = Nsys / nprocs;      // Number of systems to assign to all processors
-    int nsextra = Nsys % nprocs; // Number of processors with 1 extra
-
-    auto* req = new MPI_Request[nprocs];
-
+    // Receive requests
     if (myns > 0) {
-      // Post receives from all other processors
-      req[myproc] = MPI_REQUEST_NULL;
+      // If gathering systems onto this processor
+      // post receives from all other processors
+
+      all_req.resize(nprocs);                  // One request per processor
+      all_buffer.reallocate(nprocs, myns * 8); // Storage for systems from each processor
+
+      all_req[myproc] = MPI_REQUEST_NULL;
       for (int p = 0; p < nprocs; p++) { // Loop over processor
         // 2 interface equations per processor
         // myns systems to solve
@@ -240,46 +256,73 @@ public:
 #ifdef DIAGNOSE
           output << "Expecting to receive " << len << " from " << p << endl;
 #endif
-          MPI_Irecv(&recvbuffer(p, 0), len,
-                    MPI_BYTE, // Just sending raw data, unknown type
-                    p,        // Destination processor
-                    p,        // Identifier
-                    comm,     // Communicator
-                    &req[p]); // Request
+          MPI_Irecv(&all_buffer(p, 0), len,
+                    MPI_BYTE,      // Just sending raw data, unknown type
+                    p,             // Destination processor
+                    p,             // Identifier
+                    comm,          // Communicator
+                    &all_req[p]);  // Request
         }
       }
     }
 
-    // Send data
-    int s0 = 0;
-    for (int p = 0; p < nprocs; p++) { // Loop over processor
-      int nsp = ns;
-      if (p < nsextra) {
-        nsp++;
-      }
-      if ((p != myproc) && (nsp > 0)) {
-#ifdef DIAGNOSE
-        output << "Sending to " << p << endl;
-        for (int i = 0; i < 8; i++) {
-          output << "value " << i << " : " << myif(s0, i) << endl;
-        }
-#endif
-        MPI_Send(&myif(s0, 0),        // Data pointer
-                 8 * nsp * sizeof(T), // Number
-                 MPI_BYTE,            // Type
-                 p,                   // Destination
-                 myproc,              // Message identifier
-                 comm);               // Communicator
-      }
-      s0 += nsp;
-    }
+    // Send data to the processors which are gathering
+
+    gather_req.resize(ngatherprocs);        // One request per gathering processor
+    gather_proc.resize(ngatherprocs);       // Processor number
+    gather_sys_offset.resize(ngatherprocs); // Index of the first system gathered
+    gather_nsys.resize(ngatherprocs);       // Number of systems
+
+    // Interval between gathering processors
+    BoutReal pinterval = static_cast<BoutReal>(nprocs) / ngatherprocs;
+
+    {
+      int ns = Nsys / ngatherprocs;      // Number of systems to assign to all gathering processors
+      int nsextra = Nsys % ngatherprocs; // Number of processors with 1 extra
+      int s0 = 0;                        // Starting system number
+
+      // Loop over gathering processors
+      for (int i = 0; i < ngatherprocs; i++) {
+
+        int p = i; // Gathering onto all processors
+        if (ngatherprocs != nprocs) {
+          // Gathering onto only some
+          p = static_cast<int>(pinterval * i);
+        }
+
+        int nsp = ns; // Number of systems to send to this processor
+        if (i < nsextra) {
+          nsp++; // Some processors get an extra system
+        }
+
+        gather_proc[i] = p;
+        gather_sys_offset[i] = s0;
+        gather_nsys[i] = nsp;
+
+        if ((p != myproc) && (nsp > 0)) {
+#ifdef DIAGNOSE
+          output << "Sending to " << p << endl;
+#endif
+          MPI_Send(&myif(s0, 0),        // Data pointer
+                   8 * nsp * sizeof(T), // Number
+                   MPI_BYTE,            // Type
+                   p,                   // Destination
+                   myproc,              // Message identifier
+                   comm);               // Communicator
+        }
+
+        s0 += nsp;
+      }
+    }
 
     if (myns > 0) {
+      // If this processor is gathering systems
       // Wait for data
       int p;
       do {
         MPI_Status stat;
-        MPI_Waitany(nprocs, req, &p, &stat);
+        MPI_Waitany(nprocs, all_req.data(), &p, &stat);
         if (p != MPI_UNDEFINED) {
           // p is the processor number. Copy data
 #ifdef DIAGNOSE
@@ -289,12 +332,12 @@ public:
           for (int i = 0; i < myns; i++) {
             for (int j = 0; j < 8; j++) {
 #ifdef DIAGNOSE
-              output << "Value " << j << " : " << recvbuffer(p, 8 * i + j) << endl;
+              output << "Value " << j << " : " << all_buffer(p, 8 * i + j) << endl;
 #endif
-              ifcs(i, 8 * p + j) = recvbuffer(p, 8 * i + j);
+              ifcs(i, 8 * p + j) = all_buffer(p, 8 * i + j);
             }
           }
-          req[p] = MPI_REQUEST_NULL;
+          all_req[p] = MPI_REQUEST_NULL;
         }
       } while (p != MPI_UNDEFINED);
 
@@ -354,12 +397,15 @@ public:
     ///////////////////////////////////////
     // Scatter back solution
 
+    // Allocate buffer space. Note assumes 0th processor has most systems
+    gather_buffer.reallocate(ngatherprocs, 2 * gather_nsys[0]);
+
     // Post receives
-    for (int p = 0; p < nprocs; p++) { // Loop over processor
-      int nsp = ns;
-      if (p < nsextra) {
-        nsp++;
-      }
+    // Loop over gathering processors
+    for (int i = 0; i < ngatherprocs; i++) {
+      int p = gather_proc[i];   // Processor number
+      int nsp = gather_nsys[i]; // Number of systems
+
       int len = 2 * nsp * sizeof(T); // 2 values per system
 
       if (p == myproc) {
@@ -369,19 +415,19 @@ public:
           x1[sys0 + i] = ifx(i, 2 * p);
           xn[sys0 + i] = ifx(i, 2 * p + 1);
         }
-        req[p] = MPI_REQUEST_NULL;
+        gather_req[i] = MPI_REQUEST_NULL;
       } else if (nsp > 0) {
 #ifdef DIAGNOSE
         output << "Expecting receive from " << p << " of size " << len << endl;
 #endif
-        MPI_Irecv(&recvbuffer(p, 0), len,
-                  MPI_BYTE, // Just sending raw data, unknown type
-                  p,        // Destination processor
-                  p,        // Identifier
-                  comm,     // Communicator
-                  &req[p]); // Request
+        MPI_Irecv(&gather_buffer(i, 0), len,
+                  MPI_BYTE,        // Just receiving raw data, unknown type
+                  p,               // Origin processor
+                  p,               // Identifier
+                  comm,            // Communicator
+                  &gather_req[i]); // Request
       } else {
-        req[p] = MPI_REQUEST_NULL;
+        gather_req[i] = MPI_REQUEST_NULL;
       }
     }
 
@@ -406,106 +452,123 @@ public:
     }
 
     // Wait for data
-    int fromproc;
-    int nsp;
+    int fromind{MPI_UNDEFINED};
     do {
       MPI_Status stat;
-      MPI_Waitany(nprocs, req, &fromproc, &stat);
+      MPI_Waitany(ngatherprocs, gather_req.data(), &fromind, &stat);
 
-      if (fromproc != MPI_UNDEFINED) {
-        // fromproc is the processor number. Copy data
+      if (fromind != MPI_UNDEFINED) {
+        // Copy data
 
-        int s0 = fromproc * ns;
-        if (fromproc > nsextra) {
-          s0 += nsextra;
-        } else {
-          s0 += fromproc;
-        }
-
-        nsp = ns;
-        if (fromproc < nsextra) {
-          nsp++;
-        }
+        int s0 = gather_sys_offset[fromind]; // System index start
+        int nsp = gather_nsys[fromind];      // Number of systems
 
         BOUT_OMP(parallel for)
         for (int i = 0; i < nsp; i++) {
-          x1[s0 + i] = recvbuffer(fromproc, 2 * i);
-          xn[s0 + i] = recvbuffer(fromproc, 2 * i + 1);
+          x1[s0 + i] = gather_buffer(fromind, 2 * i);
+          xn[s0 + i] = gather_buffer(fromind, 2 * i + 1);
 #ifdef DIAGNOSE
           output << "Received x1,xn[" << s0 + i << "] = " << x1[s0 + i] << ", "
-                 << xn[s0 + i] << " from " << fromproc << endl;
+                 << xn[s0 + i] << " from " << gather_proc[fromind] << endl;
 #endif
         }
-        req[fromproc] = MPI_REQUEST_NULL;
+        gather_req[fromind] = MPI_REQUEST_NULL;
       }
-    } while (fromproc != MPI_UNDEFINED);
+    } while (fromind != MPI_UNDEFINED);
     }
 
     ///////////////////////////////////////
     // Solve local equations
     back_solve(Nsys, N, coefs, x1, xn, x);
-
-    delete[] req;
   }
 
 private:
-  MPI_Comm comm;             ///< Communicator
+  MPI_Comm comm{};           ///< Communicator
   int nprocs{0}, myproc{-1}; ///< Number of processors and ID of my processor
+  int ngatherprocs{0};                 ///< Number of processors to gather onto
+  std::vector<MPI_Request> gather_req; ///< Comms with gathering processors
+  std::vector<int> gather_proc;        ///< The processor number
+  std::vector<int> gather_sys_offset;  ///< Index of the first system gathered
+  std::vector<int> gather_nsys;        ///< Number of systems gathered
+  Matrix<T> gather_buffer; ///< Buffer for receiving from gathering processors
+
+  std::vector<MPI_Request> all_req; ///< Requests for comms with all processors
+  Matrix<T> all_buffer;             ///< Buffer for receiving from all other processors
+
   int N{0};    ///< Total size of the problem
   int Nsys{0}; ///< Number of independent systems to solve
-  int myns;    ///< Number of systems for interface solve on this processor
-  int sys0;    ///< Starting system index for interface solve
+  int myns{0}; ///< Number of systems for interface solve on this processor
+  int sys0{0}; ///< Starting system index for interface solve
   bool periodic{false}; ///< Is the domain periodic?
 
   Matrix<T> coefs; ///< Starting coefficients, rhs [Nsys, {3*coef,rhs}*N]
   Matrix<T> myif;  ///< Interface equations for this processor
 
-  Matrix<T> recvbuffer; ///< Buffer for receiving from other processors
-  Matrix<T> ifcs;       ///< Coefficients for interface solve
-  Matrix<T> if2x2;      ///< 2x2 interface equations on this processor
-  Matrix<T> ifx;        ///< Solution of interface equations
-  Array<T> ifp;         ///< Interface equations returned to processor p
-  Array<T> x1, xn;      ///< Interface solutions for back-solving
+  Matrix<T> ifcs;  ///< Coefficients for interface solve
+  Matrix<T> if2x2; ///< 2x2 interface equations on this processor
+  Matrix<T> ifx;   ///< Solution of interface equations
+  Array<T> ifp;    ///< Interface equations returned to processor p
+  Array<T> x1, xn; ///< Interface solutions for back-solving
 
   /// Allocate memory arrays
-  /// @param[in] np Number of processors
   /// @param[in] nsys Number of independent systems to solve
-  /// @param[in] n Size of each system of equations
-  void allocMemory(int np, int nsys, int n) {
-    if ((nsys == Nsys) && (n == N) && (np == nprocs)) {
+  void allocMemory(int nsys) {
+    if (nsys == Nsys) {
       return; // No need to allocate memory
     }
 
-    nprocs = np;
     Nsys = nsys;
-    N = n;
 
     // Work out how many systems are going to be solved on this processor
-    int ns = nsys / nprocs;      // Number of systems to assign to all processors
-    int nsextra = nsys % nprocs; // Number of processors with 1 extra
-
-    myns = ns;          // Number of systems to gather onto this processor
-    sys0 = ns * myproc; // Starting system number
-    if (myproc < nsextra) {
-      myns++;
-      sys0 += myproc;
+    int ns = nsys / ngatherprocs;      // Number of systems to assign to all gathering processors
+    int nsextra = nsys % ngatherprocs; // Number of processors with 1 extra
+
+    if (ngatherprocs == nprocs) {
+      // Gathering onto all processors (This is the default)
+
+      myns = ns;          // Number of systems to gather onto this processor
+      sys0 = ns * myproc; // Starting system number
+      if (myproc < nsextra) {
+        myns++;
+        sys0 += myproc;
+      } else {
+        sys0 += nsextra;
+      }
     } else {
-      sys0 += nsextra;
+      // Gathering onto only a few processors
+
+      myns = 0; // Default to not gathering onto this processor
+
+      // Interval between processors
+      BoutReal pinterval = static_cast<BoutReal>(nprocs) / ngatherprocs;
+      ASSERT1(pinterval > 1.0);
+
+      // Calculate which processors these are
+      for (int i = 0; i < ngatherprocs; i++) {
+        int proc = static_cast<int>(pinterval * i);
+
+        if (proc == myproc) {
+          // This processor is receiving
+
+          myns = ns;     // Number of systems to gather onto this processor
+          sys0 = ns * i; // Starting system number
+
+          if (i < nsextra) {
+            // Receiving an extra system
+            myns++;
+            sys0 += i;
+          } else {
+            sys0 += nsextra;
+          }
+        }
+      }
     }
 
     coefs.reallocate(Nsys, 4 * N);
     myif.reallocate(Nsys, 8);
 
-    // Note: The recvbuffer is used to receive data in both stages of the solve:
-    // 1. In the gather step, this processor will receive myns interface equations
-    //    from each processor.
-    // 2. In the scatter step, this processor receives the solved interface values
-    //    from each processor. The number of systems of equations received will
-    //    vary from myns to myns+1 (if myproc >= nsextra).
-    //    The size of the array reserved is therefore (myns+1)
-    recvbuffer.reallocate(nprocs, (myns + 1) * 8);
-
     // Some interface systems to be solved on this processor
     // Note that the interface equations are organised by system (myns as first argument)
     // but communication buffers are organised by processor (nprocs first).
@@ -640,4 +703,4 @@ private:
   }
 };
 
-#endif // __CYCLIC_REDUCE_H__
+#endif // CYCLIC_REDUCE_H_
diff --git a/src/invert/laplace/impls/cyclic/cyclic_laplace.cxx b/src/invert/laplace/impls/cyclic/cyclic_laplace.cxx
index 2687bf7187..71e15b7767 100644
--- a/src/invert/laplace/impls/cyclic/cyclic_laplace.cxx
+++ b/src/invert/laplace/impls/cyclic/cyclic_laplace.cxx
@@ -96,16 +96,16 @@ LaplaceCyclic::LaplaceCyclic(Options* opt, const CELL_LOC loc, Mesh* mesh_in,
   xcmplx.reallocate(nmode, n);
   bcmplx.reallocate(nmode, n);
 
+  int ngather =
+      (*opt)["ngather"]
+          .doc("Number of processors in X to gather onto. Default (0) is all processors")
+          .withDefault(0);
+
   // Create a cyclic reduction object, operating on dcomplex values
-  cr = new CyclicReduce<dcomplex>(localmesh->getXcomm(), n);
+  cr = std::make_unique<CyclicReduce<dcomplex>>(localmesh->getXcomm(), n, ngather);
   cr->setPeriodic(localmesh->periodicX);
 }
 
-LaplaceCyclic::~LaplaceCyclic() {
-  // Delete tridiagonal solver
-  delete cr;
-}
-
 FieldPerp LaplaceCyclic::solve(const FieldPerp& rhs, const FieldPerp& x0) {
   ASSERT1(localmesh == rhs.getMesh() && localmesh == x0.getMesh());
   ASSERT1(rhs.getLocation() == location);
@@ -184,7 +184,6 @@ FieldPerp LaplaceCyclic::solve(const FieldPerp& rhs, const FieldPerp& x0) {
   BOUT_OMP(parallel)
   {
     /// Create a local thread-scope working array
-    // ZFFT routine expects input of this length
     auto k1d = Array<dcomplex>(localmesh->LocalNz);
 
diff --git a/src/invert/laplace/impls/cyclic/cyclic_laplace.hxx b/src/invert/laplace/impls/cyclic/cyclic_laplace.hxx
index 841f0a4e05..7a094fb79e 100644
--- a/src/invert/laplace/impls/cyclic/cyclic_laplace.hxx
+++ b/src/invert/laplace/impls/cyclic/cyclic_laplace.hxx
@@ -28,8 +28,8 @@
 
 class LaplaceCyclic;
 
-#ifndef __LAP_CYCLIC_H__
-#define __LAP_CYCLIC_H__
+#ifndef LAP_CYCLIC_H_
+#define LAP_CYCLIC_H_
 
 #include "bout/build_config.hxx"
 #include "bout/invert_laplace.hxx"
@@ -61,7 +61,6 @@ class LaplaceCyclic : public Laplacian {
 public:
   LaplaceCyclic(Options* opt = nullptr, const CELL_LOC loc = CELL_CENTRE,
                 Mesh* mesh_in = nullptr, Solver* solver = nullptr);
-  ~LaplaceCyclic();
 
   using Laplacian::setCoefA;
   void setCoefA(const Field2D& val) override {
@@ -120,9 +119,9 @@ private:
 
   bool dst;
 
-  CyclicReduce<dcomplex>* cr; ///< Tridiagonal solver
+  std::unique_ptr<CyclicReduce<dcomplex>> cr; ///< Tridiagonal solver
 };
 
 #endif // BOUT_USE_METRIC_3D
 
-#endif // __SPT_H__
+#endif // LAP_CYCLIC_H_
diff --git a/tests/integrated/test-cyclic/runtest b/tests/integrated/test-cyclic/runtest
index 5e31da6239..a0388e8b5a 100755
--- a/tests/integrated/test-cyclic/runtest
+++ b/tests/integrated/test-cyclic/runtest
@@ -20,7 +20,14 @@ from sys import stdout, exit
 
 build_and_log("Cyclic Reduction test")
 
-flags = ["", "nsys=2", "nsys=5 periodic", "nsys=7 n=10"]
+flags = [
+    "",
+    "nsys=2",
+    "nsys=5 periodic",
+    "nsys=7 n=10",
+    "ngather=1",
+    "nsys=7 n=10 ngather=1",
+]
 
 code = 0  # Return code
 for nproc in [1, 2, 4]:
diff --git a/tests/integrated/test-cyclic/test_cyclic.cxx b/tests/integrated/test-cyclic/test_cyclic.cxx
index 0f48a1b706..80aa35525a 100644
--- a/tests/integrated/test-cyclic/test_cyclic.cxx
+++ b/tests/integrated/test-cyclic/test_cyclic.cxx
@@ -21,7 +21,7 @@ int main(int argc, char** argv) {
   int nsys;
   int n;
 
-  Options* options = Options::getRoot();
+  Options& options = Options::root();
   OPTION(options, n, 5);
   OPTION(options, nsys, 1);
   BoutReal tol;
@@ -29,13 +29,16 @@ int main(int argc, char** argv) {
   bool periodic;
   OPTION(options, periodic, false);
 
-  // Create a cyclic reduction object, operating on Ts
-  auto* cr = new CyclicReduce<T>(BoutComm::get(), n);
-
   int mype, npe;
   MPI_Comm_rank(BoutComm::get(), &mype);
   MPI_Comm_size(BoutComm::get(), &npe);
 
+  int ngather =
+      options["ngather"].doc("The number of processors to gather onto").withDefault(npe);
+
+  // Create a cyclic reduction object, operating on Ts
+  auto cr = std::make_unique<CyclicReduce<T>>(BoutComm::get(), n, ngather);
+
   a.reallocate(nsys, n);
   b.reallocate(nsys, n);
   c.reallocate(nsys, n);
@@ -88,9 +91,6 @@ int main(int argc, char** argv) {
   cr->setCoefs(a, b, c);
   cr->solve(rhs, x);
 
-  // Destroy solver
-  delete cr;
-
   // Check result
 
   int passed = 1;
diff --git a/tests/integrated/test-laplace/runtest b/tests/integrated/test-laplace/runtest
index e54e46d0d8..b435f80833 100755
--- a/tests/integrated/test-laplace/runtest
+++ b/tests/integrated/test-laplace/runtest
@@ -54,18 +54,27 @@ print("Running Laplacian inversion test")
 success = True
 
 for solver in ["cyclic", "pcr", "pcr_thomas"]:
-    for nproc in [1, 2, 4]:
-        nxpe = 1
-        if nproc > 2:
-            nxpe = 2
-
-        cmd = "./test_laplace NXPE=" + str(nxpe) + " laplace:type=" + solver
+    # Try different combinations of processor numbers
+    # If not cyclic then ignore ngather option
+    for nproc, nxpe, ngather in [(1, 1, 1), (2, 1, 1), (2, 2, 2), (4, 2, 1), (4, 4, 2)]:
+        cmd = (
+            "./test_laplace NXPE="
+            + str(nxpe)
+            + " laplace:type="
+            + solver
+            + ((" laplace:ngather=" + str(ngather)) if solver == "cyclic" else "")
+        )
 
         shell("rm data/BOUT.dmp.*.nc")
 
-        print("   %s solver with %d processors (nxpe = %d)...." % (solver, nproc, nxpe))
+        print(
+            "   %s solver with %d processors (nxpe = %d, ngather = %d)...."
+            % (solver, nproc, nxpe, ngather)
+        )
         s, out = launch_safe(cmd, nproc=nproc, mthread=1, pipe=True)
-        with open("run.log." + str(nproc), "w") as f:
+        with open(
+            "run.log." + str(nproc) + "." + str(nxpe) + "." + str(ngather), "w"
+        ) as f:
            f.write(out)
 
         # Collect output data
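
Editorial note, not part of the patch: the core of this change is the arithmetic that picks which ranks gather and how the Nsys interface systems are divided among them. The standalone sketch below reproduces that logic in isolation, using assumed example values (nprocs=4, ngatherprocs=2, Nsys=5) and local variables that mirror the members gather_proc, gather_sys_offset and gather_nsys; the printing is purely illustrative.

    // sketch.cxx: mirror of the gather-partitioning arithmetic in
    // cyclic_reduction.hxx (assumed example values, illustrative only)
    #include <cstdio>

    int main() {
      const int nprocs = 4;       // Total processors in the communicator
      const int ngatherprocs = 2; // Processors that gather (the new ngather option)
      const int Nsys = 5;         // Independent tridiagonal systems to solve

      // Gathering ranks are spaced evenly through the communicator
      const double pinterval = static_cast<double>(nprocs) / ngatherprocs;

      const int ns = Nsys / ngatherprocs;      // Systems per gathering processor
      const int nsextra = Nsys % ngatherprocs; // First nsextra gatherers get one more

      int s0 = 0; // Running offset into the list of systems
      for (int i = 0; i < ngatherprocs; i++) {
        // With ngatherprocs == nprocs every rank gathers; otherwise a subset
        const int proc =
            (ngatherprocs == nprocs) ? i : static_cast<int>(pinterval * i);
        const int nsp = ns + (i < nsextra ? 1 : 0);
        std::printf("gatherer %d -> rank %d, systems [%d, %d)\n", i, proc, s0, s0 + nsp);
        s0 += nsp;
      }
      return 0;
    }

With the values above this prints that rank 0 gathers systems [0, 3) and rank 2 gathers systems [3, 5), matching how gather_sys_offset and gather_nsys are filled in allocMemory and in the send loop.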
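For completeness, the new option can also be set from an input file rather than on the command line. A BOUT.inp sketch (the [laplace] section name assumes the default Laplacian instance is being configured):

    [laplace]
    type = cyclic
    ngather = 1   # 0 (default) gathers onto all processors in X

Gathering onto a single processor (ngather=1) trades communication fan-out for serialisation of the interface solve, which is the case the new test-cyclic flags exercise.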