NAMD
Public Member Functions | Static Public Member Functions | List of all members
SynchronousCollectives Class Reference

#include <SynchronousCollectives.h>

Inheritance diagram for SynchronousCollectives:

Public Member Functions

 SynchronousCollectives ()
 
 ~SynchronousCollectives ()
 
void initAllScope ()
 
void initMasterScope (const int isMasterPe, const int isMasterDevice, const int numDevices, const int deviceIndex, const std::vector< int > &masterPeList)
 
template<typename T >
std::vector< T > allReduce (std::vector< T > &data, CkReduction::reducerType type, const SynchronousCollectiveScope scope)
 
template<typename T >
std::vector< T > allGather (const T &data, const SynchronousCollectiveScope scope)
 
template<typename T >
std::vector< T > alltoallv (const std::vector< T > &data, const SynchronousCollectiveScope scope)
 
template<typename T >
T broadcast (const T &data, const bool isRoot, const SynchronousCollectiveScope scope)
 
void waitAndAwaken ()
 
void barrier (const SynchronousCollectiveScope scope)
 
void forceBarrierAll ()
 
void setupMulticastSection (SynchronousCollectivesMulticastMsg *msg)
 
void handleReductionAll (CkReductionMsg *msg)
 
void handleReductionMaster (CkReductionMsg *msg)
 
void broadcastReductionResult (int n, char *data)
 
template<typename T >
void recvIndexData (const int index, const T &data, const SynchronousCollectiveScope scope, const unsigned int key)
 
template<typename T >
void recvBroadcast (const T &data, const unsigned int key)
 
void wait ()
 
void recvBarrierAll (const int PE)
 
void recvBarrierMasterPe (const int deviceIndex)
 

Static Public Member Functions

static SynchronousCollectives * Object ()
 
static SynchronousCollectives * ObjectOnPe (const int pe)
 

Detailed Description

Definition at line 60 of file SynchronousCollectives.h.

Constructor & Destructor Documentation

◆ SynchronousCollectives()

SynchronousCollectives::SynchronousCollectives ( void  )

Definition at line 26 of file SynchronousCollectives.C.

References NAMD_bug().

27 {
28  if (CkpvAccess(SynchronousCollectives_instance) == NULL) {
29  CkpvAccess(SynchronousCollectives_instance) = this;
30  } else {
31  NAMD_bug("SynchronousCollectives instanced twice on same processor!");
32  }
33 }
void NAMD_bug(const char *err_msg)
Definition: common.C:195

◆ ~SynchronousCollectives()

SynchronousCollectives::~SynchronousCollectives ( void  )

Definition at line 35 of file SynchronousCollectives.C.

35 { }

Member Function Documentation

◆ allGather()

template<typename T >
std::vector< T > SynchronousCollectives::allGather ( const T &  data,
const SynchronousCollectiveScope  scope 
)

Definition at line 260 of file SynchronousCollectives.C.

References all, NAMD_bug(), NAMD_EVENT_START, NAMD_EVENT_STOP, and single.

Referenced by GlobalGPUMgr::initialize().

260  {
261  if (scope == SynchronousCollectiveScope::single) {
262  NAMD_bug("SynchronousCollectives::allgather does not support single scope");
263  }
264  NAMD_EVENT_START(1, NamdProfileEvent::CHARM_ALLGATHER);
265 
266  std::vector<T> out;
267  if (scope == SynchronousCollectiveScope::all || isMasterPe_) {
268  const unsigned int key = getKey(scope);
269  sendAllGather<T>(data, scope, key);
270  suspendAndCheck(scope);
271 
272  out = retrieveTemp<std::vector<T>>(key);
273  }
274  NAMD_EVENT_STOP(1, NamdProfileEvent::CHARM_ALLGATHER);
275  return out;
276 }
#define NAMD_EVENT_STOP(eon, id)
#define NAMD_EVENT_START(eon, id)
void NAMD_bug(const char *err_msg)
Definition: common.C:195

◆ allReduce()

template<typename T >
std::vector< T > SynchronousCollectives::allReduce ( std::vector< T > &  data,
CkReduction::reducerType  type,
const SynchronousCollectiveScope  scope 
)

Definition at line 187 of file SynchronousCollectives.C.

References all, handleReductionAll(), handleReductionMaster(), NAMD_bug(), NAMD_EVENT_START, NAMD_EVENT_STOP, and single.

