32 #include "ReductionMgr.decl.h" 36 #define MIN_DEBUG_LEVEL 4 61 NAMD_bug(
"ReductionSet size specified for REDUCTIONS_BASIC or REDUCTIONS_AMD.");
65 if ( size == -1 )
NAMD_bug(
"ReductionSet size not specified.");
94 if ( (*current)->sequenceNumber == seqNum )
return *current;
95 current = &((*current)->next);
109 if ( (*current)->sequenceNumber == seqNum )
break;
110 current = &((*current)->next);
113 if ( ! *current ) {
NAMD_die(
"ReductionSet::removeData on missing seqNum"); }
116 *current = (*current)->
next;
121 const int max_intranode_children,
122 const int max_internode_children,
133 const int num_pes = CkNumPes();
134 const int num_node_pes = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(pe));
135 int *node_pes =
new int[num_node_pes];
137 const int first_pe = CmiGetFirstPeOnPhysicalNode(CmiPhysicalNodeID(pe));
139 int *node_ids =
new int[num_pes];
140 int first_pe_index = -1;
144 if (pe == 0 && first_pe != pe) {
145 NAMD_die(
"PE 0 is not the first physical node. This shouldn't happen");
150 for (i = 0; i < num_pes; i++) {
152 if (CmiGetFirstPeOnPhysicalNode(CmiPhysicalNodeID(i)) == i) {
153 node_ids[num_nodes] = i;
155 first_pe_index = num_nodes;
160 const int i1 = (i + first_pe) % num_pes;
161 if (CmiPeOnSamePhysicalNode(first_pe,i1)) {
162 if ( node_pe_count == num_node_pes )
163 NAMD_bug(
"ReductionMgr::buildSpanTree found inconsistent physical node data from Charm++ runtime");
164 node_pes[node_pe_count] = i1;
166 pe_index = node_pe_count;
170 if ( pe_index < 0 || first_pe_index < 0 )
171 NAMD_bug(
"ReductionMgr::buildSpanTree found inconsistent physical node data from Charm++ runtime");
176 int first_loc_child_index = pe_index * max_intranode_children + 1;
177 int last_loc_child_index
178 = first_loc_child_index + max_intranode_children - 1;
179 if (first_loc_child_index > num_node_pes) {
180 first_loc_child_index = num_node_pes;
181 last_loc_child_index = num_node_pes;
183 if (last_loc_child_index >= num_node_pes)
184 last_loc_child_index = num_node_pes-1;
190 int first_rem_child_index = num_nodes;
191 int last_rem_child_index = num_nodes;
193 int *rem_child_index =
new int[max_internode_children];
195 if (first_pe != pe) {
198 my_parent_index = (pe_index-1)/max_intranode_children;
199 *parent = node_pes[my_parent_index];
205 int range_end = num_nodes;
208 my_parent_index = -1;
212 while ( first_pe_index != range_begin ) {
213 my_parent_index = range_begin;
215 for (
int i = 0; i < max_internode_children; ++i ) {
216 int split = range_begin + ( range_end - range_begin ) / ( max_internode_children - i );
217 if ( first_pe_index <
split ) { range_end =
split;
break; }
218 else { range_begin =
split; }
221 *parent = node_ids[my_parent_index];
225 int prev_child_index = range_begin;
227 for (
int i = 0; i < max_internode_children; ++i ) {
228 if ( range_begin >= range_end )
break;
229 if ( range_begin > prev_child_index ) {
230 rem_child_index[rem_children++] = prev_child_index = range_begin;
232 range_begin += ( range_end - range_begin ) / ( max_internode_children - i );
240 if (first_loc_child_index != num_node_pes) {
241 loc_children = last_loc_child_index - first_loc_child_index + 1;
242 *num_children += loc_children;
249 *num_children += rem_children;
254 if (*num_children == 0)
257 *children =
new int[*num_children];
261 if (loc_children > 0) {
262 for(k=first_loc_child_index; k <= last_loc_child_index; k++) {
264 (*children)[child++]=node_pes[k];
267 if (rem_children > 0) {
268 for(k=0; k < rem_children; k++) {
270 (*children)[child++]=node_ids[rem_child_index[k]];
274 delete [] rem_child_index;
281 if (CkpvAccess(ReductionMgr_instance) == 0) {
282 CkpvAccess(ReductionMgr_instance) =
this;
284 DebugM(1,
"ReductionMgr::ReductionMgr() - another instance exists!\n");
288 &myParent,&numChildren,&children);
299 #if 0 // Old spanning tree 306 if (firstChild > CkNumPes()) firstChild = CkNumPes();
308 if (lastChild > CkNumPes()) lastChild = CkNumPes();
313 reductionSets[i] = 0;
316 DebugM(1,
"ReductionMgr() instantiated.\n");
324 delete reductionSets[i];
330 ReductionSet* ReductionMgr::getSet(
int setID,
int size) {
331 if ( reductionSets[setID] == 0 ) {
332 reductionSets[setID] =
new ReductionSet(setID,size,numChildren);
338 CProxy_ReductionMgr reductionProxy(thisgroup);
339 reductionProxy[myParent].remoteRegister(msg);
342 if ( size != -1 )
NAMD_bug(
"ReductionMgr::getSet size set");
343 }
else if ( size < 0 || reductionSets[setID]->dataSize != size ) {
344 NAMD_bug(
"ReductionMgr::getSet size mismatch");
346 return reductionSets[setID];
350 void ReductionMgr::delSet(
int setID) {
352 if (
set && ! set->submitsRegistered & ! set->requireRegistered ) {
357 CProxy_ReductionMgr reductionProxy(thisgroup);
358 reductionProxy[myParent].remoteUnregister(msg);
361 reductionSets[setID] = 0;
370 NAMD_die(
"ReductionMgr::willSubmit called while reductions outstanding!");
373 set->submitsRegistered++;
376 handle->reductionSetID = setID;
377 handle->sequenceNumber =
set->nextSequenceNumber;
378 handle->master =
this;
379 handle->data = data->
data;
386 int setID = handle->reductionSetID;
388 if ( set->getData(set->nextSequenceNumber)->submitsRecorded ) {
389 NAMD_die(
"SubmitReduction deleted while reductions outstanding!");
392 set->submitsRegistered--;
399 int setID = handle->reductionSetID;
400 int seqNum = handle->sequenceNumber;
406 mergeAndDeliver(
set,seqNum);
409 handle->sequenceNumber = ++seqNum;
410 handle->data =
set->getData(seqNum)->data;
419 if ( set->getData(set->nextSequenceNumber)->submitsRecorded ) {
420 NAMD_die(
"ReductionMgr::remoteRegister called while reductions outstanding on parent!");
423 set->submitsRegistered++;
424 set->addToRemoteSequenceNumber[childIndex(msg->
sourceNode)]
425 =
set->nextSequenceNumber;
437 if ( set->getData(set->nextSequenceNumber)->submitsRecorded ) {
438 NAMD_die(
"SubmitReduction deleted while reductions outstanding on parent!");
441 set->submitsRegistered--;
452 +
set->addToRemoteSequenceNumber[childIndex(msg->
sourceNode)];
456 if ( size != set->dataSize ) {
457 NAMD_bug(
"ReductionMgr::remoteSubmit data sizes do not match.");
464 #pragma disjoint (*curData, *newData) 468 for (
int i = 0; i < size; ++i ) {
469 if ( newData[i] > curData[i] ) {
470 curData[i] = newData[i];
474 for (
int i = 0; i < size; ++i ) {
475 curData[i] += newData[i];
484 mergeAndDeliver(
set,seqNum);
489 void ReductionMgr::mergeAndDeliver(
ReductionSet *
set,
int seqNum) {
493 set->nextSequenceNumber++;
497 NAMD_bug(
"ReductionMgr::mergeAndDeliver not ready to deliver.");
501 if ( set->requireRegistered ) {
502 if ( set->threadIsWaiting && set->waitingForSequenceNumber == seqNum) {
504 CthAwaken(set->waitingThread);
507 NAMD_die(
"ReductionSet::deliver will never deliver data");
516 for (
int i = 0; i < msg->
dataSize; ++i ) {
519 CProxy_ReductionMgr reductionProxy(thisgroup);
520 reductionProxy[myParent].remoteSubmit(msg);
521 delete set->removeData(seqNum);
529 set->requireRegistered++;
530 if ( set->getData(set->nextSequenceNumber)->submitsRecorded ) {
531 NAMD_die(
"ReductionMgr::willRequire called while reductions outstanding!");
535 handle->reductionSetID = setID;
536 handle->sequenceNumber =
set->nextSequenceNumber;
537 handle->master =
this;
544 int setID = handle->reductionSetID;
546 if ( set->getData(set->nextSequenceNumber)->submitsRecorded ) {
547 NAMD_die(
"RequireReduction deleted while reductions outstanding!");
550 set->requireRegistered--;
557 int setID = handle->reductionSetID;
559 int seqNum = handle->sequenceNumber;
562 set->threadIsWaiting = 1;
563 set->waitingForSequenceNumber = seqNum;
564 set->waitingThread = CthSelf();
568 set->threadIsWaiting = 0;
571 delete handle->currentData;
572 handle->currentData =
set->removeData(seqNum);
573 handle->data = handle->currentData->
data;
574 handle->sequenceNumber = ++seqNum;
613 ReductionValue::operator int(){
614 return (
int)reducedValue;
617 ReductionValue::operator double(){
624 this->
set[i].reducedValue = 0.0;
642 this->
set[i].reducedValue = 0.0;
659 #include "ReductionMgr.def.h"
void buildSpanTree(const int pe, const int max_intranode_children, const int max_internode_children, int *parent, int *num_children, int **children)
ReductionValue & operator[](int index)
int * addToRemoteSequenceNumber
ReductionSetData * getData(int seqNum)
ReductionSet(int setID, int size, int numChildren)
void remoteRegister(ReductionRegisterMsg *msg)
SubmitReduction * willSubmit(int setID, int size=-1)
friend class SubmitReduction
double operator+=(double other)
#define REDUCTION_MAX_CHILDREN
static Units next(Units u)
void NAMD_bug(const char *err_msg)
ReductionValue & item(int index)
ReductionSetData * removeData(int seqNum)
void NAMD_die(const char *err_msg)
std::vector< std::string > split(const std::string &text, std::string delimiter)
void remoteSubmit(ReductionSubmitMsg *msg)
void setVal(const NodeReduction *other)
RequireReduction * willRequire(int setID, int size=-1)
friend class RequireReduction
void remoteUnregister(ReductionRegisterMsg *msg)
ReductionSetData * dataQueue