ReductionMgr.C

Go to the documentation of this file.
00001 
00007 /*
00008    The order of execution is expected to be:
00009             0. instantiate object
00010    ------------------ (processing barrier)
00011    (mode 0) 1. register() and subscribe()
00012    ------------------
00013    (mode 1) 2. submit() and request()
00014    ------------------
00015    (mode 2) 3. unregister() and unsubscribe()
00016    ------------------ (processing barrier)
00017             4. destroy object
00018    Doing this out-of-order will cause errors.
00019 
00020    Assumes that *only* one thread will require() a specific sequence's data.
00021 */
00022 
00023 #include <stdlib.h>
00024 #include <stdio.h>
00025 
00026 #include "InfoStream.h"
00027 #include "PatchMap.h"   // for patchMap
00028 
00029 #include "Node.h"
00030 #include "SimParameters.h"
00031 
00032 #include "ReductionMgr.decl.h"
00033 #include "ReductionMgr.h"
00034 
00035 // #define DEBUGM
00036 #define MIN_DEBUG_LEVEL 4
00037 #include "Debug.h"
00038 
00039 // Used to register and unregister reductions to downstream nodes
00040 class ReductionRegisterMsg : public CMessage_ReductionRegisterMsg {
00041 public:
00042   int reductionSetID;
00043   int dataSize;
00044   int sourceNode;
00045 };
00046 
00047 // Used to send reduction data to downstream nodes
00048 class ReductionSubmitMsg : public CMessage_ReductionSubmitMsg {
00049 public:
00050   int reductionSetID;
00051   int sourceNode;
00052   int sequenceNumber;
00053   int dataSize;
00054   BigReal *data;
00055 };
00056 
00057 ReductionSet::ReductionSet(int setID, int size, int numChildren) {
00058   if ( setID == REDUCTIONS_BASIC || setID == REDUCTIONS_AMD ) {
00059     if ( size != -1 ) {
00060       NAMD_bug("ReductionSet size specified for REDUCTIONS_BASIC or REDUCTIONS_AMD.");
00061     }
00062     size = REDUCTION_MAX_RESERVED;
00063   }
00064   if ( size == -1 ) NAMD_bug("ReductionSet size not specified.");
00065   dataSize = size;
00066   reductionSetID = setID;
00067   nextSequenceNumber = 0;
00068   submitsRegistered = 0;
00069   dataQueue = 0;
00070   requireRegistered = 0;
00071   threadIsWaiting = 0;
00072   addToRemoteSequenceNumber = new int[numChildren];
00073 }
00074 
00075 ReductionSet::~ReductionSet() {
00076 
00077   ReductionSetData *current = dataQueue;
00078 
00079   while ( current ) {
00080     ReductionSetData *next = current->next;
00081     delete current;
00082     current = next;
00083   }
00084   delete [] addToRemoteSequenceNumber;
00085 }
00086 
00087 // possibly create and return data for a particular seqNum
00088 ReductionSetData* ReductionSet::getData(int seqNum) {
00089 
00090   ReductionSetData **current = &dataQueue;
00091 
00092   while ( *current ) {
00093     if ( (*current)->sequenceNumber == seqNum ) return *current;
00094     current = &((*current)->next);
00095   }
00096 
00097 //iout << "seq " << seqNum << " created on " << CkMyPe() << "\n" << endi;
00098   *current = new ReductionSetData(seqNum, dataSize);
00099   return *current;
00100 }
00101 
00102 // possibly delete data for a particular seqNum
00103 ReductionSetData* ReductionSet::removeData(int seqNum) {
00104 
00105   ReductionSetData **current = &dataQueue;
00106 
00107   while ( *current ) {
00108     if ( (*current)->sequenceNumber == seqNum ) break;
00109     current = &((*current)->next);
00110   }
00111 
00112   if ( ! *current ) { NAMD_die("ReductionSet::removeData on missing seqNum"); }
00113 
00114   ReductionSetData *toremove = *current;
00115   *current = (*current)->next;
00116   return toremove;
00117 }
00118 
00119 void ReductionMgr::buildSpanTree(const int pe, 
00120                                  const int max_intranode_children,
00121                                  const int max_internode_children,
00122                                  int* parent, 
00123                                  int* num_children, 
00124                                  int** children)
00125 {
00126   // If pe is a first-node, children are same-node pes and perhaps some
00127   // other first-nodes, and parents are other first-nodes. If pe is not a 
00128   // first-node, build the spanning tree among the children, and the parent
00129   // is the corresponding first-node
00130 
00131   // No matter what, build list of PEs on my node first
00132   const int num_pes = CkNumPes();
00133   const int num_node_pes = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(pe)); 
00134   int *node_pes = new int[num_node_pes];
00135   int pe_index = -1;
00136   const int first_pe = CmiGetFirstPeOnPhysicalNode(CmiPhysicalNodeID(pe));
00137   int num_nodes = 0;
00138   int *node_ids = new int[num_pes];
00139   int first_pe_index = -1;
00140   int my_parent_index;
00141   
00142   // Make sure PE 0 is a first-node
00143   if (pe == 0 && first_pe != pe) {
00144     NAMD_die("PE 0 is not the first physical node. This shouldn't happen");
00145   }
00146   // Get all the PEs on my node, and also build the list of all first-nodes
00147   int i;
00148   int node_pe_count=0;
00149   for (i = 0; i < num_pes; i++) {
00150     // Save first-nodes
00151     if (CmiGetFirstPeOnPhysicalNode(CmiPhysicalNodeID(i)) == i) {
00152       node_ids[num_nodes] = i;
00153       if (i == first_pe)
00154         first_pe_index = num_nodes;
00155       num_nodes++;
00156     }
00157 
00158     // Also, find pes on my node
00159     const int i1 = (i + first_pe) % num_pes;
00160     if (CmiPeOnSamePhysicalNode(first_pe,i1)) {
00161       if ( node_pe_count == num_node_pes )
00162         NAMD_bug("ReductionMgr::buildSpanTree found inconsistent physical node data from Charm++ runtime");
00163       node_pes[node_pe_count] = i1;
00164       if (pe == i1)
00165         pe_index = node_pe_count;
00166       node_pe_count++;
00167     }
00168   }
00169   if ( pe_index < 0 || first_pe_index < 0 )
00170     NAMD_bug("ReductionMgr::buildSpanTree found inconsistent physical node data from Charm++ runtime");
00171   
00172   // Any PE might have children on the same node, plus, if its a first-node,
00173   // it may have several children on other nodes
00174 
00175   int first_loc_child_index = pe_index * max_intranode_children + 1;
00176   int last_loc_child_index 
00177     = first_loc_child_index + max_intranode_children - 1;
00178   if (first_loc_child_index > num_node_pes) {
00179     first_loc_child_index = num_node_pes;
00180     last_loc_child_index = num_node_pes;
00181   } else {
00182     if (last_loc_child_index >= num_node_pes) 
00183       last_loc_child_index = num_node_pes-1;
00184   }
00185 //  CkPrintf("Local [%d] firstpe %d max %d num %d firstloc %d lastloc %d\n",
00186 //           pe,pe_index,max_intranode_children,num_node_pes,
00187 //           first_loc_child_index,last_loc_child_index);
00188   
00189   int first_rem_child_index = num_nodes;
00190   int last_rem_child_index = num_nodes;
00191   int rem_children=0;
00192   int *rem_child_index = new int[max_internode_children];
00193   
00194   if (first_pe != pe) {
00195     // I'm not a first_pe, so I have no more children, and my parent
00196     // is someone else on my node
00197     my_parent_index = (pe_index-1)/max_intranode_children;
00198     *parent = node_pes[my_parent_index];
00199   } else {
00200     // I am a first_pe, so I may have additional children
00201     // on other nodes, and my parent will be on another node
00202 
00203     int range_begin = 0;
00204     int range_end = num_nodes;
00205 
00206     if (pe == 0) {
00207       my_parent_index = -1;
00208       *parent = -1;
00209     } else {
00210       my_parent_index = 0;
00211       while ( first_pe_index != range_begin ) {
00212         my_parent_index = range_begin;
00213         ++range_begin;
00214         for ( int i = 0; i < max_internode_children; ++i ) {
00215           int split = range_begin + ( range_end - range_begin ) / ( max_internode_children - i );
00216           if ( first_pe_index < split ) { range_end = split; break; } 
00217           else { range_begin = split; }
00218         }
00219       }
00220       *parent = node_ids[my_parent_index];
00221     }
00222 
00223     // now we know parent and need only repeat calculation of children
00224     int prev_child_index = range_begin;
00225     ++range_begin;
00226     for ( int i = 0; i < max_internode_children; ++i ) {
00227       if ( range_begin >= range_end ) break;
00228       if ( range_begin > prev_child_index ) {
00229         rem_child_index[rem_children++] = prev_child_index = range_begin;
00230       }
00231       range_begin += ( range_end - range_begin ) / ( max_internode_children - i );
00232     }
00233   }
00234 
00235   *num_children = 0;
00236   //CkPrintf("TREE pe %d my_parent %d %d\n",pe,my_parent_index,*parent);
00237 
00238   int loc_children=0;
00239   if (first_loc_child_index != num_node_pes) {
00240     loc_children = last_loc_child_index - first_loc_child_index + 1;
00241     *num_children += loc_children;
00242 //    CkPrintf("TREE pe %d %d local children\n",pe,loc_children);
00243 //  } else {
00244 //    CkPrintf("TREE pe %d No local children\n",pe);
00245   }
00246 
00247   if (rem_children) {
00248     *num_children += rem_children;
00249 //    CkPrintf("TREE pe %d %d rem children\n",pe,rem_children);
00250 //  } else {
00251 //    CkPrintf("TREE pe %d No rem children\n",pe);
00252   }
00253   if (*num_children == 0)
00254     *children = 0;
00255   else {
00256     *children = new int[*num_children];
00257 //    CkPrintf("TREE pe %d children %d\n",pe,*num_children);
00258     int k;
00259     int child=0;
00260     if (loc_children > 0) {
00261       for(k=first_loc_child_index; k <= last_loc_child_index; k++) {
00262 //        CkPrintf("TREE pe %d loc child[%d,%d] %d\n",pe,child,k,node_pes[k]);
00263         (*children)[child++]=node_pes[k];
00264       }
00265     }
00266     if (rem_children > 0) {
00267       for(k=0; k < rem_children; k++)  {
00268 //        CkPrintf("TREE pe %d rem child[%d,%d] %d\n",pe,child,k,node_ids[rem_child_index[k]]);
00269         (*children)[child++]=node_ids[rem_child_index[k]];
00270       }
00271     }
00272   }
00273   delete [] rem_child_index;
00274   delete [] node_ids;
00275   delete [] node_pes;
00276 }
00277 
00278 // constructor
00279 ReductionMgr::ReductionMgr() {
00280     if (CkpvAccess(ReductionMgr_instance) == 0) {
00281       CkpvAccess(ReductionMgr_instance) = this;
00282     } else {
00283       DebugM(1, "ReductionMgr::ReductionMgr() - another instance exists!\n");
00284     }
00285     
00286     buildSpanTree(CkMyPe(),REDUCTION_MAX_CHILDREN,REDUCTION_MAX_CHILDREN,
00287                   &myParent,&numChildren,&children);
00288     
00289 //    CkPrintf("TREE [%d] parent %d %d children\n",
00290 //      CkMyPe(),myParent,numChildren);
00291 //    if (numChildren > 0) {
00292 //      for(int i=0; i < numChildren; i++)  {
00293 //        CkPrintf("TREE [%d] child %d %d\n",CkMyPe(),i,children[i]);
00294 //      }
00295 //    }
00296     
00297     // fill in the spanning tree fields
00298 #if 0  // Old spanning tree
00299     if (CkMyPe() == 0) {
00300       myParent = -1;
00301     } else {
00302       myParent = (CkMyPe()-1)/REDUCTION_MAX_CHILDREN;
00303     }
00304     firstChild = CkMyPe()*REDUCTION_MAX_CHILDREN + 1;
00305     if (firstChild > CkNumPes()) firstChild = CkNumPes();
00306     lastChild = firstChild + REDUCTION_MAX_CHILDREN;
00307     if (lastChild > CkNumPes()) lastChild = CkNumPes();
00308 #endif
00309 
00310     // initialize data
00311     for(int i=0; i<REDUCTION_MAX_SET_ID; i++) {
00312       reductionSets[i] = 0;
00313     }
00314 
00315     DebugM(1,"ReductionMgr() instantiated.\n");
00316 }
00317 
00318 // destructor
00319 ReductionMgr::~ReductionMgr() {
00320     if (children != 0)
00321       delete [] children;
00322     for(int i=0; i<REDUCTION_MAX_SET_ID; i++) {
00323       delete reductionSets[i];
00324     }
00325 
00326 }
00327 
00328 // possibly create and return reduction set
00329 ReductionSet* ReductionMgr::getSet(int setID, int size) {
00330   if ( reductionSets[setID] == 0 ) {
00331     reductionSets[setID] = new ReductionSet(setID,size,numChildren);
00332     if ( ! isRoot() ) {
00333       ReductionRegisterMsg *msg = new ReductionRegisterMsg;
00334       msg->reductionSetID = setID;
00335       msg->dataSize = size;
00336       msg->sourceNode = CkMyPe();
00337       CProxy_ReductionMgr reductionProxy(thisgroup);
00338       reductionProxy[myParent].remoteRegister(msg);
00339     }
00340   } else if ( setID == REDUCTIONS_BASIC || setID == REDUCTIONS_AMD ) {
00341     if ( size != -1 ) NAMD_bug("ReductionMgr::getSet size set");
00342   } else if ( size < 0 || reductionSets[setID]->dataSize != size ) {
00343     NAMD_bug("ReductionMgr::getSet size mismatch");
00344   }
00345   return reductionSets[setID];
00346 }
00347 
00348 // possibly delete reduction set
00349 void ReductionMgr::delSet(int setID) {
00350   ReductionSet *set = reductionSets[setID];
00351   if ( set && ! set->submitsRegistered & ! set->requireRegistered ) {
00352     if ( ! isRoot() ) {
00353       ReductionRegisterMsg *msg = new ReductionRegisterMsg;
00354       msg->reductionSetID = setID;
00355       msg->sourceNode = CkMyPe();
00356       CProxy_ReductionMgr reductionProxy(thisgroup);
00357       reductionProxy[myParent].remoteUnregister(msg);
00358     }
00359     delete set;
00360     reductionSets[setID] = 0;
00361   }
00362 }
00363 
00364 // register local submit
00365 SubmitReduction* ReductionMgr::willSubmit(int setID, int size) {
00366   ReductionSet *set = getSet(setID, size);
00367   ReductionSetData *data = set->getData(set->nextSequenceNumber);
00368   if ( data->submitsRecorded ) {
00369     NAMD_die("ReductionMgr::willSubmit called while reductions outstanding!");
00370   }
00371 
00372   set->submitsRegistered++;
00373 
00374   SubmitReduction *handle = new SubmitReduction;
00375   handle->reductionSetID = setID;
00376   handle->sequenceNumber = set->nextSequenceNumber;
00377   handle->master = this;
00378   handle->data = data->data;
00379 
00380   return handle;
00381 }
00382 
00383 // unregister local submit
00384 void ReductionMgr::remove(SubmitReduction* handle) {
00385   int setID = handle->reductionSetID;
00386   ReductionSet *set = reductionSets[setID];
00387   if ( set->getData(set->nextSequenceNumber)->submitsRecorded ) {
00388     NAMD_die("SubmitReduction deleted while reductions outstanding!");
00389   }
00390 
00391   set->submitsRegistered--;
00392 
00393   delSet(setID);
00394 }
00395 
00396 // local submit
00397 void ReductionMgr::submit(SubmitReduction* handle) {
00398   int setID = handle->reductionSetID;
00399   int seqNum = handle->sequenceNumber;
00400   ReductionSet *set = reductionSets[setID];
00401   ReductionSetData *data = set->getData(seqNum);
00402 
00403   data->submitsRecorded++;
00404   if ( data->submitsRecorded == set->submitsRegistered ) {
00405     mergeAndDeliver(set,seqNum);
00406   }
00407 
00408   handle->sequenceNumber = ++seqNum;
00409   handle->data = set->getData(seqNum)->data;
00410 }
00411 
00412 // register submit from child
00413 void ReductionMgr::remoteRegister(ReductionRegisterMsg *msg) {
00414 
00415   int setID = msg->reductionSetID;
00416   int size = msg->dataSize;
00417   ReductionSet *set = getSet(setID,size);
00418   if ( set->getData(set->nextSequenceNumber)->submitsRecorded ) {
00419     NAMD_die("ReductionMgr::remoteRegister called while reductions outstanding on parent!");
00420   }
00421 
00422   set->submitsRegistered++;
00423   set->addToRemoteSequenceNumber[childIndex(msg->sourceNode)]
00424                                         = set->nextSequenceNumber;
00425 //  CkPrintf("[%d] reduction register received from node[%d] %d\n",
00426 //    CkMyPe(),childIndex(msg->sourceNode),msg->sourceNode);
00427     
00428   delete msg;
00429 }
00430 
00431 // unregister submit from child
00432 void ReductionMgr::remoteUnregister(ReductionRegisterMsg *msg) {
00433 
00434   int setID = msg->reductionSetID;
00435   ReductionSet *set = reductionSets[setID];
00436   if ( set->getData(set->nextSequenceNumber)->submitsRecorded ) {
00437     NAMD_die("SubmitReduction deleted while reductions outstanding on parent!");
00438   }
00439 
00440   set->submitsRegistered--;
00441 
00442   delSet(setID);
00443   delete msg;
00444 }
00445 
00446 // data submitted from child
00447 void ReductionMgr::remoteSubmit(ReductionSubmitMsg *msg) {
00448   int setID = msg->reductionSetID;
00449   ReductionSet *set = reductionSets[setID];
00450   int seqNum = msg->sequenceNumber
00451         + set->addToRemoteSequenceNumber[childIndex(msg->sourceNode)];
00452 
00453 //iout << "seq " << seqNum << " from " << msg->sourceNode << " received on " << CkMyPe() << "\n" << endi;
00454   int size = msg->dataSize;
00455   if ( size != set->dataSize ) {
00456     NAMD_bug("ReductionMgr::remoteSubmit data sizes do not match.");
00457   }
00458 
00459   BigReal *newData = msg->data;
00460   ReductionSetData *data = set->getData(seqNum);
00461   BigReal *curData = data->data;
00462 #ifdef ARCH_POWERPC
00463 #pragma disjoint (*curData,  *newData)
00464 #pragma unroll(4)
00465 #endif
00466   if ( setID == REDUCTIONS_MINIMIZER ) {
00467     for ( int i = 0; i < size; ++i ) {
00468       if ( newData[i] > curData[i] ) {
00469         curData[i] = newData[i];
00470       }
00471     }
00472   } else {
00473     for ( int i = 0; i < size; ++i ) {
00474       curData[i] += newData[i];
00475     }
00476   }
00477 //  CkPrintf("[%d] reduction Submit received from node[%d] %d\n",
00478 //    CkMyPe(),childIndex(msg->sourceNode),msg->sourceNode);
00479   delete msg;
00480 
00481   data->submitsRecorded++;
00482   if ( data->submitsRecorded == set->submitsRegistered ) {
00483     mergeAndDeliver(set,seqNum);
00484   }
00485 }
00486 
00487 // common code for submission and delivery
00488 void ReductionMgr::mergeAndDeliver(ReductionSet *set, int seqNum) {
00489 
00490 //iout << "seq " << seqNum << " complete on " << CkMyPe() << "\n" << endi;
00491  
00492     set->nextSequenceNumber++; // should match all clients
00493 
00494     ReductionSetData *data = set->getData(seqNum);
00495     if ( data->submitsRecorded != set->submitsRegistered ) {
00496       NAMD_bug("ReductionMgr::mergeAndDeliver not ready to deliver.");
00497     }
00498 
00499     if ( isRoot() ) {
00500       if ( set->requireRegistered ) {
00501         if ( set->threadIsWaiting && set->waitingForSequenceNumber == seqNum) {
00502           // awaken the thread so it can take the data
00503           CthAwaken(set->waitingThread);
00504         }
00505       } else {
00506         NAMD_die("ReductionSet::deliver will never deliver data");
00507       }
00508     } else {
00509       // send data to parent
00510       ReductionSubmitMsg *msg = new(set->dataSize) ReductionSubmitMsg;
00511       msg->reductionSetID = set->reductionSetID;
00512       msg->sourceNode = CkMyPe();
00513       msg->sequenceNumber = seqNum;
00514       msg->dataSize = set->dataSize;
00515       for ( int i = 0; i < msg->dataSize; ++i ) {
00516         msg->data[i] = data->data[i];
00517       }
00518       CProxy_ReductionMgr reductionProxy(thisgroup);
00519       reductionProxy[myParent].remoteSubmit(msg);
00520       delete set->removeData(seqNum);
00521     }
00522 
00523 }
00524 
00525 // register require
00526 RequireReduction* ReductionMgr::willRequire(int setID, int size) {
00527   ReductionSet *set = getSet(setID,size);
00528   set->requireRegistered++;
00529   if ( set->getData(set->nextSequenceNumber)->submitsRecorded ) {
00530     NAMD_die("ReductionMgr::willRequire called while reductions outstanding!");
00531   }
00532 
00533   RequireReduction *handle = new RequireReduction;
00534   handle->reductionSetID = setID;
00535   handle->sequenceNumber = set->nextSequenceNumber;
00536   handle->master = this;
00537 
00538   return handle;
00539 }
00540 
00541 // unregister require
00542 void ReductionMgr::remove(RequireReduction* handle) {
00543   int setID = handle->reductionSetID;
00544   ReductionSet *set = reductionSets[setID];
00545   if ( set->getData(set->nextSequenceNumber)->submitsRecorded ) {
00546     NAMD_die("RequireReduction deleted while reductions outstanding!");
00547   }
00548 
00549   set->requireRegistered--;
00550 
00551   delSet(setID);
00552 }
00553 
00554 // require the data from a thread
00555 void ReductionMgr::require(RequireReduction* handle) {
00556   int setID = handle->reductionSetID;
00557   ReductionSet *set = reductionSets[setID];
00558   int seqNum = handle->sequenceNumber;
00559   ReductionSetData *data = set->getData(seqNum);
00560   if ( data->submitsRecorded < set->submitsRegistered ) {
00561     set->threadIsWaiting = 1;
00562     set->waitingForSequenceNumber = seqNum;
00563     set->waitingThread = CthSelf();
00564 //iout << "seq " << seqNum << " waiting\n" << endi;
00565     CthSuspend();
00566   }
00567   set->threadIsWaiting = 0;
00568 
00569 //iout << "seq " << seqNum << " consumed\n" << endi;
00570   delete handle->currentData;
00571   handle->currentData = set->removeData(seqNum);
00572   handle->data = handle->currentData->data;
00573   handle->sequenceNumber = ++seqNum;
00574 }
00575 
00576 
00577 #include "ReductionMgr.def.h"
00578 // nothing should be placed below here
00579 

Generated on Sat Sep 23 01:17:15 2017 for NAMD by  doxygen 1.4.7