188  {
189  if (scope == SynchronousCollectiveScope::single) {
190  NAMD_bug("SynchronousCollectives::allreduce does not support single scope");
191  }
192  NAMD_EVENT_START(1, NamdProfileEvent::CHARM_ALLREDUCE);
193 
194  std::vector<T> out;
195  reductionTemp_ = std::vector<T>(data.size());
196  reductionPtr_ = (void*) std::any_cast<std::vector<T>&>(reductionTemp_).data();
197 
198  setThread(CthSelf());
199  if (scope == SynchronousCollectiveScope::all) {
200  CkCallback cb(CkReductionTarget(SynchronousCollectives, handleReductionAll),
201  thisProxy[thisIndex]);
202 
203  contribute(data.size() * sizeof(T), data.data(), type, cb);
204  suspendAndCheck(SynchronousCollectiveScope::single);
205  } else if (isMasterPe_) {
206  CkCallback cb(CkReductionTarget(SynchronousCollectives, handleReductionMaster),
207  thisProxy[thisIndex]);
208 
209  CProxy_CkMulticastMgr mcastProxy = CkpvAccess(BOCclass_group).multicastMgr;
210  CkMulticastMgr *mcastPtr = CProxy_CkMulticastMgr(mcastProxy).ckLocalBranch();
211  mcastPtr->contribute(data.size() * sizeof(T), data.data(), type, reductionCookie_, cb);
212  suspendAndCheck(SynchronousCollectiveScope::single);
213  }
214  out = std::move(std::any_cast<std::vector<T>&>(reductionTemp_));
215  NAMD_EVENT_STOP(1, NamdProfileEvent::CHARM_ALLREDUCE);
216  return out;
217 }
#define NAMD_EVENT_STOP(eon, id)
void handleReductionMaster(CkReductionMsg *msg)
#define NAMD_EVENT_START(eon, id)
void NAMD_bug(const char *err_msg)
Definition: common.C:195
void handleReductionAll(CkReductionMsg *msg)

◆ alltoallv()

template<typename T >
std::vector< T > SynchronousCollectives::alltoallv ( const std::vector< T > &  data,
const SynchronousCollectiveScope  scope 
)

Definition at line 296 of file SynchronousCollectives.C.

References all, NAMD_bug(), NAMD_EVENT_START, NAMD_EVENT_STOP, and single.

296  {
297  if (scope == SynchronousCollectiveScope::single) {
298  NAMD_bug("SynchronousCollectives::alltoallv does not support single scope");
299  }
300  NAMD_EVENT_START(1, NamdProfileEvent::CHARM_ALLTOALL);
301 
302  std::vector<T> out;
303 
304  if (scope == SynchronousCollectiveScope::all || isMasterPe_) {
305  const unsigned int key = getKey(scope);
306  sendAlltoallv<T>(data, scope, key);
307  suspendAndCheck(scope);
308  out = retrieveTemp<std::vector<T>>(key);
309  }
310 
311  NAMD_EVENT_STOP(1, NamdProfileEvent::CHARM_ALLTOALL);
312  return out;
313 }
#define NAMD_EVENT_STOP(eon, id)
#define NAMD_EVENT_START(eon, id)
void NAMD_bug(const char *err_msg)
Definition: common.C:195

◆ barrier()

void SynchronousCollectives::barrier ( const SynchronousCollectiveScope  scope)

Definition at line 145 of file SynchronousCollectives.C.

References all, forceBarrierAll(), master, NAMD_bug(), NAMD_EVENT_START, NAMD_EVENT_STOP, and single.

145  {
146  if (scope == SynchronousCollectiveScope::single) {
147  NAMD_bug("SynchronousCollectives::barrier does not support single scope");
148  }
149 
150  NAMD_EVENT_START(1, NamdProfileEvent::CHARM_BARRIER);
151  if (scope == SynchronousCollectiveScope::all) {
152  if (CkNumNodes() == 1) {
153  CmiNodeBarrier();
154  } else if (currentBarrierMasterPe_.size() == 0) {
155  // If expectedBarrierMasterPe is not set, then we need to
156  // default back to a true all synchronization
157  forceBarrierAll();
158  } else {
159  if (isMasterPe_) {
161  }
162  CmiNodeBarrier();
163  }
164  } else if (isMasterPe_) {
165  masterPes_.recvBarrierMasterPe(deviceIndex_);
166  suspendAndCheck(SynchronousCollectiveScope::master);
167  }
168  NAMD_EVENT_STOP(1, NamdProfileEvent::CHARM_BARRIER);
169 }
void barrier(const SynchronousCollectiveScope scope)
#define NAMD_EVENT_STOP(eon, id)
#define NAMD_EVENT_START(eon, id)
void NAMD_bug(const char *err_msg)
Definition: common.C:195

◆ broadcast()

