NAMD
Public Member Functions | Static Public Member Functions | List of all members
SynchronousCollectives Class Reference

#include <SynchronousCollectives.h>

Inheritance diagram for SynchronousCollectives:

Public Member Functions

 SynchronousCollectives ()
 
 ~SynchronousCollectives ()
 
void initAllScope ()
 
void initMasterScope (const int isMasterPe, const int isMasterDevice, const int numDevices, const int deviceIndex, const std::vector< int > &masterPeList)
 
template<typename T >
std::vector< T > allReduce (std::vector< T > &data, CkReduction::reducerType type, const SynchronousCollectiveScope scope)
 
template<typename T >
std::vector< T > allGather (const T &data, const SynchronousCollectiveScope scope)
 
template<typename T >
std::vector< T > alltoallv (const std::vector< T > &data, const SynchronousCollectiveScope scope)
 
template<typename T >
T broadcast (const T &data, const bool isRoot, const SynchronousCollectiveScope scope)
 
void waitAndAwaken ()
 
void barrier (const SynchronousCollectiveScope scope)
 
void forceBarrierAll ()
 
void setupMulticastSection (SynchronousCollectivesMulticastMsg *msg)
 
void handleReductionAll (CkReductionMsg *msg)
 
void handleReductionMaster (CkReductionMsg *msg)
 
void broadcastReductionResult (int n, char *data)
 
template<typename T >
void recvIndexData (const int index, const T &data, const SynchronousCollectiveScope scope, const unsigned int key)
 
template<typename T >
void recvBroadcast (const T &data, const unsigned int key)
 
void wait ()
 
void recvBarrierAll (const int PE)
 
void recvBarrierMasterPe (const int deviceIndex)
 

Static Public Member Functions

static SynchronousCollectives * Object ()
 
static SynchronousCollectives * ObjectOnPe (const int pe)
 

Detailed Description

Definition at line 60 of file SynchronousCollectives.h.

Constructor & Destructor Documentation

◆ SynchronousCollectives()

SynchronousCollectives::SynchronousCollectives ( void  )

Definition at line 26 of file SynchronousCollectives.C.

References NAMD_bug().

27 {
28  if (CkpvAccess(SynchronousCollectives_instance) == NULL) {
29  CkpvAccess(SynchronousCollectives_instance) = this;
30  } else {
31  NAMD_bug("SynchronousCollectives instanced twice on same processor!");
32  }
33 }
void NAMD_bug(const char *err_msg)
Definition: common.C:196

◆ ~SynchronousCollectives()

SynchronousCollectives::~SynchronousCollectives ( void  )

Definition at line 35 of file SynchronousCollectives.C.

35 { }

Member Function Documentation

◆ allGather()

template<typename T >
std::vector< T > SynchronousCollectives::allGather ( const T &  data,
const SynchronousCollectiveScope  scope 
)

Definition at line 262 of file SynchronousCollectives.C.

References all, NAMD_bug(), NAMD_EVENT_START, NAMD_EVENT_STOP, and single.

Referenced by CollectiveDeviceBuffer< T >::allocate_no_check(), and GlobalGPUMgr::initialize().

262  {
263  if (scope == SynchronousCollectiveScope::single) {
264  NAMD_bug("SynchronousCollectives::allgather does not support single scope");
265  }
266  NAMD_EVENT_START(1, NamdProfileEvent::CHARM_ALLGATHER);
267 
268  std::vector<T> out;
269  if (scope == SynchronousCollectiveScope::all || isMasterPe_) {
270  const unsigned int key = getKey(scope);
271  sendAllGather<T>(data, scope, key);
272  suspendAndCheck(scope);
273 
274  out = retrieveTemp<std::vector<T>>(key);
275  }
276  NAMD_EVENT_STOP(1, NamdProfileEvent::CHARM_ALLGATHER);
277  return out;
278 }
#define NAMD_EVENT_STOP(eon, id)
#define NAMD_EVENT_START(eon, id)
void NAMD_bug(const char *err_msg)
Definition: common.C:196

◆ allReduce()

template<typename T >
std::vector< T > SynchronousCollectives::allReduce ( std::vector< T > &  data,
CkReduction::reducerType  type,
const SynchronousCollectiveScope  scope 
)

Definition at line 189 of file SynchronousCollectives.C.

References all, handleReductionAll(), handleReductionMaster(), NAMD_bug(), NAMD_EVENT_START, NAMD_EVENT_STOP, and single.

Referenced by CollectiveDeviceBuffer< T >::allocate(), and CollectiveDeviceBuffer< T >::reallocate().

