#include "SynchronousCollectives.decl.h"
// ...

#if defined(NAMD_CUDA) || defined(NAMD_HIP)
// ...
#if !(defined(__NVCC__) || defined(__HIPCC__))
// ...
#endif // !(defined(__NVCC__) || defined(__HIPCC__))
// ...

SynchronousCollectives::SynchronousCollectives() {
  if (CkpvAccess(SynchronousCollectives_instance) == NULL) {
    CkpvAccess(SynchronousCollectives_instance) = this;
  } else {
    NAMD_bug("SynchronousCollectives instanced twice on same processor!");
  }
  // ...
  allPes_ = CkpvAccess(BOCclass_group).synchronousCollectives;
  currentBarrierAll_ = std::vector<int>(CkNumPes(), 0);
  currentBarrierSingle_ = std::vector<int>(1, 0);
}
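// Why the Ckpv guard above works: Charm++ constructs one branch of a group on
// every PE, and the processor-private variable pins the first (and only)
// instance per PE. A minimal sketch of the call sites (illustrative, not from
// this file):
//
//   // at startup, typically from a mainchare:
//   CProxy_SynchronousCollectives sc = CProxy_SynchronousCollectives::ckNew();
//   // later, on any PE, the local branch is reachable via:
//   SynchronousCollectives* local = CkpvAccess(SynchronousCollectives_instance);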
void SynchronousCollectives::initMasterScope(const int isMasterPe,
    const int isMasterDevice, const int numDevices, const int deviceIndex,
    const std::vector<int>& masterPeList) {
  isMasterPe_ = isMasterPe;
  numDevices_ = numDevices;
  deviceIndex_ = deviceIndex;
  masterPeList_ = masterPeList;
  // ...
  currentBarrierMasterPe_ = std::vector<int>(numDevices_, 0);
  // ...
  masterPes_ = CProxySection_SynchronousCollectives(allPes_.ckGetGroupID(),
      masterPeList_.data(), masterPeList_.size());
  masterPesMulticast_ = CProxySection_SynchronousCollectives(allPes_.ckGetGroupID(),
      masterPeList_.data(), masterPeList_.size());
  // ...
  CProxy_CkMulticastMgr mcastProxy = CkpvAccess(BOCclass_group).multicastMgr;
  CkMulticastMgr *mcastPtr = CProxy_CkMulticastMgr(mcastProxy).ckLocalBranch();
  masterPesMulticast_.ckSectionDelegate(mcastPtr);
  // ...
  if (isMasterDevice && isMasterPe_) {
    // ... (allocates the SynchronousCollectivesMulticastMsg* msg)
    masterPesMulticast_.setupMulticastSection(msg);
    // ...
  } else if (isMasterPe_) {
    // ...
  }
}
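// Both section proxies span the same master-PE list; the second one is
// delegated to CkMulticastMgr so that section-wide reductions can run through
// the reduction cookie. A condensed sketch of the delegation idiom used above
// (proxy and list names match this file; the message construction is
// illustrative):
//
//   CProxySection_SynchronousCollectives sec(allPes_.ckGetGroupID(),
//       masterPeList_.data(), masterPeList_.size());
//   sec.ckSectionDelegate(mcastPtr);   // hand routing to CkMulticastMgr
//   sec.setupMulticastSection(msg);    // first traversal primes the cookie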
void SynchronousCollectives::incrementCount(const SynchronousCollectiveScope scope,
    const int index) {
  auto& currentBarrier = getBarrier(scope);
  if (currentBarrier.size() <= index) {
    NAMD_bug("SynchronousCollectives currentBarrier not large enough");
  }
  currentBarrier[index]++;
}
// ... (completion test over a scope's arrival counts; enclosing signature elided)
  auto& currentBarrier = getBarrier(scope);
  // ...
  bool done = true;  // any zero count below clears it
  for (size_t i = 0; i < currentBarrier.size(); i++) {
    done = (done && currentBarrier[i]);
  }
  // ...
  for (size_t i = 0; i < currentBarrier.size(); i++) {
    // ... (consume one arrival per slot once the barrier completes)
  }
  // ...
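// Standalone sketch of the completion test above: every slot of the scope's
// count vector must be nonzero before the barrier is considered done.
//
//   std::vector<int> counts = {1, 1, 1};   // one arrival per participant
//   bool done = true;
//   for (size_t i = 0; i < counts.size(); i++) done = (done && counts[i]);
//   // done == true here; with counts = {1, 0, 1} it would be false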
// ...
  int finished = false;
  switch (waitPhase_) {
    // ...
      CkStartQD(CkCallback(CkIndex_SynchronousCollectives::wait(), thisgroup));
      // ...
      CthAwaken(self_awaken_thread_);
      // ...
  }
  // ...

  setThread(CthSelf());
  // ...
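// The fragment above is the message-driven half of a suspend/awaken pair:
// CkStartQD schedules the wait() entry method at quiescence, and CthAwaken
// resumes the user-level thread recorded via setThread(CthSelf()). A generic
// sketch of the Converse-thread idiom (CthSuspend is the standard blocking
// counterpart; it is assumed here, not visible in this fragment):
//
//   CthThread me = CthSelf();   // record who is waiting
//   CthSuspend();               // block this user-level thread ...
//   // ... until some handler on this PE calls CthAwaken(me)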
void SynchronousCollectives::barrier(const SynchronousCollectiveScope scope) {
  if (/* ... all-PE scope */) {
    allPes_.recvBarrierAll(CkMyPe());
    // ...
  } else if (/* ... single scope */) {
    NAMD_bug("SynchronousCollectives::barrier does not support single scope");
  } else {
    // ... (master-PE scope)
    if (CkNumNodes() == 1) {
      // ...
    } else if (currentBarrierMasterPe_.size() == 0) {
      // ... (master scope was never initialized)
    } else if (isMasterPe_) {
      masterPes_.recvBarrierMasterPe(deviceIndex_);
      // ...
    }
  }
}
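// Usage sketch: call from a user-level thread on every participant; it returns
// once all of them have arrived. The enumerator `all` is an assumption, since
// only the type SynchronousCollectiveScope is visible in this file.
//
//   SynchronousCollectives* sc = CkpvAccess(SynchronousCollectives_instance);
//   sc->barrier(SynchronousCollectiveScope::all);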
void SynchronousCollectives::handleReductionAll(CkReductionMsg *msg) {
  // ...
  allPes_.broadcastReductionResult(msg->getSize(), (char*) msg->getData());
  // ...
}

void SynchronousCollectives::handleReductionMaster(CkReductionMsg *msg) {
  // ...
  masterPes_.broadcastReductionResult(msg->getSize(), (char*) msg->getData());
  // ...
}

void SynchronousCollectives::broadcastReductionResult(int n, char *data) {
  std::memcpy(reductionPtr_, (void*) data, n);
  // ...
}
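// broadcastReductionResult lands on every participant and copies the reduced
// bytes into reductionPtr_, which allReduce aimed at reductionTemp_'s buffer
// before suspending. Byte-level sketch of that hand-off (names illustrative):
//
//   std::vector<double> temp(4);                   // reductionTemp_'s payload
//   void* dst = temp.data();                       // reductionPtr_
//   std::memcpy(dst, msgData, 4 * sizeof(double)); // msgData: reduced bytes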
template <typename T>
std::vector<T> SynchronousCollectives::allReduce(std::vector<T>& data,
    CkReduction::reducerType type, const SynchronousCollectiveScope scope) {
  if (/* ... single scope */) {
    NAMD_bug("SynchronousCollectives::allreduce does not support single scope");
  }
  // ...
  reductionTemp_ = std::vector<T>(data.size());
  reductionPtr_ = (void*) std::any_cast<std::vector<T>&>(reductionTemp_).data();
  // ...
  setThread(CthSelf());
  // ...
  if (/* ... all-PE scope */) {
    CkCallback cb(/* ... handleReductionAll ... */, thisProxy[thisIndex]);
    // ...
    contribute(data.size() * sizeof(T), data.data(), type, cb);
    // ...
  } else if (isMasterPe_) {
    CkCallback cb(/* ... handleReductionMaster ... */, thisProxy[thisIndex]);
    // ...
    CProxy_CkMulticastMgr mcastProxy = CkpvAccess(BOCclass_group).multicastMgr;
    CkMulticastMgr *mcastPtr = CProxy_CkMulticastMgr(mcastProxy).ckLocalBranch();
    mcastPtr->contribute(data.size() * sizeof(T), data.data(), type,
        reductionCookie_, cb);
  }
  // ... (suspend until the reduced result has been copied back)
  std::vector<T> out = std::move(std::any_cast<std::vector<T>&>(reductionTemp_));
  // ...
  return out;
}

void SynchronousCollectives::setupMulticastSection(SynchronousCollectivesMulticastMsg *msg) {
  CkGetSectionInfo(reductionCookie_, msg);
  // ...
}
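// Usage sketch: every participant contributes a vector of equal length and
// receives the element-wise reduction. CkReduction::sum_double is a standard
// Charm++ reducer; the scope enumerator is an assumption (sc: the PE-local
// branch, as in the barrier sketch).
//
//   std::vector<double> local{1.0, 2.0};
//   std::vector<double> total = sc->allReduce(local, CkReduction::sum_double,
//       SynchronousCollectiveScope::all);
//   // total[i] == sum over participants of their local[i]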
template <typename T>
void SynchronousCollectives::sendAllGather(const T& data,
    const SynchronousCollectiveScope scope, const unsigned int key) {
  if (/* ... all-PE scope */) {
    allPes_.recvIndexData(CkMyPe(), data, scope, key);
  } else if (isMasterPe_) {
    masterPes_.recvIndexData(deviceIndex_, data, scope, key);
  }
}

template <typename T>
void SynchronousCollectives::recvIndexData(const int index, const T& data,
    const SynchronousCollectiveScope scope, const unsigned int key) {
  // ... (tempSize: the number of participants in this scope)
  auto res = tempData_.try_emplace(key, std::in_place_type<std::vector<T>>, tempSize);
  // ...
  std::vector<T>& tempVec = std::any_cast<std::vector<T>&>(res.first->second);
  if (index >= tempVec.size()) {
    NAMD_die("SynchronousCollectives::recvIndexData: temp array not large enough");
  }
  tempVec[index] = std::move(data);
  incrementCount(scope, index);
}
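// recvIndexData leans on try_emplace plus std::any: the first arrival for a
// key constructs the staging vector in place, and later arrivals just fill
// their slot. Self-contained sketch of the idiom (hypothetical store):
//
//   #include <any>
//   #include <unordered_map>
//   #include <vector>
//
//   std::unordered_map<unsigned int, std::any> store;
//   auto res = store.try_emplace(3u, std::in_place_type<std::vector<int>>, 4);
//   std::any_cast<std::vector<int>&>(res.first->second)[2] = 42;
//   // a second try_emplace(3u, ...) is a no-op and returns the same vector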
template <typename T>
T SynchronousCollectives::retrieveTemp(const unsigned int key) {
  auto outIter = tempData_.find(key);
  if (outIter == tempData_.end()) {
    NAMD_die("SynchronousCollectives::retrieveTemp: could not find data");
  }
  auto out = std::move(std::any_cast<T&>(outIter->second));
  tempData_.erase(key);
  return out;
}
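// Moving out of the std::any before erasing avoids copying the staged data:
// any_cast<T&> yields a reference into the map node, std::move steals its
// buffer, and erase() destroys only the emptied husk. Continuing the sketch
// above:
//
//   auto v = std::move(std::any_cast<std::vector<int>&>(store.at(3u)));
//   store.erase(3u);   // v keeps the elements; no extra allocation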
template <typename T>
std::vector<T> SynchronousCollectives::allGather(const T& data,
    const SynchronousCollectiveScope scope) {
  if (/* ... single scope */) {
    NAMD_bug("SynchronousCollectives::allgather does not support single scope");
  }
  // ...
  const unsigned int key = getKey(scope);
  sendAllGather<T>(data, scope, key);
  suspendAndCheck(scope);
  // ...
  std::vector<T> out = retrieveTemp<std::vector<T>>(key);
  // ...
  return out;
}
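// Usage sketch: each PE contributes one value; the result is indexed by the
// contributor's rank. The scope enumerator is an assumption (sc: the PE-local
// branch).
//
//   int mine = 10 * CkMyPe();
//   std::vector<int> everyone =
//       sc->allGather(mine, SynchronousCollectiveScope::all);
//   // everyone[p] == 10 * p for each PE p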
template <typename T>
void SynchronousCollectives::sendAlltoallv(const std::vector<T>& data,
    const SynchronousCollectiveScope scope, const unsigned int key) {
  CProxy_SynchronousCollectives cp(thisgroup);
  if (/* ... all-PE scope */) {
    for (size_t i = 0; i < CkNumPes(); i++) {
      cp[i].recvIndexData<T>(CkMyPe(), data[i], scope, key);
    }
  } else if (isMasterPe_) {
    for (size_t i = 0; i < numDevices_; i++) {
      const int PE = masterPeList_[i];
      cp[PE].recvIndexData<T>(deviceIndex_, data[i], scope, key);
    }
  }
}
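// The loops above fix the alltoallv addressing contract: sender s places the
// payload meant for participant i in data[i], and participant i files it in
// slot s (the sender's CkMyPe() or deviceIndex_) of its staging vector.
// Schematically, with two senders:
//
//   // sender 0 sends {a00, a01}; sender 1 sends {a10, a11}
//   // receiver 0 assembles {a00, a10}; receiver 1 assembles {a01, a11}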
template <typename T>
std::vector<T> SynchronousCollectives::alltoallv(const std::vector<T>& data,
    const SynchronousCollectiveScope scope) {
  if (/* ... single scope */) {
    NAMD_bug("SynchronousCollectives::alltoallv does not support single scope");
  }
  // ...
  const unsigned int key = getKey(scope);
  sendAlltoallv<T>(data, scope, key);
  suspendAndCheck(scope);
  std::vector<T> out = retrieveTemp<std::vector<T>>(key);
  // ...
  return out;
}
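// Usage sketch: one payload per peer out, one payload per peer in. The scope
// enumerator is an assumption (sc: the PE-local branch).
//
//   std::vector<float> outbox(CkNumPes(), 0.5f + CkMyPe());
//   std::vector<float> inbox =
//       sc->alltoallv(outbox, SynchronousCollectiveScope::all);
//   // inbox[p] == 0.5f + p: the value PE p addressed to this PE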
template <typename T>
void SynchronousCollectives::sendBroadcast(const T& data,
    const SynchronousCollectiveScope scope, const unsigned int key) {
  if (/* ... all-PE scope */) {
    allPes_.recvBroadcast<T>(data, key);
  } else if (isMasterPe_) {
    masterPes_.recvBroadcast<T>(data, key);
  }
}

template <typename T>
void SynchronousCollectives::recvBroadcast(const T& data, const unsigned int key) {
  // ...
  tempData_.insert({key, std::move(data)});
  // ...
}
template <typename T>
T SynchronousCollectives::broadcast(const T& data, const bool isRoot,
    const SynchronousCollectiveScope scope) {
  if (/* ... single scope */) {
    NAMD_bug("SynchronousCollectives::broadcast does not support single scope");
  }
  // ...
  const unsigned int key = getKey(scope);
  if (isRoot) {
    sendBroadcast(data, scope, key);
  }
  // ...
  T out = retrieveTemp<T>(key);
  // ...
  return out;
}
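// Usage sketch: exactly one caller passes isRoot == true; every participant,
// root included, returns the root's value. The scope enumerator is an
// assumption (sc: the PE-local branch).
//
//   double v = (CkMyPe() == 0) ? 3.14 : 0.0;
//   double shared = sc->broadcast(v, CkMyPe() == 0,
//       SynchronousCollectiveScope::all);
//   // shared == 3.14 everywhere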
#define INSTANTIATE_ALLGATHER(type) \
  template std::vector<type> \
  SynchronousCollectives::allGather<type>(const type&, SynchronousCollectiveScope);

// ...
#if defined(NAMD_CUDA) || defined(NAMD_HIP)
// ...
#endif // NAMD_CUDA || NAMD_HIP
// ...
#undef INSTANTIATE_ALLGATHER

#define INSTANTIATE_ALLTOALLV(type) \
  template std::vector<type> \
  SynchronousCollectives::alltoallv<type>(const std::vector<type>&, SynchronousCollectiveScope);

// ...
#undef INSTANTIATE_ALLTOALLV

#define INSTANTIATE_ALLREDUCE(type) \
  template std::vector<type> \
  SynchronousCollectives::allReduce<type>(std::vector<type>&, \
      CkReduction::reducerType, SynchronousCollectiveScope)

// ...
#undef INSTANTIATE_ALLREDUCE
// ...

#include "SynchronousCollectives.def.h"
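// The INSTANTIATE_* macros emit explicit template instantiations so the
// template definitions above, which live in this .C file rather than a header,
// link against callers elsewhere. A hypothetical expansion:
//
//   INSTANTIATE_ALLGATHER(int)
//   // becomes:
//   // template std::vector<int>
//   // SynchronousCollectives::allGather<int>(const int&, SynchronousCollectiveScope);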