$treeview $search $mathjax
Palabos  Version 1.1
$projectbrief
$projectbrief
$searchbox

sendRecvPool.h

Go to the documentation of this file.
00001 /* This file is part of the Palabos library.
00002  *
00003  * Copyright (C) 2011 FlowKit Sarl
00004  * Avenue de Chailly 23
00005  * 1012 Lausanne, Switzerland
00006  * E-mail contact: contact@flowkit.com
00007  *
00008  * The most recent release of Palabos can be downloaded at 
00009  * <http://www.palabos.org/>
00010  *
00011  * The library Palabos is free software: you can redistribute it and/or
00012  * modify it under the terms of the GNU Affero General Public License as
00013  * published by the Free Software Foundation, either version 3 of the
00014  * License, or (at your option) any later version.
00015  *
00016  * The library is distributed in the hope that it will be useful,
00017  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00018  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00019  * GNU Affero General Public License for more details.
00020  *
00021  * You should have received a copy of the GNU Affero General Public License
00022  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
00023 */
00024 
00029 #ifndef SEND_RECV_POOL_H
00030 #define SEND_RECV_POOL_H
00031 
00032 #include "core/globalDefs.h"
00033 #include "core/util.h"
00034 #include "parallelism/mpiManager.h"
00035 #include <map>
00036 #include <vector>
00037 #include <sstream>
00038 
00039 namespace plb {
00040 
00041 #ifdef PLB_MPI_PARALLEL
00042 
00044 struct PoolEntry {
          // Subscription record that SendRecvPool stores per key: the length of
          // every subscribed message, in subscription order, plus a running total.
00045     PoolEntry()
00046         : cumDataLength(0), lengths()
00047     { }
          // Running total; SendRecvPool::subscribeMessage adds each subscribed
          // message length to it.
00048     int cumDataLength;
          // One element per subscribed message, appended by
          // SendRecvPool::subscribeMessage.
          // NOTE(review): CommunicatorEntry sizes std::vector<char> buffers with
          // these values, so they look like byte counts -- confirm with callers.
00049     std::vector<int> lengths;
00050 };
00051 
00053 class SendRecvPool {
          // Collects message subscriptions, grouped by an integer key `proc`
          // (presumably the rank of the partner process -- confirm with callers).
00054 public:
          // Key: `proc`; value: the lengths subscribed for that key.
00055     typedef std::map<int, PoolEntry> SubsT;
00056 public:
          // Registers one more message of length numData for `proc`.
          // std::map::operator[] default-constructs the entry the first time a
          // given `proc` is seen, so no prior registration is needed.
          // No validation is applied to numData.
00057     void subscribeMessage(int proc, int numData) {
00058         PoolEntry& entry = subscriptions[proc];
00059         entry.lengths.push_back(numData);
00060         entry.cumDataLength += numData;
00061     }
          // Drops all subscriptions.
00062     void clear() {
00063         subscriptions.clear();
00064     }
          // Read-only iteration over (proc, PoolEntry) pairs, ordered by proc.
          // The non-empty precondition is commented out, so calling this on an
          // empty pool is allowed and yields begin() == end().
00065     SubsT::const_iterator begin() const {
00066         //PLB_PRECONDITION( !subscriptions.empty() );
00067         return subscriptions.begin();
00068     }
          // Past-the-end iterator matching begin(); the precondition is likewise
          // disabled.
00069     SubsT::const_iterator end() const {
00070         //PLB_PRECONDITION( !subscriptions.empty() );
00071         return subscriptions.end();
00072     }
          // True when nothing has been subscribed since construction or clear().
00073     bool empty() const { return subscriptions.empty(); }
00074 private:
00075     SubsT subscriptions;
00076 };
00077 
00080 struct CommunicatorEntry {
          // Per-process communication state: the subscribed message lengths, one
          // char buffer per message, and the MPI request/status handles.
          // NOTE(review): neither constructor initializes sizeRequest,
          // messageRequest, sizeStatus or messageStatus; they hold indeterminate
          // values until code outside this header assigns them.
          //
          // Default state: no messages, zero cumulative length.
00081     CommunicatorEntry() 
00082         : lengths(),
00083           cumDataLength(0),
00084           messages(),
00085           data(),
00086           currentMessage(0)
00087     { } 
          // Builds the state from a pool subscription: copies the lengths and
          // their total, then allocates one buffer per message.
          // `messages(lengths.size())` reads the member `lengths`, which is valid
          // because `lengths` is declared (and therefore initialized) before
          // `messages`. `data` is not listed and is default-constructed (empty).
00088     CommunicatorEntry(PoolEntry const& poolEntry)
00089         : lengths(poolEntry.lengths),
00090           cumDataLength(poolEntry.cumDataLength),
00091           messages(lengths.size()),
00092           currentMessage(0)
00093     {
              // Size every buffer to its subscribed length (chars are
              // value-initialized to 0 by resize).
00094         for (pluint iMessage=0; iMessage<messages.size(); ++iMessage) {
00095             messages[iMessage].resize(lengths[iMessage]);
00096         }
00097     }
          // Rewinds currentMessage to 0; buffers and lengths are left untouched.
00098     void reset() {
00099         currentMessage=0;
00100     }
          // Returns the subscribed lengths as text, each followed by one space
          // (empty string when there are no messages). Asserts that every buffer
          // still has its subscribed size.
00101     std::string info() {
00102         std::stringstream infostr;
00103         for (pluint iL=0; iL<lengths.size(); ++iL) {
00104             int length = lengths[iL];
00105             PLB_ASSERT(length==(int)messages[iL].size());
00106             infostr << length << " ";
00107         }
00108         return infostr.str();
00109     }
          // Length of each message, copied from the PoolEntry.
00110     std::vector<int> lengths;
          // Total copied from PoolEntry::cumDataLength.
00111     int              cumDataLength;
          // One buffer per message; messages[i].size() == lengths[i] right after
          // construction from a PoolEntry.
00112     std::vector<std::vector<char> > messages;
          // NOTE(review): not used anywhere in this header; presumably a single
          // buffer for the whole exchange -- confirm in the implementation file.
00117     std::vector<char> data;
          // Set to 0 by the constructors and reset(); presumably the index into
          // `messages` of the next message to handle -- TODO confirm.
00118     int currentMessage;
          // MPI handles, never initialized in this header (see note at the top).
          // The size/message naming suggests separate transfers for a length and
          // for a payload -- TODO confirm.
00119     MPI_Request sizeRequest, messageRequest;
00120     MPI_Status  sizeStatus, messageStatus;
00121 };
00122 
00124 class SendPoolCommunicator {
          // Sending-side communicator: keeps one CommunicatorEntry per integer
          // process key. Only declarations are visible here; the member
          // functions are defined outside this header.
          // NOTE(review): `staticMessage` presumably selects between fixed-length
          // and variable-length messages -- confirm in the implementation file.
00125 public:
          // Creates a communicator with no subscriptions.
00126     SendPoolCommunicator() { }
          // Constructs from a pool; presumably creates one entry per subscribed
          // process -- TODO confirm.
00127     SendPoolCommunicator(SendRecvPool const& pool);
          // Returns a writable char buffer associated with toProc.
00128     std::vector<char>& getSendBuffer(int toProc);
00129     void acceptMessage(int toProc, bool staticMessage);
00130     void finalize(bool staticMessage);
00131 private:
00132     void startCommunication(int toProc, bool staticMessage);
00133 private:
          // Key: process id; value: buffers and MPI handles for that process.
00134     std::map<int, CommunicatorEntry > subscriptions;
00135 };
00136 
00138 class RecvPoolCommunicator {
          // Receiving-side communicator: keeps one CommunicatorEntry per integer
          // process key. Only declarations are visible here; the member
          // functions are defined outside this header.
          // NOTE(review): `staticMessage` presumably selects between the
          // finalizeStatic() and receiveDynamic() paths declared below --
          // confirm in the implementation file.
00139 public:
          // Creates a communicator with no subscriptions.
00140     RecvPoolCommunicator() { }
          // Constructs from a pool; presumably creates one entry per subscribed
          // process -- TODO confirm.
00141     RecvPoolCommunicator(SendRecvPool const& pool);
00143     void startBeingReceptive(bool staticMessage);
          // Returns a read-only char buffer associated with fromProc.
00144     std::vector<char> const& receiveMessage(int fromProc, bool staticMessage);
00145 private:
00146     void finalizeStatic(int fromProc);
00147     void receiveDynamic(int fromProc);
00148 private:
          // Key: process id; value: buffers and MPI handles for that process.
00149     std::map<int, CommunicatorEntry > subscriptions;
00150 };
00151 
00152 #endif  // PLB_MPI_PARALLEL
00153 
00154 }  // namespace plb
00155 
00156 #endif  // SEND_RECV_POOL_H