template<typename T >
T SynchronousCollectives::broadcast ( const T &  data,
const bool  isRoot,
const SynchronousCollectiveScope  scope 
)

Definition at line 332 of file SynchronousCollectives.C.

References all, NAMD_bug(), NAMD_EVENT_START, NAMD_EVENT_STOP, and single.

332  {
333  if (scope == SynchronousCollectiveScope::single) {
334  NAMD_bug("SynchronousCollectives::broadcast does not support single scope");
335  }
336  NAMD_EVENT_START(1, NamdProfileEvent::CHARM_BROADCAST);
337 
338  T out = data; // If we are not participating in the broadcast, just return the input
339  if (scope == SynchronousCollectiveScope::all || isMasterPe_) {
340  const unsigned int key = getKey(scope);
341  if (isRoot) {
342  sendBroadcast(data, scope, key);
343  }
344  suspendAndCheck(SynchronousCollectiveScope::single);
345 
346  out = retrieveTemp<T>(key);
347  }
348  NAMD_EVENT_STOP(1, NamdProfileEvent::CHARM_BROADCAST);
349  return out;
350 }
#define NAMD_EVENT_STOP(eon, id)
#define NAMD_EVENT_START(eon, id)
void NAMD_bug(const char *err_msg)
Definition: common.C:195

◆ broadcastReductionResult()

void SynchronousCollectives::broadcastReductionResult ( int  n,
char *  data 
)

Definition at line 181 of file SynchronousCollectives.C.

References single.

181  {
182  std::memcpy(reductionPtr_, (void*) data, n);
183  incrementCount(SynchronousCollectiveScope::single, 0);
184 }

◆ forceBarrierAll()

void SynchronousCollectives::forceBarrierAll ( )

Definition at line 140 of file SynchronousCollectives.C.

References all.

Referenced by barrier(), GlobalGPUMgr::initialize(), initializeGPUResident(), and initMasterScope().

140  {
141  allPes_.recvBarrierAll(CkMyPe());
142  suspendAndCheck(SynchronousCollectiveScope::all);
143 }

◆ handleReductionAll()

void SynchronousCollectives::handleReductionAll ( CkReductionMsg *  msg)

Definition at line 171 of file SynchronousCollectives.C.

Referenced by allReduce().

171  {
172  allPes_.broadcastReductionResult(msg->getSize(), (char*) msg->getData());
173  // TODO: Should this message be deleted here
174 }

◆ handleReductionMaster()

void SynchronousCollectives::handleReductionMaster ( CkReductionMsg *  msg)

Definition at line 176 of file SynchronousCollectives.C.

Referenced by allReduce().

176  {
177  masterPes_.broadcastReductionResult(msg->getSize(), (char*) msg->getData());
178  // TODO: Should this message be deleted here
179 }

◆ initAllScope()

void SynchronousCollectives::initAllScope ( )

Definition at line 37 of file SynchronousCollectives.C.

Referenced by Node::startup().

37  {
38  allPes_ = CkpvAccess(BOCclass_group).synchronousCollectives;
39  currentBarrierAll_ = std::vector<int>(CkNumPes(), 0);
40  currentBarrierSingle_ = std::vector<int>(1, 0);
41 }

◆ initMasterScope()

void SynchronousCollectives::initMasterScope ( const int  isMasterPe,
const int  isMasterDevice,
const int  numDevices,
const int  deviceIndex,
const std::vector< int > &  masterPeList 
)

Definition at line 43 of file SynchronousCollectives.C.

References forceBarrierAll(), masterPeList, and single.

Referenced by initializeGPUResident().

44  {
45 
46  isMasterPe_ = isMasterPe;
47  numDevices_ = numDevices;
48  deviceIndex_ = deviceIndex;
49  masterPeList_ = masterPeList;
50 
51  currentBarrierMasterPe_ = std::vector<int>(numDevices_, 0);
52  forceBarrierAll(); // Make sure all PEs have set expectedBarrierMasterPe
53 
54  masterPes_ = CProxySection_SynchronousCollectives(allPes_.ckGetGroupID(),
55  masterPeList_.data(), masterPeList_.size());
56  masterPesMulticast_ = CProxySection_SynchronousCollectives(allPes_.ckGetGroupID(),
57  masterPeList_.data(), masterPeList_.size());
58 
59  //
60  // For section broadcasts, we must use the multi-cast library; however, it requires explicitly
61  // defined messages, so the multi-cast section will not be used for the non-reduction sections
62  //
63  CProxy_CkMulticastMgr mcastProxy = CkpvAccess(BOCclass_group).multicastMgr;
64  CkMulticastMgr *mcastPtr = CProxy_CkMulticastMgr(mcastProxy).ckLocalBranch();
65  masterPesMulticast_.ckSectionDelegate(mcastPtr);
66 
67  if (isMasterDevice && isMasterPe_) {
69  setThread(CthSelf());
70  masterPesMulticast_.setupMulticastSection(msg);
71  suspendAndCheck(SynchronousCollectiveScope::single);
72  } else if (isMasterPe_) {
73  suspendAndCheck(SynchronousCollectiveScope::single);
74  }
75 
76  forceBarrierAll(); // Make sure all PEs have set expectedBarrierMasterPe
77 }
int masterPeList[MAX_NUM_DEVICES]
Definition: DeviceCUDA.C:95