190  {
191  if (scope == SynchronousCollectiveScope::single) {
192  NAMD_bug("SynchronousCollectives::allreduce does not support single scope");
193  }
194  NAMD_EVENT_START(1, NamdProfileEvent::CHARM_ALLREDUCE);
195 
196  std::vector<T> out;
197  reductionTemp_ = std::vector<T>(data.size());
198  reductionPtr_ = (void*) std::any_cast<std::vector<T>&>(reductionTemp_).data();
199 
200  setThread(CthSelf());
201  if (scope == SynchronousCollectiveScope::all) {
202  CkCallback cb(CkReductionTarget(SynchronousCollectives, handleReductionAll),
203  thisProxy[thisIndex]);
204 
205  contribute(data.size() * sizeof(T), data.data(), type, cb);
206  suspendAndCheck(SynchronousCollectiveScope::single);
207  } else if (isMasterPe_) {
208  CkCallback cb(CkReductionTarget(SynchronousCollectives, handleReductionMaster),
209  thisProxy[thisIndex]);
210 
211  CProxy_CkMulticastMgr mcastProxy = CkpvAccess(BOCclass_group).multicastMgr;
212  CkMulticastMgr *mcastPtr = CProxy_CkMulticastMgr(mcastProxy).ckLocalBranch();
213  mcastPtr->contribute(data.size() * sizeof(T), data.data(), type, reductionCookie_, cb);
214  suspendAndCheck(SynchronousCollectiveScope::single);
215  }
216  out = std::move(std::any_cast<std::vector<T>&>(reductionTemp_));
217  NAMD_EVENT_STOP(1, NamdProfileEvent::CHARM_ALLREDUCE);
218  return out;
219 }
#define NAMD_EVENT_STOP(eon, id)
void handleReductionMaster(CkReductionMsg *msg)
#define NAMD_EVENT_START(eon, id)
void NAMD_bug(const char *err_msg)
Definition: common.C:196
void handleReductionAll(CkReductionMsg *msg)

◆ alltoallv()

template<typename T >
std::vector< T > SynchronousCollectives::alltoallv ( const std::vector< T > &  data,
const SynchronousCollectiveScope  scope 
)

Definition at line 298 of file SynchronousCollectives.C.

References all, NAMD_bug(), NAMD_EVENT_START, NAMD_EVENT_STOP, and single.

298  {
299  if (scope == SynchronousCollectiveScope::single) {
300  NAMD_bug("SynchronousCollectives::alltoallv does not support single scope");
301  }
302  NAMD_EVENT_START(1, NamdProfileEvent::CHARM_ALLTOALL);
303 
304  std::vector<T> out;
305 
306  if (scope == SynchronousCollectiveScope::all || isMasterPe_) {
307  const unsigned int key = getKey(scope);
308  sendAlltoallv<T>(data, scope, key);
309  suspendAndCheck(scope);
310  out = retrieveTemp<std::vector<T>>(key);
311  }
312 
313  NAMD_EVENT_STOP(1, NamdProfileEvent::CHARM_ALLTOALL);
314  return out;
315 }
#define NAMD_EVENT_STOP(eon, id)
#define NAMD_EVENT_START(eon, id)
void NAMD_bug(const char *err_msg)
Definition: common.C:196

◆ barrier()

void SynchronousCollectives::barrier ( const SynchronousCollectiveScope  scope)

Definition at line 147 of file SynchronousCollectives.C.

References all, forceBarrierAll(), master, NAMD_bug(), NAMD_EVENT_START, NAMD_EVENT_STOP, and single.

147  {
148  if (scope == SynchronousCollectiveScope::single) {
149  NAMD_bug("SynchronousCollectives::barrier does not support single scope");
150  }
151 
152  NAMD_EVENT_START(1, NamdProfileEvent::CHARM_BARRIER);
153  if (scope == SynchronousCollectiveScope::all) {
154  if (CkNumNodes() == 1) {
155  CmiNodeBarrier();
156  } else if (currentBarrierMasterPe_.size() == 0) {
157  // If expectedBarrierMasterPe is not set, then we need to
158  // default back to a true all synchronization
159  forceBarrierAll();
160  } else {
161  if (isMasterPe_) {
162  barrier(SynchronousCollectiveScope::master);
163  }
164  CmiNodeBarrier();
165  }
166  } else if (isMasterPe_) {
167  masterPes_.recvBarrierMasterPe(deviceIndex_);
168  suspendAndCheck(SynchronousCollectiveScope::master);
169  }
170  NAMD_EVENT_STOP(1, NamdProfileEvent::CHARM_BARRIER);
171 }
void barrier(const SynchronousCollectiveScope scope)
#define NAMD_EVENT_STOP(eon, id)
#define NAMD_EVENT_START(eon, id)
void NAMD_bug(const char *err_msg)
Definition: common.C:196

◆ broadcast()

