Main Page | Namespace List | Class Hierarchy | Alphabetical List | Class List | File List | Class Members | File Members

ReductionMgr.C

Go to the documentation of this file.
00001 
00007 /*
00008    The order of execution is expected to be:
00009             0. instantiate object
00010    ------------------ (processing barrier)
00011    (mode 0) 1. register() and subscribe()
00012    ------------------
00013    (mode 1) 2. submit() and require()
00014    ------------------
00015    (mode 2) 3. unregister() and unsubscribe()
00016    ------------------ (processing barrier)
00017             4. destroy object
00018    Doing this out-of-order will cause errors.
00019 
00020    Assumes that *only* one thread will require() a specific sequence's data.
00021 */
00022 
00023 #include <stdlib.h>
00024 #include <stdio.h>
00025 
00026 #include "InfoStream.h"
00027 #include "PatchMap.h"   // for patchMap
00028 
00029 #include "Node.h"
00030 #include "SimParameters.h"
00031 
00032 #include "ReductionMgr.decl.h"
00033 #include "ReductionMgr.h"
00034 
00035 // #define DEBUGM
00036 #define MIN_DEBUG_LEVEL 4
00037 #include "Debug.h"
00038 
00039 // Used to register and unregister reductions to downstream nodes
00040 class ReductionRegisterMsg : public CMessage_ReductionRegisterMsg {
00041 public:
00042   int reductionSetID;
00043   int dataSize;
00044   int sourceNode;
00045 };
00046 
00047 // Used to send reduction data to downstream nodes
00048 class ReductionSubmitMsg : public CMessage_ReductionSubmitMsg {
00049 public:
00050   int reductionSetID;
00051   int sourceNode;
00052   int sequenceNumber;
00053   int dataSize;
00054   BigReal data[1];
00055 
00056   static void *alloc(int msgnum, int size, int *array, int priobits) {
00057     int totalsize = size + array[0]*sizeof(BigReal);
00058     ReductionSubmitMsg *newMsg = (ReductionSubmitMsg*)
00059                                 CkAllocMsg(msgnum,totalsize,priobits);
00060     // newMsg->data = (BigReal*) ((char*)newMsg + size);
00061     return (void*)newMsg;
00062   }
00063 
00064   static void *pack(ReductionSubmitMsg *in) {
00065     // in->data = (BigReal*) ((char*)in->data - (char*)&(in->data));
00066     return (void*)in;
00067   }
00068 
00069   static ReductionSubmitMsg *unpack(void *in) {
00070     ReductionSubmitMsg *me = (ReductionSubmitMsg*)in;
00071     // me->data = (BigReal*) ((char*)&(me->data) + (size_t)(me->data));
00072     return me;
00073   }
00074 
00075 };
00076 
00077 ReductionSet::ReductionSet(int setID, int size, int numChildren) {
00078   if ( setID == REDUCTIONS_BASIC ) {
00079     if ( size != -1 ) {
00080       NAMD_bug("ReductionSet size specified for REDUCTIONS_BASIC.");
00081     }
00082     size = REDUCTION_MAX_RESERVED;
00083   }
00084   if ( size == -1 ) NAMD_bug("ReductionSet size not specified.");
00085   dataSize = size;
00086   reductionSetID = setID;
00087   nextSequenceNumber = 0;
00088   submitsRegistered = 0;
00089   dataQueue = 0;
00090   requireRegistered = 0;
00091   threadIsWaiting = 0;
00092   addToRemoteSequenceNumber = new int[numChildren];
00093 }
00094 
00095 ReductionSet::~ReductionSet() {
00096 
00097   ReductionSetData *current = dataQueue;
00098 
00099   while ( current ) {
00100     ReductionSetData *next = current->next;
00101     delete current;
00102     current = next;
00103   }
00104   delete [] addToRemoteSequenceNumber;
00105 }
00106 
00107 // possibly create and return data for a particular seqNum
00108 ReductionSetData* ReductionSet::getData(int seqNum) {
00109 
00110   ReductionSetData **current = &dataQueue;
00111 
00112   while ( *current ) {
00113     if ( (*current)->sequenceNumber == seqNum ) return *current;
00114     current = &((*current)->next);
00115   }
00116 
00117 //iout << "seq " << seqNum << " created on " << CkMyPe() << "\n" << endi;
00118   *current = new ReductionSetData(seqNum, dataSize);
00119   return *current;
00120 }
00121 
00122 // possibly delete data for a particular seqNum
00123 ReductionSetData* ReductionSet::removeData(int seqNum) {
00124 
00125   ReductionSetData **current = &dataQueue;
00126 
00127   while ( *current ) {
00128     if ( (*current)->sequenceNumber == seqNum ) break;
00129     current = &((*current)->next);
00130   }
00131 
00132   if ( ! *current ) { NAMD_die("ReductionSet::removeData on missing seqNum"); }
00133 
00134   ReductionSetData *toremove = *current;
00135   *current = (*current)->next;
00136   return toremove;
00137 }
00138 
00139 void ReductionMgr::buildSpanTree(const int pe, 
00140                                  const int max_intranode_children,
00141                                  const int max_internode_children,
00142                                  int* parent, 
00143                                  int* num_children, 
00144                                  int** children)
00145 {
00146   // If pe is a first-node, children are same-node pes and perhaps some
00147   // other first-nodes, and parents are other first-nodes. If pe is not a 
00148   // first-node, build the spanning tree among the children, and the parent
00149   // is the corresponding first-node
00150 
00151 #if CHARM_VERSION < 60103
00152 #define CmiPhysicalNodeID(X)      X
00153 #define CmiNumPesOnPhysicalNode(X) 1
00154 #define CmiGetFirstPeOnPhysicalNode(X) (X)
00155 #define CmiPeOnSamePhysicalNode(X,Y) ((X)==(Y))
00156 #endif
00157   // No matter what, build list of PEs on my node first
00158   const int num_pes = CkNumPes();
00159   const int num_node_pes = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(pe)); 
00160   int *node_pes = new int[num_node_pes];
00161   int pe_index;
00162   const int first_pe = CmiGetFirstPeOnPhysicalNode(CmiPhysicalNodeID(pe));
00163   int num_nodes = 0;
00164   int *node_ids = new int[num_pes];
00165   int first_pe_index;
00166   int my_parent_index;
00167   
00168   // Make sure PE 0 is a first-node
00169   if (pe == 0 && first_pe != pe) {
00170     NAMD_die("PE 0 is not the first physical node. This shouldn't happen");
00171   }
00172   // Get all the PEs on my node, and also build the list of all first-nodes
00173   int i;
00174   int node_pe_count=0;
00175   for (i = 0; i < num_pes; i++) {
00176     // Save first-nodes
00177     if (CmiGetFirstPeOnPhysicalNode(CmiPhysicalNodeID(i)) == i) {
00178       node_ids[num_nodes] = i;
00179       if (i == first_pe)
00180         first_pe_index = num_nodes;
00181       num_nodes++;
00182     }
00183 
00184     // Also, find pes on my node
00185     const int i1 = (i + first_pe) % num_pes;
00186     if (CmiPeOnSamePhysicalNode(first_pe,i1)) {
00187       node_pes[node_pe_count] = i1;
00188       if (pe == i1)
00189         pe_index = node_pe_count;
00190       node_pe_count++;
00191     }
00192   }
00193   
00194   // Any PE might have children on the same node, plus, if its a first-node,
00195   // it may have several children on other nodes
00196 
00197   int first_loc_child_index = pe_index * max_intranode_children + 1;
00198   int last_loc_child_index 
00199     = first_loc_child_index + max_intranode_children - 1;
00200   if (first_loc_child_index > num_node_pes) {
00201     first_loc_child_index = num_node_pes;
00202     last_loc_child_index = num_node_pes;
00203   } else {
00204     if (last_loc_child_index >= num_node_pes) 
00205       last_loc_child_index = num_node_pes-1;
00206   }
00207 //  CkPrintf("Local [%d] firstpe %d max %d num %d firstloc %d lastloc %d\n",
00208 //           pe,pe_index,max_intranode_children,num_node_pes,
00209 //           first_loc_child_index,last_loc_child_index);
00210   
00211   int first_rem_child_index = num_nodes;
00212   int last_rem_child_index = num_nodes;
00213   
00214   if (first_pe != pe) {
00215     // I'm not a first_pe, so I have no more children, and my parent
00216     // is someone else on my node
00217     my_parent_index = (pe_index-1)/max_intranode_children;
00218     *parent = node_pes[my_parent_index];
00219   } else {
00220     // I am a first_pe, so I may have additional children
00221     // on other nodes, and my parent will be on another node
00222 
00223     if (pe == 0) {
00224       my_parent_index = -1;
00225       *parent = -1;
00226     } else {
00227       my_parent_index = (first_pe_index-1)/max_internode_children;
00228       *parent = node_ids[my_parent_index];
00229 //      CkPrintf("[%d] my_parent_index=%d parent=%d\n",
00230 //               pe,my_parent_index,*parent);
00231     }
00232     first_rem_child_index = first_pe_index * max_internode_children + 1;
00233     last_rem_child_index  = first_rem_child_index + max_internode_children -1;
00234     if (first_rem_child_index > num_nodes) {
00235       first_rem_child_index = num_nodes;
00236       last_rem_child_index = num_nodes;
00237     } else {
00238       if (last_rem_child_index >= num_nodes)
00239         last_rem_child_index = num_nodes-1;
00240     }
00241 //    CkPrintf("Remote [%d] firstpe %d max %d num %d firstrem %d lastrem %d\n",
00242 //             pe,first_pe_index,max_internode_children,num_nodes,
00243 //             first_rem_child_index,last_rem_child_index);
00244   }
00245 
00246   *num_children = 0;
00247   //CkPrintf("TREE pe %d my_parent %d %d\n",pe,my_parent_index,*parent);
00248 
00249   int loc_children=0;
00250   if (first_loc_child_index != num_node_pes) {
00251     loc_children = last_loc_child_index - first_loc_child_index + 1;
00252     *num_children += loc_children;
00253 //    CkPrintf("TREE pe %d %d local children\n",pe,loc_children);
00254 //  } else {
00255 //    CkPrintf("TREE pe %d No local children\n",pe);
00256   }
00257 
00258   int rem_children=0;
00259   if (first_rem_child_index != num_nodes) {
00260     rem_children = last_rem_child_index - first_rem_child_index + 1;
00261     *num_children += rem_children;
00262 //    CkPrintf("TREE pe %d %d rem children\n",pe,rem_children);
00263 //  } else {
00264 //    CkPrintf("TREE pe %d No rem children\n",pe);
00265   }
00266   if (*num_children == 0)
00267     *children = 0;
00268   else {
00269     *children = new int[*num_children];
00270 //    CkPrintf("TREE pe %d children %d\n",pe,*num_children);
00271     int k;
00272     int child=0;
00273     if (loc_children > 0) {
00274       for(k=first_loc_child_index; k <= last_loc_child_index; k++) {
00275 //        CkPrintf("TREE pe %d loc child[%d,%d] %d\n",pe,child,k,node_pes[k]);
00276         (*children)[child++]=node_pes[k];
00277       }
00278     }
00279     if (rem_children > 0) {
00280       for(k=first_rem_child_index; k <= last_rem_child_index; k++)  {
00281 //        CkPrintf("TREE pe %d rem child[%d,%d] %d\n",pe,child,k,node_ids[k]);
00282         (*children)[child++]=node_ids[k];
00283       }
00284     }
00285   }
00286   delete [] node_ids;
00287   delete [] node_pes;
00288 }
00289 
00290 // constructor
00291 ReductionMgr::ReductionMgr() {
00292     if (CkpvAccess(ReductionMgr_instance) == 0) {
00293       CkpvAccess(ReductionMgr_instance) = this;
00294     } else {
00295       DebugM(1, "ReductionMgr::ReductionMgr() - another instance exists!\n");
00296     }
00297     
00298     buildSpanTree(CkMyPe(),REDUCTION_MAX_CHILDREN,REDUCTION_MAX_CHILDREN,
00299                   &myParent,&numChildren,&children);
00300     
00301 //    CkPrintf("TREE [%d] parent %d %d children\n",
00302 //      CkMyPe(),myParent,numChildren);
00303 //    if (numChildren > 0) {
00304 //      for(int i=0; i < numChildren; i++)  {
00305 //        CkPrintf("TREE [%d] child %d %d\n",CkMyPe(),i,children[i]);
00306 //      }
00307 //    }
00308     
00309     // fill in the spanning tree fields
00310 #if 0  // Old spanning tree
00311     if (CkMyPe() == 0) {
00312       myParent = -1;
00313     } else {
00314       myParent = (CkMyPe()-1)/REDUCTION_MAX_CHILDREN;
00315     }
00316     firstChild = CkMyPe()*REDUCTION_MAX_CHILDREN + 1;
00317     if (firstChild > CkNumPes()) firstChild = CkNumPes();
00318     lastChild = firstChild + REDUCTION_MAX_CHILDREN;
00319     if (lastChild > CkNumPes()) lastChild = CkNumPes();
00320 #endif
00321 
00322     // initialize data
00323     for(int i=0; i<REDUCTION_MAX_SET_ID; i++) {
00324       reductionSets[i] = 0;
00325     }
00326 
00327     DebugM(1,"ReductionMgr() instantiated.\n");
00328 }
00329 
00330 // destructor
00331 ReductionMgr::~ReductionMgr() {
00332     if (children != 0)
00333       delete [] children;
00334     for(int i=0; i<REDUCTION_MAX_SET_ID; i++) {
00335       delete reductionSets[i];
00336     }
00337 
00338 }
00339 
00340 // possibly create and return reduction set
00341 ReductionSet* ReductionMgr::getSet(int setID, int size) {
00342   if ( reductionSets[setID] == 0 ) {
00343     reductionSets[setID] = new ReductionSet(setID,size,numChildren);
00344     if ( ! isRoot() ) {
00345       ReductionRegisterMsg *msg = new ReductionRegisterMsg;
00346       msg->reductionSetID = setID;
00347       msg->dataSize = size;
00348       msg->sourceNode = CkMyPe();
00349       CProxy_ReductionMgr reductionProxy(thisgroup);
00350       reductionProxy[myParent].remoteRegister(msg);
00351     }
00352   }
00353   return reductionSets[setID];
00354 }
00355 
00356 // possibly delete reduction set
00357 void ReductionMgr::delSet(int setID) {
00358   ReductionSet *set = reductionSets[setID];
00359   if ( set && ! set->submitsRegistered & ! set->requireRegistered ) {
00360     if ( ! isRoot() ) {
00361       ReductionRegisterMsg *msg = new ReductionRegisterMsg;
00362       msg->reductionSetID = setID;
00363       msg->sourceNode = CkMyPe();
00364       CProxy_ReductionMgr reductionProxy(thisgroup);
00365       reductionProxy[myParent].remoteUnregister(msg);
00366     }
00367     delete set;
00368     reductionSets[setID] = 0;
00369   }
00370 }
00371 
00372 // register local submit
00373 SubmitReduction* ReductionMgr::willSubmit(int setID, int size) {
00374   ReductionSet *set = getSet(setID, size);
00375   ReductionSetData *data = set->getData(set->nextSequenceNumber);
00376   if ( data->submitsRecorded ) {
00377     NAMD_die("ReductionMgr::willSubmit called while reductions outstanding!");
00378   }
00379 
00380   set->submitsRegistered++;
00381 
00382   SubmitReduction *handle = new SubmitReduction;
00383   handle->reductionSetID = setID;
00384   handle->sequenceNumber = set->nextSequenceNumber;
00385   handle->master = this;
00386   handle->data = data->data;
00387 
00388   return handle;
00389 }
00390 
00391 // unregister local submit
00392 void ReductionMgr::remove(SubmitReduction* handle) {
00393   int setID = handle->reductionSetID;
00394   ReductionSet *set = reductionSets[setID];
00395   if ( set->getData(set->nextSequenceNumber)->submitsRecorded ) {
00396     NAMD_die("SubmitReduction deleted while reductions outstanding!");
00397   }
00398 
00399   set->submitsRegistered--;
00400 
00401   delSet(setID);
00402 }
00403 
00404 // local submit
00405 void ReductionMgr::submit(SubmitReduction* handle) {
00406   int setID = handle->reductionSetID;
00407   int seqNum = handle->sequenceNumber;
00408   ReductionSet *set = reductionSets[setID];
00409   ReductionSetData *data = set->getData(seqNum);
00410 
00411   data->submitsRecorded++;
00412   if ( data->submitsRecorded == set->submitsRegistered ) {
00413     mergeAndDeliver(set,seqNum);
00414   }
00415 
00416   handle->sequenceNumber = ++seqNum;
00417   handle->data = set->getData(seqNum)->data;
00418 }
00419 
00420 // register submit from child
00421 void ReductionMgr::remoteRegister(ReductionRegisterMsg *msg) {
00422 
00423   int setID = msg->reductionSetID;
00424   int size = msg->dataSize;
00425   ReductionSet *set = getSet(setID,size);
00426   if ( set->getData(set->nextSequenceNumber)->submitsRecorded ) {
00427     NAMD_die("ReductionMgr::remoteRegister called while reductions outstanding on parent!");
00428   }
00429 
00430   set->submitsRegistered++;
00431   set->addToRemoteSequenceNumber[childIndex(msg->sourceNode)]
00432                                         = set->nextSequenceNumber;
00433 //  CkPrintf("[%d] reduction register received from node[%d] %d\n",
00434 //    CkMyPe(),childIndex(msg->sourceNode),msg->sourceNode);
00435     
00436   delete msg;
00437 }
00438 
00439 // unregister submit from child
00440 void ReductionMgr::remoteUnregister(ReductionRegisterMsg *msg) {
00441 
00442   int setID = msg->reductionSetID;
00443   ReductionSet *set = reductionSets[setID];
00444   if ( set->getData(set->nextSequenceNumber)->submitsRecorded ) {
00445     NAMD_die("SubmitReduction deleted while reductions outstanding on parent!");
00446   }
00447 
00448   set->submitsRegistered--;
00449 
00450   delSet(setID);
00451   delete msg;
00452 }
00453 
00454 // data submitted from child
00455 void ReductionMgr::remoteSubmit(ReductionSubmitMsg *msg) {
00456   int setID = msg->reductionSetID;
00457   ReductionSet *set = reductionSets[setID];
00458   int seqNum = msg->sequenceNumber
00459         + set->addToRemoteSequenceNumber[childIndex(msg->sourceNode)];
00460 
00461 //iout << "seq " << seqNum << " from " << msg->sourceNode << " received on " << CkMyPe() << "\n" << endi;
00462   int size = msg->dataSize;
00463   if ( size != set->dataSize ) {
00464     NAMD_bug("ReductionMgr::remoteSubmit data sizes do not match.");
00465   }
00466 
00467   BigReal *newData = msg->data;
00468   ReductionSetData *data = set->getData(seqNum);
00469   BigReal *curData = data->data;
00470 #ifdef ARCH_POWERPC
00471 #pragma disjoint (*curData,  *newData)
00472 #pragma unroll(4)
00473 #endif
00474   for ( int i = 0; i < size; ++i ) {
00475     curData[i] += newData[i];
00476   }
00477 //  CkPrintf("[%d] reduction Submit received from node[%d] %d\n",
00478 //    CkMyPe(),childIndex(msg->sourceNode),msg->sourceNode);
00479   delete msg;
00480 
00481   data->submitsRecorded++;
00482   if ( data->submitsRecorded == set->submitsRegistered ) {
00483     mergeAndDeliver(set,seqNum);
00484   }
00485 }
00486 
00487 // common code for submission and delivery
00488 void ReductionMgr::mergeAndDeliver(ReductionSet *set, int seqNum) {
00489 
00490 //iout << "seq " << seqNum << " complete on " << CkMyPe() << "\n" << endi;
00491  
00492     set->nextSequenceNumber++; // should match all clients
00493 
00494     ReductionSetData *data = set->getData(seqNum);
00495     if ( data->submitsRecorded != set->submitsRegistered ) {
00496       NAMD_bug("ReductionMgr::mergeAndDeliver not ready to deliver.");
00497     }
00498 
00499     if ( isRoot() ) {
00500       if ( set->requireRegistered ) {
00501         if ( set->threadIsWaiting && set->waitingForSequenceNumber == seqNum) {
00502           // awaken the thread so it can take the data
00503           CthAwaken(set->waitingThread);
00504         }
00505       } else {
00506         NAMD_die("ReductionSet::deliver will never deliver data");
00507       }
00508     } else {
00509       // send data to parent
00510       int size = set->dataSize;
00511       ReductionSubmitMsg *msg = new(&size,1) ReductionSubmitMsg;
00512       msg->reductionSetID = set->reductionSetID;
00513       msg->sourceNode = CkMyPe();
00514       msg->sequenceNumber = seqNum;
00515       msg->dataSize = set->dataSize;
00516       for ( int i = 0; i < msg->dataSize; ++i ) {
00517         msg->data[i] = data->data[i];
00518       }
00519       CProxy_ReductionMgr reductionProxy(thisgroup);
00520       reductionProxy[myParent].remoteSubmit(msg);
00521       delete set->removeData(seqNum);
00522     }
00523 
00524 }
00525 
00526 // register require
00527 RequireReduction* ReductionMgr::willRequire(int setID, int size) {
00528   ReductionSet *set = getSet(setID,size);
00529   set->requireRegistered++;
00530   if ( set->getData(set->nextSequenceNumber)->submitsRecorded ) {
00531     NAMD_die("ReductionMgr::willRequire called while reductions outstanding!");
00532   }
00533 
00534   RequireReduction *handle = new RequireReduction;
00535   handle->reductionSetID = setID;
00536   handle->sequenceNumber = set->nextSequenceNumber;
00537   handle->master = this;
00538 
00539   return handle;
00540 }
00541 
00542 // unregister require
00543 void ReductionMgr::remove(RequireReduction* handle) {
00544   int setID = handle->reductionSetID;
00545   ReductionSet *set = reductionSets[setID];
00546   if ( set->getData(set->nextSequenceNumber)->submitsRecorded ) {
00547     NAMD_die("RequireReduction deleted while reductions outstanding!");
00548   }
00549 
00550   set->requireRegistered--;
00551 
00552   delSet(setID);
00553 }
00554 
00555 // require the data from a thread
00556 void ReductionMgr::require(RequireReduction* handle) {
00557   int setID = handle->reductionSetID;
00558   ReductionSet *set = reductionSets[setID];
00559   int seqNum = handle->sequenceNumber;
00560   ReductionSetData *data = set->getData(seqNum);
00561   if ( data->submitsRecorded < set->submitsRegistered ) {
00562     set->threadIsWaiting = 1;
00563     set->waitingForSequenceNumber = seqNum;
00564     set->waitingThread = CthSelf();
00565 //iout << "seq " << seqNum << " waiting\n" << endi;
00566     CthSuspend();
00567   }
00568   set->threadIsWaiting = 0;
00569 
00570 //iout << "seq " << seqNum << " consumed\n" << endi;
00571   delete handle->currentData;
00572   handle->currentData = set->removeData(seqNum);
00573   handle->data = handle->currentData->data;
00574   handle->sequenceNumber = ++seqNum;
00575 }
00576 
00577 
00578 #include "ReductionMgr.def.h"
00579 // nothing should be placed below here
00580 

Generated on Mon Nov 23 04:59:24 2009 for NAMD by  doxygen 1.3.9.1