Alpine3D


Root/trunk/alpine3d/MPIControl.cc

/***********************************************************************************/
/* Copyright 2009-2015 WSL Institute for Snow and Avalanche Research SLF-DAVOS      */
/***********************************************************************************/
/* This file is part of Alpine3D.
    Alpine3D is free software: you can redistribute it and/or modify
    it under the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    Alpine3D is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with Alpine3D. If not, see <http://www.gnu.org/licenses/>.
*/
#include <alpine3d/MPIControl.h>
#include <alpine3d/SnowpackInterfaceWorker.h>

#if (defined _WIN32 || defined __MINGW32__) && ! defined __CYGWIN__
#include <winsock.h>
#else
#include <unistd.h>
#endif
#ifdef ENABLE_PETSC
#include <petscksp.h>
#endif

using namespace std;
using namespace mio;

// Local operators << and >> on pairs, needed to serialize pairs for MPI communication
std::istream& operator>>(std::istream& is, std::pair<size_t,size_t>& data)
{
	is.read(reinterpret_cast<char*>(&data.first), sizeof(data.first));
	is.read(reinterpret_cast<char*>(&data.second), sizeof(data.second));
	return is;
}

std::ostream& operator<<(std::ostream& os, const std::pair<size_t,size_t>& data)
{
	os.write(reinterpret_cast<const char*>(&data.first), sizeof(data.first));
	os.write(reinterpret_cast<const char*>(&data.second), sizeof(data.second));
	return os;
}
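
// Illustrative sketch (not part of the original file): the raw-byte operators above let a
// std::pair<size_t,size_t> roundtrip through a string stream, which is the same mechanism the
// templated vector send()/receive() further below rely on. The function name roundtripPairExample()
// is hypothetical, and it assumes <sstream> is reachable through the included headers
// (std::stringstream is already used later in this file).
static std::pair<size_t,size_t> roundtripPairExample(const std::pair<size_t,size_t>& in)
{
	std::stringstream ss;
	ss << in; //serialize the two size_t members as raw bytes
	std::pair<size_t,size_t> out;
	ss >> out; //deserialize them back; out now equals in
	return out;
}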

bool MPIControl::openmp() const
{
#ifdef _OPENMP
	return true;
#else
	return false;
#endif
}

size_t MPIControl::thread() const
{
#ifdef _OPENMP
	return static_cast<size_t>( omp_get_thread_num() );
#else
	return 0;
#endif
}

size_t MPIControl::max_threads() const
{
#ifdef _OPENMP
	return static_cast<size_t>( omp_get_max_threads() );
#else
	return 1;
#endif
}

size_t MPIControl::rank() const
{
	return rank_;
}

size_t MPIControl::master_rank() const
{
	return 0; //HACK: this assumes the master is always 0
}

size_t MPIControl::size() const
{
	return size_;
}

std::string MPIControl::name() const
{
	return name_;
}

bool MPIControl::master() const
{
	return (rank_ == 0); //HACK: this assumes the master is always 0
}

#ifdef ENABLE_MPI
MPIControl::MPIControl()
{
	MPI_Init(NULL, NULL);

#ifdef ENABLE_PETSC
	PetscInitialize(NULL, NULL, NULL, NULL);
#endif

	int o_rank, o_size;
	MPI_Comm_rank(MPI_COMM_WORLD, &o_rank);
	MPI_Comm_size(MPI_COMM_WORLD, &o_size);

	if (o_rank<0 || o_size<0)
		throw mio::IOException("Invalid rank or size returned by MPI", AT);

	rank_ = static_cast<size_t>(o_rank);
	size_ = static_cast<size_t>(o_size);

	//MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN); // return info about errors

	char processor_name[MPI_MAX_PROCESSOR_NAME];
	int name_len;
	MPI_Get_processor_name(processor_name, &name_len);
	name_ = std::string(processor_name);

	std::cout << "[i] Init of MPI on '" << name_ << "' with a total world size of " << size_ << " (I'm rank #" << rank_ << ")\n";
}

MPIControl::~MPIControl()
{
	MPI_Finalize();
}

void MPIControl::checkSuccess(const int& ierr)
{
	if (ierr != MPI_SUCCESS) {
		int err_len = 0;
		char err_buffer[MPI_MAX_ERROR_STRING];

		MPI_Error_string(ierr, err_buffer, &err_len);
		throw mio::IOException(string(err_buffer), AT);
	}
}

void MPIControl::broadcast(std::string& message, const size_t& root)
{
	int ierr;
	unsigned int msg_len = (unsigned int) message.size();

	//Now broadcast the size of the object and then the object itself
	ierr = MPI_Bcast(&msg_len, 1, MPI_UNSIGNED, root, MPI_COMM_WORLD);
	checkSuccess(ierr);

	if (rank_ != root) message.resize(msg_len);
	ierr = MPI_Bcast(const_cast<char*>(message.c_str()), msg_len, MPI_CHAR, root, MPI_COMM_WORLD);
	checkSuccess(ierr);
}
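
// Illustrative sketch (not part of the original file): how a caller could use the size-then-payload
// broadcast above to share a string that only the master rank holds. The function name and the
// string content are hypothetical.
static void broadcastConfigExample()
{
	MPIControl& mpi = MPIControl::instance();
	std::string cfg;
	if (mpi.master()) cfg = "shared settings"; //only the root needs to fill the string
	mpi.broadcast(cfg, mpi.master_rank()); //afterwards every rank holds the same content
}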

void MPIControl::send(std::string& message, const size_t& recipient, const int& tag)
{
	int ierr;
	unsigned int msg_len = (unsigned int) message.size();

	ierr = MPI_Send(&msg_len, 1, MPI_UNSIGNED, static_cast<int>(recipient), tag, MPI_COMM_WORLD);
	checkSuccess(ierr);

	ierr = MPI_Send(const_cast<char*>(message.c_str()), msg_len, MPI_CHAR, static_cast<int>(recipient), tag, MPI_COMM_WORLD);
	checkSuccess(ierr);
}

void MPIControl::receive(std::string& message, const size_t& source, const int& tag)
{
	int ierr;
	unsigned int msg_len;

	ierr = MPI_Recv(&msg_len, 1, MPI_UNSIGNED, static_cast<int>(source), tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
	checkSuccess(ierr);

	message.resize(msg_len);
	ierr = MPI_Recv(const_cast<char*>(message.c_str()), msg_len, MPI_CHAR, static_cast<int>(source), tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
	checkSuccess(ierr);
}

void MPIControl::send(const int& value, const size_t& recipient, const int& tag)
{
	int buffer = value;
	MPI_Send(&buffer, 1, MPI_INT, static_cast<int>(recipient), tag, MPI_COMM_WORLD);
}

void MPIControl::receive(int& value, const size_t& source, const int& tag)
{
	MPI_Recv(&value, 1, MPI_INT, static_cast<int>(source), tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
}

void MPIControl::gather(const int& send_value, std::vector<int>& receive_vector, const size_t& root)
{
	int value = send_value; // make a copy to preserve const notion
	int* recv_data(NULL);
	if (rank_ == root) {
		receive_vector.resize(size_);
		recv_data = &receive_vector[0];
	}

	MPI_Gather(&value, 1, MPI_INT, recv_data, 1, MPI_INT, static_cast<int>(root), MPI_COMM_WORLD);
}
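
// Illustrative sketch (not part of the original file): gathering one integer per rank on the master
// with the gather() wrapper above, e.g. a per-rank cell count. The function and variable names are
// hypothetical.
static void gatherCountsExample(const int& my_count)
{
	MPIControl& mpi = MPIControl::instance();
	std::vector<int> all_counts; //only resized and filled on the master rank
	mpi.gather(my_count, all_counts, mpi.master_rank());
	if (mpi.master()) std::cout << "[i] Gathered " << all_counts.size() << " counts\n";
}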

void MPIControl::allreduce_max(double& value)
{
	double buffer;
	MPI_Allreduce(&value, &buffer, 1, MPI_DOUBLE, MPI_MAX, MPI_COMM_WORLD);
	value = buffer;
}

void MPIControl::allreduce_min(double& value)
{
	double buffer;
	MPI_Allreduce(&value, &buffer, 1, MPI_DOUBLE, MPI_MIN, MPI_COMM_WORLD);
	value = buffer;
}

void MPIControl::allreduce_sum(double& value)
{
	double buffer;
	MPI_Allreduce(&value, &buffer, 1, MPI_DOUBLE, MPI_SUM, MPI_COMM_WORLD);
	value = buffer;
}

void MPIControl::allreduce_sum(int& value)
{
	int buffer;
	MPI_Allreduce(&value, &buffer, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
	value = buffer;
}

void MPIControl::barrier() const
{
	MPI_Barrier(MPI_COMM_WORLD);
}

#else
std::string getHostName() {
	static const size_t len = 4096;

#if (defined _WIN32 || defined __MINGW32__) && ! defined __CYGWIN__
	TCHAR infoBuf[len];
	DWORD bufCharCount = len;
	if ( !GetComputerName( infoBuf, &bufCharCount ) )
		return std::string("N/A");

	return std::string(infoBuf);
#else
	char name[len];
	if (gethostname(name, len) != 0) {
		return std::string("localhost");
	}

	if (name[0] == '\0') return std::string("localhost");
	return std::string(name);
#endif
}

MPIControl::MPIControl() : rank_(0), size_(1), name_( getHostName() )
{
#ifdef _OPENMP
	std::cout << "[i] Init of OpenMP on '" << name_ << "' with a pool of " << omp_get_max_threads() << " threads\n";
#endif
}

MPIControl::~MPIControl() {}
void MPIControl::barrier() const {}
void MPIControl::allreduce_max(double&) {}
void MPIControl::allreduce_min(double&) {}
void MPIControl::allreduce_sum(double&) {}
void MPIControl::allreduce_sum(int&) {}
void MPIControl::gather(const int& val, std::vector<int>& vec, const size_t&) { vec.resize(1, val); }
#endif


#ifdef ENABLE_MPI
/**
 * @brief Send the objects pointed to by vector<T*> to process \#destination
 * @param[in] vec_local A vector of T* pointers to the objects that shall be sent
 * @param[in] destination The process rank that will receive the values
 * @param[in] tag Arbitrary non-negative integer assigned to uniquely identify a message
 * @note Class T needs to have the serialize and deserialize operators << and >> implemented
 */
template <class T> void MPIControl::send(const std::vector<T*>& vec_local, const size_t& destination, const int& tag)
{
	if ((size_ <= 1) || (rank_ == destination)) return;

	const size_t v_size = vec_local.size();

	send((int)v_size, destination, tag); // first send the size of the vector
	for (size_t ii=0; ii<v_size; ii++) {
		std::string obj_string;
		std::stringstream objs_stream;

		objs_stream << *(vec_local[ii]);
		obj_string.insert(0, objs_stream.str());
		send(obj_string, destination, tag);
	}
}
/**
 * @brief Receive a vector of objects from process \#source
 * @param[out] vec_local An empty vector of T* pointers that will receive the newly allocated objects
 * @param[in] source The process rank that will send the objects
 * @param[in] tag Arbitrary non-negative integer assigned to uniquely identify a message
 * @note Class T needs to have the serialize and deserialize operators << and >> implemented
 */
template <class T> void MPIControl::receive(std::vector<T*>& vec_local, const size_t& source, const int& tag)
{
	if ((size_ <= 1) || (rank_ == source)) return;

	if (!vec_local.empty())
		throw mio::IOException("The vector to receive pointers has to be empty (please properly free the vector)", AT);

	std::string obj_string;
	int v_size;
	receive(v_size, source, tag);
	vec_local.resize((size_t)v_size);

	T obj; // temporary object
	for (size_t ii=0; ii<vec_local.size(); ii++) {
		std::stringstream objs_stream;
		receive(obj_string, source, tag);

		objs_stream << obj_string;

		objs_stream >> obj;
		vec_local[ii] = new T(obj);
	}
}

/**
 * @brief Send the objects of a vector<T> to process \#destination
 * @param[in] vec_local A vector of T objects that shall be sent
 * @param[in] destination The process rank that will receive the values
 * @param[in] tag Arbitrary non-negative integer assigned to uniquely identify a message
 * @note Class T needs to have the serialize and deserialize operators << and >> implemented
 */
template <class T> void MPIControl::send(const std::vector<T>& vec_local, const size_t& destination, const int& tag)
{
	if ((size_ <= 1) || (rank_ == destination)) return;

	const size_t v_size = vec_local.size();

	send((int)v_size, destination, tag); // first send the size of the vector
	for (size_t ii=0; ii<v_size; ii++) {
		std::string obj_string;
		std::stringstream objs_stream;

		objs_stream << vec_local[ii];
		obj_string.insert(0, objs_stream.str());
		send(obj_string, destination, tag);
	}
}
/**
 * @brief Receive a vector of objects from process \#source
 * @param[out] vec_local An empty vector that will receive the T objects
 * @param[in] source The process rank that will send the objects
 * @param[in] tag Arbitrary non-negative integer assigned to uniquely identify a message
 * @note Class T needs to have the serialize and deserialize operators << and >> implemented
 */
template <class T> void MPIControl::receive(std::vector<T>& vec_local, const size_t& source, const int& tag)
{
	if ((size_ <= 1) || (rank_ == source)) return;

	if (!vec_local.empty())
		throw mio::IOException("The vector to receive the objects has to be empty", AT);

	std::string obj_string;
	int v_size;
	receive(v_size, source, tag);
	vec_local.resize((size_t)v_size);

	T obj; // temporary object
	for (size_t ii=0; ii<vec_local.size(); ii++) {
		std::stringstream objs_stream;
		receive(obj_string, source, tag);

		objs_stream << obj_string;

		objs_stream >> obj;
		vec_local[ii] = T(obj);
	}
}
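
// Illustrative sketch (not part of the original file): the paired call pattern for the vector
// send()/receive() templates above, using the std::pair instantiation provided just below. The
// ranks, the tag value and the work-range content are hypothetical.
static void exchangePairsExample()
{
	MPIControl& mpi = MPIControl::instance();
	const int tag = 42;
	if (mpi.master()) {
		//three (start, end) index ranges to hand over to rank 1
		std::vector< std::pair<unsigned long, unsigned long> > work(3, std::make_pair(0ul, 10ul));
		mpi.send(work, 1, tag);
	} else if (mpi.rank() == 1) {
		std::vector< std::pair<unsigned long, unsigned long> > work; //must be empty before receive()
		mpi.receive(work, mpi.master_rank(), tag);
	}
}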

// The templates are instantiated here (and not in the header) because the custom operators
// >> and << for std::pair would otherwise have to be declared in the header, which would
// conflict with other headers.
template void MPIControl::receive<SnowStation>(std::vector<SnowStation*>&, const size_t&, const int&);
template void MPIControl::send<SnowStation>(const std::vector<SnowStation*>&, const size_t&, const int&);

template void MPIControl::receive<CurrentMeteo>(std::vector<CurrentMeteo*>&, const size_t&, const int&);
template void MPIControl::send<CurrentMeteo>(const std::vector<CurrentMeteo*>&, const size_t&, const int&);

template void MPIControl::receive<SurfaceFluxes>(std::vector<SurfaceFluxes*>&, const size_t&, const int&);
template void MPIControl::send<SurfaceFluxes>(const std::vector<SurfaceFluxes*>&, const size_t&, const int&);

template void MPIControl::receive< std::pair<unsigned long, unsigned long> >(std::vector<std::pair<unsigned long, unsigned long> >&, const size_t&, const int&);
template void MPIControl::send< std::pair<unsigned long, unsigned long> >(const std::vector<std::pair<unsigned long, unsigned long> >&, const size_t&, const int&);

#endif

MPIControl& MPIControl::instance()
{
	static MPIControl instance_;
	return instance_;
}

void MPIControl::getArraySliceParams(const size_t& dimx, const size_t& nbworkers, const size_t& idx_wk, size_t& startx_sub, size_t& nx_sub)
{
	if (nbworkers < dimx) {
		const size_t nx_avg = static_cast<size_t>( floor(static_cast<double>(dimx) / static_cast<double>(nbworkers)) );
		const size_t remainder = dimx % nbworkers;

		if (idx_wk < remainder) { //distribute remainder, 1 extra column per worker, on first workers
			startx_sub = idx_wk * nx_avg + idx_wk;
			nx_sub = nx_avg + 1;
		} else { //all remainder has been distributed, we now attribute a normal number of columns
			startx_sub = idx_wk * nx_avg + remainder;
			nx_sub = nx_avg;
		}
	} else {
		if (idx_wk < dimx) {
			startx_sub = idx_wk;
			nx_sub = 1;
		} else {
			nx_sub = 0;
			startx_sub = 0;
		}
	}
}
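
// Illustrative sketch (not part of the original file): the column split produced by
// getArraySliceParams(). For dimx=10 columns and nbworkers=4, the workers get
// (startx, nx) = (0,3), (3,3), (6,2) and (8,2): the remainder of 2 columns goes to the
// first two workers. The function name printSliceExample() is hypothetical.
static void printSliceExample()
{
	const size_t dimx = 10, nbworkers = 4;
	for (size_t worker = 0; worker < nbworkers; worker++) {
		size_t startx = 0, nx = 0;
		MPIControl::instance().getArraySliceParams(dimx, nbworkers, worker, startx, nx);
		std::cout << "[i] worker " << worker << ": startx=" << startx << " nx=" << nx << "\n";
	}
}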

/**
 * @brief Split the domain for MPI, balancing the load between the MPI instances and taking
 * into account the cells where no computation is required
 * @param[in] dimx size in x of the DEM (not necessary anymore, but retained to keep the call similar to getArraySliceParams)
 * @param[in] idx_wk MPI rank of the current instance
 * @param[out] startx_sub Where the starting column for this instance will be stored
 * @param[out] nx_sub Where the number of columns to compute for this instance will be stored
 * @param[in] dem DEM used in the model
 * @param[in] landuse Landuse grid used in the model; nodata in landuse indicates where no computation is required
 */
void MPIControl::getArraySliceParamsOptim(const size_t& dimx, const size_t& idx_wk, size_t& startx_sub, size_t& nx_sub, const mio::DEMObject& dem, const mio::Grid2DObject& landuse)
{
	//Nothing to do if size_==1
	if (size_==1) {
		nx_sub = dimx;
		startx_sub = 0;
		return;
	}

	//Check for each column how many cells have to be computed
	const size_t dimy = dem.getNy();
	size_t n_skip_cell = 0;
	std::vector<size_t> cells_per_col(dimx, 0);
	for (size_t ix = 0; ix < dimx; ix++) {
		for (size_t iy = 0; iy < dimy; iy++) {
			if (SnowpackInterfaceWorker::skipThisCell(landuse(ix,iy), dem(ix,iy))) {
				//skip nodata cells as well as water bodies, etc
				n_skip_cell++;
			} else {
				cells_per_col.at(ix)++;
			}
		}
	}

	const size_t num_cells_to_compute = dimx*dimy - n_skip_cell;
	const size_t mean_num_cells_per_mpi = num_cells_to_compute / size_;

	std::vector<size_t> startx(size_, 0);
	std::vector<size_t> nx(size_, 0);
	std::vector<size_t> n_cells(size_, 0);
	size_t current_x = 0;
	size_t current_num_cells = 0;
	size_t total_num_cells = 0;
	for (size_t i=0; i<size_-1; ++i) {
		startx.at(i) = current_x;
		current_num_cells = 0;
		//do-while to force adding at least one column
		do {
			current_num_cells += cells_per_col.at(current_x);
			total_num_cells += cells_per_col.at(current_x);
			current_x++;
		}
		while (current_x < dimx
		       && total_num_cells + cells_per_col.at(current_x) < (i+1.2)*mean_num_cells_per_mpi
		       && (dimx-current_x) > (size_-i));
		//The i+1.2 threshold allows adding a bit more per slice, otherwise the last slice gets all the remainder and becomes huge
		//The last test makes sure that at least 1 column per MPI instance remains
		n_cells.at(i) = current_num_cells;
		//No -1 required because current_x was already incremented once in the do-while loop
		nx.at(i) = current_x - startx.at(i);
	}
	startx.at(size_-1) = current_x;
	current_num_cells = 0;
	//Fill the last slice with the remaining cells
	while (current_x < dimx) {
		current_num_cells += cells_per_col.at(current_x);
		current_x++;
	}
	n_cells.at(size_-1) = current_num_cells;
	nx.at(size_-1) = current_x - startx.at(size_-1);

	startx_sub = startx.at(idx_wk);
	nx_sub = nx.at(idx_wk);
}
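
// Illustrative worked example (not part of the original file): assume 2 MPI instances and a
// 6-column domain with cells_per_col = {8, 8, 1, 1, 1, 1}, i.e. 20 active cells and
// mean_num_cells_per_mpi = 10. The first slice stops after column 0 (8 active cells), because
// adding column 1 would push its running total to 16 > 1.2*10; the second slice gets columns
// 1-5 (12 active cells). A plain equal-column split (3+3 columns) would have produced a
// 17 vs 3 cell imbalance instead.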