template<typename T >
T SynchronousCollectives::broadcast ( const T &  data,
const bool  isRoot,
const SynchronousCollectiveScope  scope 
)

Definition at line 334 of file SynchronousCollectives.C.

References all, NAMD_bug(), NAMD_EVENT_START, NAMD_EVENT_STOP, and single.

334  {
335  if (scope == SynchronousCollectiveScope::single) {
336  NAMD_bug("SynchronousCollectives::broadcast does not support single scope");
337  }
338  NAMD_EVENT_START(1, NamdProfileEvent::CHARM_BROADCAST);
339 
340  T out = data; // If we are not participating in the broadcast, just return the input
341  if (scope == SynchronousCollectiveScope::all || isMasterPe_) {
342  const unsigned int key = getKey(scope);
343  if (isRoot) {
344  sendBroadcast(data, scope, key);
345  }
346  suspendAndCheck(SynchronousCollectiveScope::single);
347 
348  out = retrieveTemp<T>(key);
349  }
350  NAMD_EVENT_STOP(1, NamdProfileEvent::CHARM_BROADCAST);
351  return out;
352 }
#define NAMD_EVENT_STOP(eon, id)
#define NAMD_EVENT_START(eon, id)
void NAMD_bug(const char *err_msg)
Definition: common.C:196

◆ broadcastReductionResult()

void SynchronousCollectives::broadcastReductionResult ( int  n,
char *  data 
)

Definition at line 183 of file SynchronousCollectives.C.

References single.

183  {
184  std::memcpy(reductionPtr_, (void*) data, n);
185  incrementCount(SynchronousCollectiveScope::single, 0);
186 }

◆ forceBarrierAll()

void SynchronousCollectives::forceBarrierAll ( )

Definition at line 142 of file SynchronousCollectives.C.

References all.

Referenced by barrier(), GlobalGPUMgr::initialize(), initializeGPUResident(), and initMasterScope().

142  {
143  allPes_.recvBarrierAll(CkMyPe());
144  suspendAndCheck(SynchronousCollectiveScope::all);
145 }

◆ handleReductionAll()

void SynchronousCollectives::handleReductionAll ( CkReductionMsg *  msg)

Definition at line 173 of file SynchronousCollectives.C.

Referenced by allReduce().

173  {
174  allPes_.broadcastReductionResult(msg->getSize(), (char*) msg->getData());
175  // TODO: Should this message be deleted here
176 }

◆ handleReductionMaster()

void SynchronousCollectives::handleReductionMaster ( CkReductionMsg *  msg)

Definition at line 178 of file SynchronousCollectives.C.

Referenced by allReduce().

178  {
179  masterPes_.broadcastReductionResult(msg->getSize(), (char*) msg->getData());
180  // TODO: Should this message be deleted here
181 }

◆ initAllScope()

void SynchronousCollectives::initAllScope ( )

Definition at line 37 of file SynchronousCollectives.C.

Referenced by Node::startup().

37  {
38  allPes_ = CkpvAccess(BOCclass_group).synchronousCollectives;
39  currentBarrierAll_ = std::vector<int>(CkNumPes(), 0);
40  currentBarrierSingle_ = std::vector<int>(1, 0);
41 }

◆ initMasterScope()

void SynchronousCollectives::initMasterScope ( const int  isMasterPe,
const int  isMasterDevice,
const int  numDevices,
const int  deviceIndex,
const std::vector< int > &  masterPeList 
)

Definition at line 43 of file SynchronousCollectives.C.

References forceBarrierAll(), masterPeList, and single.

Referenced by initializeGPUResident().

44  {
45 
46  isMasterPe_ = isMasterPe;
47  numDevices_ = numDevices;
48  deviceIndex_ = deviceIndex;
49  masterPeList_ = masterPeList;
50 
51  currentBarrierMasterPe_ = std::vector<int>(numDevices_, 0);
52  forceBarrierAll(); // Make sure all PEs have set expectedBarrierMasterPe
53 
54  masterPes_ = CProxySection_SynchronousCollectives(allPes_.ckGetGroupID(),
55  masterPeList_.data(), masterPeList_.size());
56  masterPesMulticast_ = CProxySection_SynchronousCollectives(allPes_.ckGetGroupID(),
57  masterPeList_.data(), masterPeList_.size());
58 
59  //
60  // For section broadcasts, we must use the multi-cast library; however, it requires explicitly
61  // defined messages, so the multi-cast section will not be used for the non-reduction sections
62  //
63  CProxy_CkMulticastMgr mcastProxy = CkpvAccess(BOCclass_group).multicastMgr;
64  CkMulticastMgr *mcastPtr = CProxy_CkMulticastMgr(mcastProxy).ckLocalBranch();
65  masterPesMulticast_.ckSectionDelegate(mcastPtr);
66 
67  if (isMasterDevice && isMasterPe_) {
68  SynchronousCollectivesMulticastMsg *msg = new SynchronousCollectivesMulticastMsg;
69  setThread(CthSelf());
70  masterPesMulticast_.setupMulticastSection(msg);
71  suspendAndCheck(SynchronousCollectiveScope::single);
72  } else if (isMasterPe_) {
73  suspendAndCheck(SynchronousCollectiveScope::single);
74  }
75 
76  forceBarrierAll(); // Make sure all PEs have set expectedBarrierMasterPe
77 }
int masterPeList[MAX_NUM_DEVICES]
Definition: DeviceCUDA.C:95