◆ Object()

static SynchronousCollectives* SynchronousCollectives::Object ( void  )
inline static

Definition at line 63 of file SynchronousCollectives.h.

Referenced by GlobalGPUMgr::initialize(), initializeGPUResident(), Sequencer::Sequencer(), and Node::startup().

63  {
64  return CkpvAccess(SynchronousCollectives_instance);
65  }

◆ ObjectOnPe()

static SynchronousCollectives* SynchronousCollectives::ObjectOnPe ( const int  pe)
inline static

Definition at line 66 of file SynchronousCollectives.h.

66  {
67  return CkpvAccessOther(SynchronousCollectives_instance, CmiRankOf(pe));
68  }

◆ recvBarrierAll()

void SynchronousCollectives::recvBarrierAll ( const int  PE)

Definition at line 132 of file SynchronousCollectives.C.

References all.

132  {
133  incrementCount(SynchronousCollectiveScope::all, PE);
134 }

◆ recvBarrierMasterPe()

void SynchronousCollectives::recvBarrierMasterPe ( const int  deviceIndex)

Definition at line 136 of file SynchronousCollectives.C.

References master.

136  {
137  incrementCount(SynchronousCollectiveScope::master, deviceIndex);
138 }

◆ recvBroadcast()

template<typename T >
void SynchronousCollectives::recvBroadcast ( const T &  data,
const unsigned int  key 
)

Definition at line 325 of file SynchronousCollectives.C.

References single.

325  {
326  // Since we are only expecting one message, the key should not exist in the map
327  tempData_.insert({key, std::move(data)});
328  incrementCount(SynchronousCollectiveScope::single, 0);
329 }

◆ recvIndexData()

template<typename T >
void SynchronousCollectives::recvIndexData ( const int  index,
const T &  data,
const SynchronousCollectiveScope  scope,
const unsigned int  key 
)

Definition at line 235 of file SynchronousCollectives.C.

References all, and NAMD_die().

235  {
236  const int tempSize = (scope == SynchronousCollectiveScope::all) ? CkNumPes() : numDevices_;
237  auto res = tempData_.try_emplace(key, std::in_place_type<std::vector<T>>, tempSize);
238 
239  std::vector<T>& tempVec = std::any_cast<std::vector<T>&>(res.first->second);
240  if (index >= tempVec.size()) {
241  NAMD_die("SynchronousCollectives::recvIndexData: temp array not large enough");
242  }
243 
244  tempVec[index] = std::move(data);
245  incrementCount(scope, index);
246 }
void NAMD_die(const char *err_msg)
Definition: common.C:147

◆ setupMulticastSection()

void SynchronousCollectives::setupMulticastSection ( SynchronousCollectivesMulticastMsg *  msg)

Definition at line 219 of file SynchronousCollectives.C.

References single.

219  {
220  CkGetSectionInfo(reductionCookie_, msg);
221  delete msg;
222  incrementCount(SynchronousCollectiveScope::single, 0);
223 }

◆ wait()

void SynchronousCollectives::wait ( )

Definition at line 103 of file SynchronousCollectives.C.

Referenced by waitAndAwaken().

103  {
104  int finished = false;
105  switch (waitPhase_) {
106  case 0:
107  break;
108  case 1:
109  finished = true;
110  break;
111  }
112 
113  waitPhase_++;
114  if (!CkMyPe()) {
115  if (!finished) {
116  CkStartQD(CkCallback(CkIndex_SynchronousCollectives::wait(), thisgroup));
117  }
118  }
119 
120  if (finished) {
121  waitPhase_ = 0;
122  CthAwaken(self_awaken_thread_);
123  }
124 }

◆ waitAndAwaken()

void SynchronousCollectives::waitAndAwaken ( )

Definition at line 126 of file SynchronousCollectives.C.

References wait().

126  {
127  setThread(CthSelf());
128  wait();
129  CthSuspend();
130 }

The documentation for this class was generated from the following files: