NAMD
SynchronousCollectives.h
#ifndef SYNCHRONOUS_COLLECTIVES_H
#define SYNCHRONOUS_COLLECTIVES_H

#include "charm++.h"

#include "main.h"
#include "NamdTypes.h"
#include "ProcessorPrivate.h"
#include "CudaRecord.h"
#include "CudaUtils.h"

#include "SynchronousCollectives.decl.h"

#include <vector>
#include <any>
#include <map>

#if defined(NAMD_CUDA) || defined(NAMD_HIP)

/*
 * Defines the scopes used by collectives
 */
enum class SynchronousCollectiveScope: unsigned int {
  all,
  master,
  single
};

class SynchronousCollectivesMulticastMsg :
  public CkMcastBaseMsg, public CMessage_SynchronousCollectivesMulticastMsg
{
public:
};

/*
 * A collection of bulk synchronous collective functions used in the GPU-resident code
 *
 * The general strategy for the collectives is for the calling thread to invoke the
 * entry methods and then enter a loop yielding control to the Converse scheduler
 * and checking whether the expected messages have arrived. The entry methods store the
 * data they receive in a temporary buffer and then indicate that they have received
 * that message.
 *
 * This approach doesn't have strict ordering guarantees for entry functions.
 * For example:
 *
 * Suppose we have three PEs (0, 1, 2) which each call some entry function A. If PE 0
 * receives messages from PEs 1 and 2 and executes the corresponding A entry functions,
 * it will continue with the main thread execution. However, there is no guarantee PE 1
 * has received the message from PE 2 at this point. PE 0 could then invoke another
 * entry function, B. PE 1 could execute B based on PE 0's message before it executes A
 * based on PE 2's message.
 *
 * The entry methods store the data they receive in a std::map; the key to the map is
 * an unsigned integer that gets incremented when the collective is called and then
 * passed to the entry function.
 *
 */
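/*
 * A minimal sketch (not the actual implementation) of the yield-and-check pattern
 * described above, assuming a per-collective counter incremented by the entry methods
 * and the Converse polling call CsdSchedulePoll():
 *
 *   while (receivedCount < expectedCount) {
 *     CsdSchedulePoll();  // let the scheduler deliver pending entry-method messages
 *   }
 */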
class SynchronousCollectives : public CBase_SynchronousCollectives
{
public:
  static SynchronousCollectives* Object() {
    return CkpvAccess(SynchronousCollectives_instance);
  }
  static SynchronousCollectives* ObjectOnPe(const int pe) {
    return CkpvAccessOther(SynchronousCollectives_instance, CmiRankOf(pe));
  }
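  /*
   * Usage sketch (hypothetical caller): code running on a PE typically reaches the
   * group through the static accessor, e.g.
   *
   *   SynchronousCollectives* sync = SynchronousCollectives::Object();
   *   sync->barrier(SynchronousCollectiveScope::all);
   */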

  /*
   * @brief Initializes the all-scoped collectives
   *
   * This will initialize the list of PEs and the barrier used for all-scoped collectives
   */
  void initAllScope();

  /*
   * @brief Initializes the master-PE-scoped collectives
   *
   * This will initialize the Charm++ sections used for master PE communication as well
   * as the barrier for master-PE-scoped collectives. This is separate from the init
   * for all-scoped collectives because we need information from the GlobalGPUMgr, but
   * the GlobalGPUMgr needs to use all-scoped collectives to initialize.
   *
   * This needs to be called by all PEs.
   *
   * @param[in] isMasterPe whether or not this PE is a master PE
   * @param[in] isMasterDevice whether or not this PE is assigned to the master device
   * @param[in] numDevices number of devices used by NAMD across all nodes
   * @param[in] deviceIndex this device's index among GPUs on all nodes
   * @param[in] masterPeList list of master PEs on all nodes used by master-PE-scoped
   * collectives
   */
  void initMasterScope(const int isMasterPe, const int isMasterDevice,
      const int numDevices, const int deviceIndex, const std::vector<int>& masterPeList);
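  /*
   * Usage sketch (hypothetical caller; the argument names are placeholders for values
   * obtained from the device manager during startup):
   *
   *   SynchronousCollectives::Object()->initMasterScope(
   *       isMasterPe, isMasterDevice, numDevices, deviceIndex, masterPeList);
   */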

  /*
   * @brief Computes the element-wise reduction between PEs on a vector of elements
   *
   * This uses Charm++'s reduction functionality to compute the given reduction over
   * a std::vector. It will involve either all PEs or master PEs depending on the
   * given scope. This function can be called with a master PE scope by non-master PEs;
   * in this case, they will return their input data without blocking.
   *
   * @param[in] data Reference to std::vector of data on which the reduction will happen
   * @param[in] type The type of reduction to perform
   * @param[in] scope The scope of the collective operation
   *
   * @return A vector of data containing the result of the allreduce. This will be the
   * same length as the input data
   */
  template<typename T>
  std::vector<T> allReduce(std::vector<T>& data, CkReduction::reducerType type,
      const SynchronousCollectiveScope scope);
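  /*
   * Usage sketch (hypothetical caller, assuming the built-in Charm++ reducer
   * CkReduction::sum_int and the all-PE scope):
   *
   *   std::vector<int> counts = {localCount};  // localCount is a placeholder
   *   std::vector<int> totals = SynchronousCollectives::Object()->allReduce(
   *       counts, CkReduction::sum_int, SynchronousCollectiveScope::all);
   */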

  /*
   * @brief Performs an all-gather between PEs
   *
   * This performs an all-gather between either all PEs or master PEs depending on the
   * given scope. Each participating PE provides data that will be distributed to all
   * other participating PEs; the gathered entries are returned in a std::vector.
   *
   * @param[in] data Reference to the data being sent by this PE
   * @param[in] scope The scope of the collective operation
   *
   * @return A vector containing data entries from all participating PEs
   */
  template<typename T>
  std::vector<T> allGather(const T& data, const SynchronousCollectiveScope scope);
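  /*
   * Usage sketch (hypothetical caller): each master PE contributes one value and
   * receives a vector with every participant's contribution:
   *
   *   int localValue = 0;  // placeholder per-PE value
   *   std::vector<int> gathered = SynchronousCollectives::Object()->allGather(
   *       localValue, SynchronousCollectiveScope::master);
   */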

  /*
   * @brief Performs an alltoallv between PEs
   *
   * This performs an alltoallv between all PEs or master PEs depending on the given scope.
   * Each PE provides a vector of data where each element is sent to a different remote PE.
   * This operation works on std::vectors, allowing different amounts of data to be sent to
   * different PEs.
   *
   * @param[in] data Reference to the data being sent by this PE
   * @param[in] scope The scope of the collective operation
   *
   * @return A vector containing the entries sent to this PE by all participating PEs
   */
  template<typename T>
  std::vector<T> alltoallv(const std::vector<T>& data, const SynchronousCollectiveScope scope);
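  /*
   * Usage sketch (hypothetical caller, assuming N participating PEs): each PE passes an
   * N-element vector whose i-th entry is delivered to participant i; using a vector-valued
   * element type allows differently sized payloads per destination:
   *
   *   std::vector<std::vector<int>> toSend(numParticipants);  // placeholder payloads
   *   std::vector<std::vector<int>> received = SynchronousCollectives::Object()->alltoallv(
   *       toSend, SynchronousCollectiveScope::master);
   */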

  /*
   * @brief Performs a broadcast
   *
   * This performs a broadcast from one PE to either all PEs or master PEs depending on the
   * given scope. The PE with isRoot set to true will broadcast data, and isRoot should only
   * be true on one PE.
   *
   * @param[in] data Reference to the data being sent to other PEs
   * @param[in] isRoot Boolean set to true for the broadcasting PE
   * @param[in] scope The scope of the broadcast
   *
   * @return Data from the root PE
   */
  template<typename T>
  T broadcast(const T& data, const bool isRoot, const SynchronousCollectiveScope scope);
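  /*
   * Usage sketch (hypothetical caller): exactly one PE passes isRoot = true; every
   * participant receives the root's value as the return value:
   *
   *   double value = 0.0;  // placeholder, meaningful only on the root PE
   *   value = SynchronousCollectives::Object()->broadcast(
   *       value, isRoot, SynchronousCollectiveScope::master);
   */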

  /*
   * @brief Waits for all Charm++ operations to complete and awakens the current thread
   *
   * This will suspend all threads and use quiescence detection to resume execution.
   * This allows all outstanding Charm++ operations to complete. This is necessary
   * in the GPU-resident code path in order to use the existing broadcast, reduction, and
   * output code, which needs to wait for Charm++ entry functions to complete.
   */
  void waitAndAwaken();

  /*
   * @brief Barrier function
   *
   * This will block until all participating PEs have reached the barrier. It can operate
   * with all PEs or just master PEs.
   *
   * @param[in] scope The scope of the barrier
   */
  void barrier(const SynchronousCollectiveScope scope);
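  /*
   * Usage sketch (hypothetical caller): block until every master PE has reached this
   * point before continuing:
   *
   *   SynchronousCollectives::Object()->barrier(SynchronousCollectiveScope::master);
   */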

  /*
   * @brief Barrier-all function
   *
   * This function will always use a single barrier between all PEs, whereas the
   * barrier function may use a barrier between master PEs plus a CmiNodeBarrier.
   * This function is sometimes needed during startup.
   */
  void forceBarrierAll();

  /*
   * Charm++ Entry Functions
   */

  /*
   * Sends a message to all master PEs to initialize the multicast cookie for reductions
   */
  void setupMulticastSection(SynchronousCollectivesMulticastMsg *msg);

  /*
   * This will be used as the callback function given to Charm++'s reduction
   * function. It will be invoked on one PE and will broadcast the results
   * of the reduction to all other PEs
   */
  void handleReductionAll(CkReductionMsg *msg);

  /*
   * This will be used as the callback function given to Charm++'s reduction
   * function. It will be invoked on one PE and will broadcast the results
   * of the reduction to all other master PEs
   */
  void handleReductionMaster(CkReductionMsg *msg);

  /*
   * This function will be called by one PE and executed on all other PEs. It sets
   * the temp value on all PEs to the reduction result and then increments the count
   */
  void broadcastReductionResult(int n, char* data);

  /*
   * Entry function for all-gather and all-to-all; receives data and increments the count
   */
  template<typename T>
  void recvIndexData(const int index, const T& data, const SynchronousCollectiveScope scope,
      const unsigned int key);

  /*
   * Entry function that receives data and increments the count
   */
  template<typename T>
  void recvBroadcast(const T& data, const unsigned int key);

  /*
   * @brief Helper function used by waitAndAwaken
   *
   * This will use quiescence detection to allow all outstanding Charm++ operations
   * to finish
   */
  void wait();

  /*
   * Entry function used in the all-scoped barrier; this will just increment the counter
   */
  void recvBarrierAll(const int PE);

  /*
   * Entry function used in the master-PE-scoped barrier; this will just increment the counter
   */
  void recvBarrierMasterPe(const int deviceIndex);

private:
  /*
   * Helper function to call entry functions for all-gather
   */
  template<typename T>
  void sendAllGather(const T& data, const SynchronousCollectiveScope scope, const unsigned int key);

  /*
   * Helper function to call entry functions for all-to-all
   */
  template<typename T>
  void sendAlltoallv(const std::vector<T>& data, const SynchronousCollectiveScope scope, const unsigned int key);

  /*
   * Helper function to call entry functions for broadcast
   */
  template<typename T>
  void sendBroadcast(const T& data, const SynchronousCollectiveScope scope, const unsigned int key);

  /*
   * @brief Helper function for counting received messages
   *
   * This function is called by entry functions in order to record that a message
   * has been received.
   */
  void incrementCount(const SynchronousCollectiveScope scope, const int index);

  /*
   * @brief Helper function for waiting for all expected messages
   *
   * This function will enter a while loop yielding control to Converse and checking
   * whether all the expected messages (based on scope) have been received.
   */
  void suspendAndCheck(const SynchronousCollectiveScope scope);
  void setThread(CthThread thread) { self_awaken_thread_ = thread; }

  /*
   * Helper function that grabs data from temp storage
   */
  template<typename T>
  T retrieveTemp(const unsigned int key);

  /*
   * Helper function that returns the key for the given scope
   */
  unsigned int getKey(const SynchronousCollectiveScope scope) {
    return (scope == SynchronousCollectiveScope::all) ? tempDataAllKey_++ : tempDataMasterKey_++;
  }

  /*
   * Helper function that returns the barrier based on the given scope
   */
  std::vector<int>& getBarrier(const SynchronousCollectiveScope scope) {
    if (scope == SynchronousCollectiveScope::all) {
      return currentBarrierAll_;
    } else if (scope == SynchronousCollectiveScope::master) {
      return currentBarrierMasterPe_;
    } else {
      return currentBarrierSingle_;
    }
  }

  std::vector<int> masterPeList_;
  CProxySection_SynchronousCollectives masterPes_;
  CProxySection_SynchronousCollectives masterPesMulticast_;
  CkSectionInfo reductionCookie_;
  CProxy_SynchronousCollectives allPes_;

  // Used to store temporary variables
  unsigned int tempDataMasterKey_ = 0;
  unsigned int tempDataAllKey_ = 0;
  std::map<unsigned int, std::any> tempData_;
  std::any reductionTemp_;
  void* reductionPtr_;

  // Barrier counters
  std::vector<int> currentBarrierAll_;
  std::vector<int> currentBarrierMasterPe_;
  std::vector<int> currentBarrierSingle_;

  // Used for QD
  int waitPhase_ = 0;
  CthThread self_awaken_thread_;

  // Store information for MasterPe-scoped communication
  int isMasterPe_ = 0;
  int numDevices_ = 0;
  int deviceIndex_ = -1;
};

#if !(defined(__NVCC__) || defined(__HIPCC__))
#include <pup.h>
PUPbytes(cudaIpcMemHandle_t);
#endif

#endif /* NAMD_CUDA || NAMD_HIP */

#define CK_TEMPLATES_ONLY
#include "SynchronousCollectives.def.h"
#undef CK_TEMPLATES_ONLY

#endif /* SYNCHRONOUS_COLLECTIVES_H */