◆ Object()

static SynchronousCollectives* SynchronousCollectives::Object ( void  )
inline static

◆ ObjectOnPe()

static SynchronousCollectives* SynchronousCollectives::ObjectOnPe ( const int  pe)
inline static

Definition at line 66 of file SynchronousCollectives.h.

66  {
67  return CkpvAccessOther(SynchronousCollectives_instance, CmiRankOf(pe));
68  }

◆ recvBarrierAll()

void SynchronousCollectives::recvBarrierAll ( const int  PE)

Definition at line 134 of file SynchronousCollectives.C.

References all.

134  {
135  incrementCount(SynchronousCollectiveScope::all, PE);
136 }

◆ recvBarrierMasterPe()

void SynchronousCollectives::recvBarrierMasterPe ( const int  deviceIndex)

Definition at line 138 of file SynchronousCollectives.C.

References master.

138  {
139  incrementCount(SynchronousCollectiveScope::master, deviceIndex);
140 }

◆ recvBroadcast()

template<typename T >
void SynchronousCollectives::recvBroadcast ( const T &  data,
const unsigned int  key 
)

Definition at line 327 of file SynchronousCollectives.C.

References single.

327  {
328  // Since we are only expecting one message, the key should not exist in the map
329  tempData_.insert({key, std::move(data)});
330  incrementCount(SynchronousCollectiveScope::single, 0);
331 }

◆ recvIndexData()

template<typename T >
void SynchronousCollectives::recvIndexData ( const int  index,
const T &  data,
const SynchronousCollectiveScope  scope,
const unsigned int  key 
)

Definition at line 237 of file SynchronousCollectives.C.

References all, and NAMD_die().

237  {
238  const int tempSize = (scope == SynchronousCollectiveScope::all) ? CkNumPes() : numDevices_;
239  auto res = tempData_.try_emplace(key, std::in_place_type<std::vector<T>>, tempSize);
240 
241  std::vector<T>& tempVec = std::any_cast<std::vector<T>&>(res.first->second);
242  if (index >= tempVec.size()) {
243  NAMD_die("SynchronousCollectives::recvIndexData: temp array not large enough");
244  }
245 
246  tempVec[index] = std::move(data);
247  incrementCount(scope, index);
248 }
void NAMD_die(const char *err_msg)
Definition: common.C:148

◆ setupMulticastSection()

void SynchronousCollectives::setupMulticastSection ( SynchronousCollectivesMulticastMsg *  msg)

Definition at line 221 of file SynchronousCollectives.C.

References single.

221  {
222  CkGetSectionInfo(reductionCookie_, msg);
223  delete msg;
224  incrementCount(SynchronousCollectiveScope::single, 0);
225 }

◆ wait()

void SynchronousCollectives::wait ( )

Definition at line 103 of file SynchronousCollectives.C.

Referenced by waitAndAwaken().

103  {
104  int finished = false;
105  switch (waitPhase_) {
106  case 0:
107  break;
108  case 1:
109  finished = true;
110  break;
111  }
112 
113  waitPhase_++;
114  if (!CkMyPe()) {
115  if (!finished) {
116  CkStartQD(CkCallback(CkIndex_SynchronousCollectives::wait(), thisgroup));
117  }
118  }
119 
120  if (finished) {
121  waitPhase_ = 0;
122  CthAwaken(self_awaken_thread_);
123  }
124 }

◆ waitAndAwaken()

void SynchronousCollectives::waitAndAwaken ( )

Definition at line 126 of file SynchronousCollectives.C.

References NAMD_EVENT_START, NAMD_EVENT_STOP, and wait().

126  {
127  NAMD_EVENT_START(1, NamdProfileEvent::CHARM_WAITANDAWAKEN);
128  setThread(CthSelf());
129  wait();
130  CthSuspend();
131  NAMD_EVENT_STOP(1, NamdProfileEvent::CHARM_WAITANDAWAKEN);
132 }
#define NAMD_EVENT_STOP(eon, id)
#define NAMD_EVENT_START(eon, id)

The documentation for this class was generated from the following files:

 * SynchronousCollectives.h
 * SynchronousCollectives.C