NAMD
SynchronousCollectives.C
Go to the documentation of this file.
1 #include "CudaRecord.h"
2 #include "CudaUtils.h"
3 #include "NamdTypes.h"
4 
5 #include "SynchronousCollectives.decl.h"
7 #include "Node.h"
8 #include "SimParameters.h"
9 #include "NamdEventsProfiling.h"
10 #include "Priorities.h"
11 
12 #include <cstring> // std::memcpy
13 
14 #include "charm++.h"
15 
16 #if defined(NAMD_CUDA) || defined(NAMD_HIP)
17 
18 /*
19  * PUP (Pack-UnPack) various types that will communicated via Charm++
20  */
21 #if !(defined(__NVCC__) || defined(__HIPCC__))
22 #include <pup.h>
24 #endif // !((__NVCC__) || (__HIPCC__))
25 
{
  // Register this branch as the per-PE singleton; a second construction on the
  // same PE is a programming error.
  // NOTE(review): the constructor signature is on a line missing from this view.
  if (CkpvAccess(SynchronousCollectives_instance) == NULL) {
    CkpvAccess(SynchronousCollectives_instance) = this;
  } else {
    NAMD_bug("SynchronousCollectives instanced twice on same processor!");
  }
}
34 
36 
  // Cache the group proxy covering all PEs and size the barrier counters:
  // one slot per PE for the "all" scope, one slot for the "single" scope.
  // NOTE(review): the enclosing function's signature is on a line missing from this view.
  allPes_ = CkpvAccess(BOCclass_group).synchronousCollectives;
  currentBarrierAll_ = std::vector<int>(CkNumPes(), 0);
  currentBarrierSingle_ = std::vector<int>(1, 0);
}
42 
// Set up the master-PE scope: record this PE's role, build the Charm++ section
// proxies over the master PEs, and (on the master device's master PE) create the
// multicast section used for reductions.
//  isMasterPe     - nonzero if this PE is the master PE of its device
//  isMasterDevice - nonzero if this PE's device is the master device
//  numDevices     - number of devices participating in the master scope
//  deviceIndex    - this PE's device index within the master scope
//  masterPeList   - PE number of each device's master PE, indexed by device
void SynchronousCollectives::initMasterScope(const int isMasterPe, const int isMasterDevice,
    const int numDevices, const int deviceIndex, const std::vector<int>& masterPeList) {

  isMasterPe_ = isMasterPe;
  numDevices_ = numDevices;
  deviceIndex_ = deviceIndex;
  masterPeList_ = masterPeList;

  // One barrier slot per device for the master-PE scope.
  currentBarrierMasterPe_ = std::vector<int>(numDevices_, 0);
  forceBarrierAll(); // Make sure all PEs have set expectedBarrierMasterPe

  masterPes_ = CProxySection_SynchronousCollectives(allPes_.ckGetGroupID(),
      masterPeList_.data(), masterPeList_.size());
  masterPesMulticast_ = CProxySection_SynchronousCollectives(allPes_.ckGetGroupID(),
      masterPeList_.data(), masterPeList_.size());

  //
  // For section broadcasts, we must use the multi-cast library; however, it requires explicitly
  // defined messages, so the multi-cast section will not be used for the non-reduction sections
  //
  CProxy_CkMulticastMgr mcastProxy = CkpvAccess(BOCclass_group).multicastMgr;
  CkMulticastMgr *mcastPtr = CProxy_CkMulticastMgr(mcastProxy).ckLocalBranch();
  masterPesMulticast_.ckSectionDelegate(mcastPtr);

  if (isMasterDevice && isMasterPe_) {
    setThread(CthSelf());
    // NOTE(review): `msg` is constructed on a line missing from this view
    // (presumably a SynchronousCollectivesMulticastMsg) — confirm against the full file.
    masterPesMulticast_.setupMulticastSection(msg);
    suspendAndCheck(SynchronousCollectiveScope::single);
  } else if (isMasterPe_) {
    // Non-root master PEs wait until setupMulticastSection has been delivered here.
    suspendAndCheck(SynchronousCollectiveScope::single);
  }

  forceBarrierAll(); // Make sure all PEs have set expectedBarrierMasterPe
}
78 
79 void SynchronousCollectives::incrementCount(const SynchronousCollectiveScope scope, const int index) {
80  auto& currentBarrier = getBarrier(scope);
81  if (currentBarrier.size() <= index) {
82  NAMD_bug("SynchronousCollectives currentBarrier not large enough");
83  }
84  currentBarrier[index]++;
85 }
86 
87 void SynchronousCollectives::suspendAndCheck(const SynchronousCollectiveScope scope) {
88  auto& currentBarrier = getBarrier(scope);
89  bool done = true;
90  do {
91  CthYield();
92  done = true;
93  for (size_t i = 0; i < currentBarrier.size(); i++) {
94  done = (done && currentBarrier[i]);
95  }
96  } while (!done);
97 
98  for (size_t i = 0; i < currentBarrier.size(); i++) {
99  currentBarrier[i]--;
100  }
101 }
102 
  // Two-phase quiescence wait driven by Charm++ QD callbacks:
  //   phase 0 -> arm, phase 1 -> quiescence reached.
  // NOTE(review): the enclosing function's signature is on a line missing from
  // this view (per the index below, `void wait()`).
  int finished = false;
  switch (waitPhase_) {
  case 0:
    break;
  case 1:
    finished = true;
    break;
  }

  waitPhase_++;
  // Only PE 0 initiates quiescence detection; the callback re-enters wait()
  // on the whole group once the system is quiescent.
  if (!CkMyPe()) {
    if (!finished) {
      CkStartQD(CkCallback(CkIndex_SynchronousCollectives::wait(), thisgroup));
    }
  }

  if (finished) {
    // Reset for the next use and resume the thread suspended on this barrier.
    waitPhase_ = 0;
    CthAwaken(self_awaken_thread_);
  }
}
125 
  // Record the current thread so wait() can awaken it, kick off the
  // quiescence protocol, and suspend until CthAwaken is called.
  // NOTE(review): the enclosing function's signature is on a line missing from this view.
  setThread(CthSelf());
  wait();
  CthSuspend();
}
131 
  // Entry method: one arrival from PE `PE` toward the all-PE barrier.
  // NOTE(review): the signature (`void recvBarrierAll(const int PE)` per the
  // index below) is on a line missing from this view.
  incrementCount(SynchronousCollectiveScope::all, PE);
}
135 
// Entry method: one arrival from the master PE of device `deviceIndex`
// toward the master-PE-scope barrier.
void SynchronousCollectives::recvBarrierMasterPe(const int deviceIndex) {
  incrementCount(SynchronousCollectiveScope::master, deviceIndex);
}
139 
  // Broadcast this PE's arrival to every PE, then block until all PEs
  // have arrived (a true all-PE barrier, independent of master-PE state).
  // NOTE(review): the enclosing function's signature is on a line missing from this view.
  allPes_.recvBarrierAll(CkMyPe());
  suspendAndCheck(SynchronousCollectiveScope::all);
}
144 
  // Scope-aware barrier. "all" scope prefers the cheap node barrier when
  // possible; "master" scope synchronizes only the master PEs via the section
  // proxy. "single" scope is not meaningful for a barrier.
  // NOTE(review): the function signature is on a line missing from this view
  // (per the index below, `void barrier(const SynchronousCollectiveScope scope)`).
  if (scope == SynchronousCollectiveScope::single) {
    NAMD_bug("SynchronousCollectives::barrier does not support single scope");
  }

  NAMD_EVENT_START(1, NamdProfileEvent::CHARM_BARRIER);
  if (scope == SynchronousCollectiveScope::all) {
    if (CkNumNodes() == 1) {
      // Single node: the node barrier alone covers every PE.
      CmiNodeBarrier();
    } else if (currentBarrierMasterPe_.size() == 0) {
      // If expectedBarrierMasterPe is not set, then we need to
      // default back to a true all synchronization
      forceBarrierAll();
    } else {
      if (isMasterPe_) {
        // NOTE(review): the master-PE synchronization call that belongs here
        // (original line 160) is missing from this view — confirm against the full file.
      }
      CmiNodeBarrier();
    }
  } else if (isMasterPe_) {
    masterPes_.recvBarrierMasterPe(deviceIndex_);
    suspendAndCheck(SynchronousCollectiveScope::master);
  }
  NAMD_EVENT_STOP(1, NamdProfileEvent::CHARM_BARRIER);
}
170 
  // Reduction-complete callback for the all-PE scope: fan the reduced bytes
  // back out to every PE.
  // NOTE(review): the signature (`void handleReductionAll(CkReductionMsg *msg)`
  // per the index below) is on a line missing from this view.
  allPes_.broadcastReductionResult(msg->getSize(), (char*) msg->getData());
  // TODO: Should this message be deleted here
}
175 
  // Reduction-complete callback for the master-PE scope: fan the reduced
  // bytes back out to the master-PE section.
  // NOTE(review): the signature (`void handleReductionMaster(CkReductionMsg *msg)`
  // per the index below) is on a line missing from this view.
  masterPes_.broadcastReductionResult(msg->getSize(), (char*) msg->getData());
  // TODO: Should this message be deleted here
}
180 
  // Entry method: copy `n` reduced bytes into the buffer staged by allReduce()
  // (reductionPtr_), then signal the single-slot barrier to resume the waiter.
  // NOTE(review): the signature (`void broadcastReductionResult(int n, char *data)`
  // per the index below) is on a line missing from this view.
  std::memcpy(reductionPtr_, (void*) data, n);
  incrementCount(SynchronousCollectiveScope::single, 0);
}
185 
// Synchronous element-wise all-reduce of `data` with reducer `type` over the
// given scope; every participant returns the reduced vector. Non-master PEs
// under master scope fall through and return an empty vector.
// NOTE(review): the signature's final fragment (`const SynchronousCollectiveScope
// scope) {`, original line 188) is missing from this view.
template<typename T>
std::vector<T> SynchronousCollectives::allReduce(std::vector<T>& data, CkReduction::reducerType type,
  if (scope == SynchronousCollectiveScope::single) {
    NAMD_bug("SynchronousCollectives::allreduce does not support single scope");
  }
  NAMD_EVENT_START(1, NamdProfileEvent::CHARM_ALLREDUCE);

  std::vector<T> out;
  // Stage a destination buffer; broadcastReductionResult() memcpy's into it.
  reductionTemp_ = std::vector<T>(data.size());
  reductionPtr_ = (void*) std::any_cast<std::vector<T>&>(reductionTemp_).data();

  setThread(CthSelf());
  if (scope == SynchronousCollectiveScope::all) {
    CkCallback cb(CkReductionTarget(SynchronousCollectives, handleReductionAll),
        thisProxy[thisIndex]);

    contribute(data.size() * sizeof(T), data.data(), type, cb);
    // Wait for the result to land via broadcastReductionResult().
    suspendAndCheck(SynchronousCollectiveScope::single);
  } else if (isMasterPe_) {
    CkCallback cb(CkReductionTarget(SynchronousCollectives, handleReductionMaster),
        thisProxy[thisIndex]);

    // Section reductions must go through the multicast manager.
    CProxy_CkMulticastMgr mcastProxy = CkpvAccess(BOCclass_group).multicastMgr;
    CkMulticastMgr *mcastPtr = CProxy_CkMulticastMgr(mcastProxy).ckLocalBranch();
    mcastPtr->contribute(data.size() * sizeof(T), data.data(), type, reductionCookie_, cb);
    suspendAndCheck(SynchronousCollectiveScope::single);
  }
  out = std::move(std::any_cast<std::vector<T>&>(reductionTemp_));
  NAMD_EVENT_STOP(1, NamdProfileEvent::CHARM_ALLREDUCE);
  return out;
}
218 
220  CkGetSectionInfo(reductionCookie_, msg);
221  delete msg;
222  incrementCount(SynchronousCollectiveScope::single, 0);
223 }
224 
225 template<typename T>
226 void SynchronousCollectives::sendAllGather(const T& data, const SynchronousCollectiveScope scope, const unsigned int key) {
227  if (scope == SynchronousCollectiveScope::all) {
228  allPes_.recvIndexData(CkMyPe(), data, scope, key);
229  } else if (isMasterPe_) {
230  masterPes_.recvIndexData(deviceIndex_, data, scope, key);
231  }
232 }
233 
234 template<typename T>
235 void SynchronousCollectives::recvIndexData(const int index, const T& data, const SynchronousCollectiveScope scope, const unsigned int key) {
236  const int tempSize = (scope == SynchronousCollectiveScope::all) ? CkNumPes() : numDevices_;
237  auto res = tempData_.try_emplace(key, std::in_place_type<std::vector<T>>, tempSize);
238 
239  std::vector<T>& tempVec = std::any_cast<std::vector<T>&>(res.first->second);
240  if (index >= tempVec.size()) {
241  NAMD_die("SynchronousCollectives::recvIndexData: temp array not large enough");
242  }
243 
244  tempVec[index] = std::move(data);
245  incrementCount(scope, index);
246 }
247 
248 template<typename T>
249 T SynchronousCollectives::retrieveTemp(const unsigned int key) {
250  auto outIter = tempData_.find(key);
251  if (outIter == tempData_.end()) {
252  NAMD_die("SynchronousCollectives::retrieveTemp: could not find data");
253  }
254  auto out = std::move(std::any_cast<T&>(outIter->second));
255  tempData_.erase(key);
256  return out;
257 }
258 
259 template<typename T>
260 std::vector<T> SynchronousCollectives::allGather(const T& data, const SynchronousCollectiveScope scope) {
261  if (scope == SynchronousCollectiveScope::single) {
262  NAMD_bug("SynchronousCollectives::allgather does not support single scope");
263  }
264  NAMD_EVENT_START(1, NamdProfileEvent::CHARM_ALLGATHER);
265 
266  std::vector<T> out;
267  if (scope == SynchronousCollectiveScope::all || isMasterPe_) {
268  const unsigned int key = getKey(scope);
269  sendAllGather<T>(data, scope, key);
270  suspendAndCheck(scope);
271 
272  out = retrieveTemp<std::vector<T>>(key);
273  }
274  NAMD_EVENT_STOP(1, NamdProfileEvent::CHARM_ALLGATHER);
275  return out;
276 }
277 
278 template<typename T>
279 void SynchronousCollectives::sendAlltoallv(const std::vector<T>& data,
280  const SynchronousCollectiveScope scope, const unsigned int key) {
281 
282  CProxy_SynchronousCollectives cp(thisgroup);
283  if (scope == SynchronousCollectiveScope::all) {
284  for (size_t i = 0; i < CkNumPes(); i++) {
285  cp[i].recvIndexData<T>(CkMyPe(), data[i], scope, key);
286  }
287  } else if (isMasterPe_) {
288  for (size_t i = 0; i < numDevices_; i++) {
289  const int PE = masterPeList_[i];
290  cp[PE].recvIndexData<T>(deviceIndex_, data[i], scope, key);
291  }
292  }
293 }
294 
295 template<typename T>
296 std::vector<T> SynchronousCollectives::alltoallv(const std::vector<T>& data, const SynchronousCollectiveScope scope) {
297  if (scope == SynchronousCollectiveScope::single) {
298  NAMD_bug("SynchronousCollectives::alltoallv does not support single scope");
299  }
300  NAMD_EVENT_START(1, NamdProfileEvent::CHARM_ALLTOALL);
301 
302  std::vector<T> out;
303 
304  if (scope == SynchronousCollectiveScope::all || isMasterPe_) {
305  const unsigned int key = getKey(scope);
306  sendAlltoallv<T>(data, scope, key);
307  suspendAndCheck(scope);
308  out = retrieveTemp<std::vector<T>>(key);
309  }
310 
311  NAMD_EVENT_STOP(1, NamdProfileEvent::CHARM_ALLTOALL);
312  return out;
313 }
314 
315 template<typename T>
316 void SynchronousCollectives::sendBroadcast(const T& data, const SynchronousCollectiveScope scope, const unsigned int key) {
317  if (scope == SynchronousCollectiveScope::all) {
318  allPes_.recvBroadcast<T>(data, key);
319  } else if (isMasterPe_) {
320  masterPes_.recvBroadcast<T>(data, key);
321  }
322 }
323 
324 template<typename T>
325 void SynchronousCollectives::recvBroadcast(const T& data, const unsigned int key) {
326  // Since we are only expecting one message, the key should not exist in the map
327  tempData_.insert({key, std::move(data)});
328  incrementCount(SynchronousCollectiveScope::single, 0);
329 }
330 
331 template<typename T>
332 T SynchronousCollectives::broadcast(const T& data, const bool isRoot, const SynchronousCollectiveScope scope) {
333  if (scope == SynchronousCollectiveScope::single) {
334  NAMD_bug("SynchronousCollectives::broadcast does not support single scope");
335  }
336  NAMD_EVENT_START(1, NamdProfileEvent::CHARM_BROADCAST);
337 
338  T out = data; // If we are not participating in the broadcast, just return the input
339  if (scope == SynchronousCollectiveScope::all || isMasterPe_) {
340  const unsigned int key = getKey(scope);
341  if (isRoot) {
342  sendBroadcast(data, scope, key);
343  }
344  suspendAndCheck(SynchronousCollectiveScope::single);
345 
346  out = retrieveTemp<T>(key);
347  }
348  NAMD_EVENT_STOP(1, NamdProfileEvent::CHARM_BROADCAST);
349  return out;
350 }
351 
// Explicit template instantiations of allGather for the element types NAMD uses.
#define INSTANTIATE_ALLGATHER(type) \
template std::vector<type> \
SynchronousCollectives::allGather<type>(const type&, SynchronousCollectiveScope);

// NOTE(review): one instantiation line (original line 356) is missing from
// this view — confirm against the full file.
INSTANTIATE_ALLGATHER(unsigned long long);
#if defined(NAMD_CUDA) || defined(NAMD_HIP)
INSTANTIATE_ALLGATHER(cudaIpcMemHandle_t);
#endif // NAMD_CUDA || NAMD_HIP
INSTANTIATE_ALLGATHER(std::vector<CudaLocalRecord>);
INSTANTIATE_ALLGATHER(std::vector<int>);

#undef INSTANTIATE_ALLGATHER
365 
// Explicit template instantiations of alltoallv for the element types NAMD uses.
#define INSTANTIATE_ALLTOALLV(type) \
template std::vector<type> \
SynchronousCollectives::alltoallv<type>(const std::vector<type>&, SynchronousCollectiveScope);

INSTANTIATE_ALLTOALLV(std::vector<int>);
// NOTE(review): one instantiation line (original line 371) is missing from
// this view — confirm against the full file.

#undef INSTANTIATE_ALLTOALLV
374 
// Explicit template instantiations of allReduce for the element types NAMD uses.
// (No trailing semicolon in the macro body: each use supplies its own.)
#define INSTANTIATE_ALLREDUCE(type) \
template std::vector<type> \
SynchronousCollectives::allReduce<type>(std::vector<type>&, \
    CkReduction::reducerType, SynchronousCollectiveScope)

INSTANTIATE_ALLREDUCE(unsigned int);
INSTANTIATE_ALLREDUCE(size_t);
INSTANTIATE_ALLREDUCE(double);

#undef INSTANTIATE_ALLREDUCE
385 
386 #endif /* NAMD_CUDA || NAMD_HIP */
387 
388 #include "SynchronousCollectives.def.h"
389 
void barrier(const SynchronousCollectiveScope scope)
#define NAMD_EVENT_STOP(eon, id)
std::vector< T > allGather(const T &data, const SynchronousCollectiveScope scope)
void handleReductionMaster(CkReductionMsg *msg)
std::vector< T > allReduce(std::vector< T > &data, CkReduction::reducerType type, const SynchronousCollectiveScope scope)
#define INSTANTIATE_ALLGATHER(type)
void initMasterScope(const int isMasterPe, const int isMasterDevice, const int numDevices, const int deviceIndex, const std::vector< int > &masterPeList)
int masterPeList[MAX_NUM_DEVICES]
Definition: DeviceCUDA.C:95
T broadcast(const T &data, const bool isRoot, const SynchronousCollectiveScope scope)
#define NAMD_EVENT_START(eon, id)
void recvBarrierAll(const int PE)
void NAMD_bug(const char *err_msg)
Definition: common.C:195
void recvBarrierMasterPe(const int deviceIndex)
void broadcastReductionResult(int n, char *data)
void NAMD_die(const char *err_msg)
Definition: common.C:147
SynchronousCollectiveScope
void recvIndexData(const int index, const T &data, const SynchronousCollectiveScope scope, const unsigned int key)
#define INSTANTIATE_ALLREDUCE(type)
void setupMulticastSection(SynchronousCollectivesMulticastMsg *msg)
void handleReductionAll(CkReductionMsg *msg)
PUPbytes(CudaLocalRecord)
#define INSTANTIATE_ALLTOALLV(type)
std::vector< T > alltoallv(const std::vector< T > &data, const SynchronousCollectiveScope scope)
void recvBroadcast(const T &data, const unsigned int key)