Main Page | Namespace List | Class Hierarchy | Alphabetical List | Class List | File List | Class Members | File Members

ReductionMgr.C

Go to the documentation of this file.
00001 
00007 /*
00008    The order of execution is expected to be:
00009             0. instantiate object
00010    ------------------ (processing barrier)
00011    (mode 0) 1. register() and subscribe()
00012    ------------------
00013    (mode 1) 2. submit() and request()
00014    ------------------
00015    (mode 2) 3. unregister() and unsubscribe()
00016    ------------------ (processing barrier)
00017             4. destroy object
00018    Doing this out-of-order will cause errors.
00019 
00020    Assumes that *only* one thread will require() a specific sequence's data.
00021 */
00022 
00023 #include <stdlib.h>
00024 #include <stdio.h>
00025 
00026 #include "InfoStream.h"
00027 #include "PatchMap.h"   // for patchMap
00028 
00029 #include "Node.h"
00030 #include "SimParameters.h"
00031 
00032 #include "ReductionMgr.decl.h"
00033 #include "ReductionMgr.h"
00034 
00035 // #define DEBUGM
00036 #define MIN_DEBUG_LEVEL 4
00037 #include "Debug.h"
00038 
00039 // Used to register and unregister reductions to downstream nodes
00040 class ReductionRegisterMsg : public CMessage_ReductionRegisterMsg {
00041 public:
00042   int reductionSetID;
00043   int dataSize;
00044   int sourceNode;
00045 };
00046 
00047 // Used to send reduction data to downstream nodes
00048 class ReductionSubmitMsg : public CMessage_ReductionSubmitMsg {
00049 public:
00050   int reductionSetID;
00051   int sourceNode;
00052   int sequenceNumber;
00053   int dataSize;
00054   BigReal data[1];
00055 
00056   static void *alloc(int msgnum, int size, int *array, int priobits) {
00057     int totalsize = size + array[0]*sizeof(BigReal);
00058     ReductionSubmitMsg *newMsg = (ReductionSubmitMsg*)
00059                                 CkAllocMsg(msgnum,totalsize,priobits);
00060     // newMsg->data = (BigReal*) ((char*)newMsg + size);
00061     return (void*)newMsg;
00062   }
00063 
00064   static void *pack(ReductionSubmitMsg *in) {
00065     // in->data = (BigReal*) ((char*)in->data - (char*)&(in->data));
00066     return (void*)in;
00067   }
00068 
00069   static ReductionSubmitMsg *unpack(void *in) {
00070     ReductionSubmitMsg *me = (ReductionSubmitMsg*)in;
00071     // me->data = (BigReal*) ((char*)&(me->data) + (size_t)(me->data));
00072     return me;
00073   }
00074 
00075 };
00076 
00077 ReductionSet::ReductionSet(int setID, int size) {
00078   if ( setID == REDUCTIONS_BASIC ) {
00079     if ( size != -1 ) {
00080       NAMD_bug("ReductionSet size specified for REDUCTIONS_BASIC.");
00081     }
00082     size = REDUCTION_MAX_RESERVED;
00083   }
00084   if ( size == -1 ) NAMD_bug("ReductionSet size not specified.");
00085   dataSize = size;
00086   reductionSetID = setID;
00087   nextSequenceNumber = 0;
00088   submitsRegistered = 0;
00089   dataQueue = 0;
00090   requireRegistered = 0;
00091   threadIsWaiting = 0;
00092 }
00093 
00094 ReductionSet::~ReductionSet() {
00095 
00096   ReductionSetData *current = dataQueue;
00097 
00098   while ( current ) {
00099     ReductionSetData *next = current->next;
00100     delete current;
00101     current = next;
00102   }
00103 }
00104 
00105 // possibly create and return data for a particular seqNum
00106 ReductionSetData* ReductionSet::getData(int seqNum) {
00107 
00108   ReductionSetData **current = &dataQueue;
00109 
00110   while ( *current ) {
00111     if ( (*current)->sequenceNumber == seqNum ) return *current;
00112     current = &((*current)->next);
00113   }
00114 
00115 //iout << "seq " << seqNum << " created on " << CkMyPe() << "\n" << endi;
00116   *current = new ReductionSetData(seqNum, dataSize);
00117   return *current;
00118 }
00119 
00120 // possibly delete data for a particular seqNum
00121 ReductionSetData* ReductionSet::removeData(int seqNum) {
00122 
00123   ReductionSetData **current = &dataQueue;
00124 
00125   while ( *current ) {
00126     if ( (*current)->sequenceNumber == seqNum ) break;
00127     current = &((*current)->next);
00128   }
00129 
00130   if ( ! *current ) { NAMD_die("ReductionSet::removeData on missing seqNum"); }
00131 
00132   ReductionSetData *toremove = *current;
00133   *current = (*current)->next;
00134   return toremove;
00135 }
00136 
00137 // constructor
00138 ReductionMgr::ReductionMgr() {
00139     if (CkpvAccess(ReductionMgr_instance) == 0) {
00140       CkpvAccess(ReductionMgr_instance) = this;
00141     } else {
00142       DebugM(1, "ReductionMgr::ReductionMgr() - another instance exists!\n");
00143     }
00144 
00145     // fill in the spanning tree fields
00146     if (CkMyPe() == 0) {
00147       myParent = -1;
00148     } else {
00149       myParent = (CkMyPe()-1)/REDUCTION_MAX_CHILDREN;
00150     }
00151     firstChild = CkMyPe()*REDUCTION_MAX_CHILDREN + 1;
00152     if (firstChild > CkNumPes()) firstChild = CkNumPes();
00153     lastChild = firstChild + REDUCTION_MAX_CHILDREN;
00154     if (lastChild > CkNumPes()) lastChild = CkNumPes();
00155 
00156     // initialize data
00157     for(int i=0; i<REDUCTION_MAX_SET_ID; i++) {
00158       reductionSets[i] = 0;
00159     }
00160 
00161     DebugM(1,"ReductionMgr() instantiated.\n");
00162 }
00163 
00164 // destructor
00165 ReductionMgr::~ReductionMgr() {
00166     for(int i=0; i<REDUCTION_MAX_SET_ID; i++) {
00167       delete reductionSets[i];
00168     }
00169 
00170 }
00171 
00172 // possibly create and return reduction set
00173 ReductionSet* ReductionMgr::getSet(int setID, int size) {
00174   if ( reductionSets[setID] == 0 ) {
00175     reductionSets[setID] = new ReductionSet(setID,size);
00176     if ( ! isRoot() ) {
00177       ReductionRegisterMsg *msg = new ReductionRegisterMsg;
00178       msg->reductionSetID = setID;
00179       msg->dataSize = size;
00180       msg->sourceNode = CkMyPe();
00181 #if CHARM_VERSION > 050402
00182       CProxy_ReductionMgr reductionProxy(thisgroup);
00183       reductionProxy[myParent].remoteRegister(msg);
00184 #else
00185       CProxy_ReductionMgr(thisgroup).remoteRegister(msg,myParent);
00186 #endif
00187     }
00188   }
00189   return reductionSets[setID];
00190 }
00191 
00192 // possibly delete reduction set
00193 void ReductionMgr::delSet(int setID) {
00194   ReductionSet *set = reductionSets[setID];
00195   if ( set && ! set->submitsRegistered & ! set->requireRegistered ) {
00196     if ( ! isRoot() ) {
00197       ReductionRegisterMsg *msg = new ReductionRegisterMsg;
00198       msg->reductionSetID = setID;
00199       msg->sourceNode = CkMyPe();
00200 #if CHARM_VERSION > 050402
00201       CProxy_ReductionMgr reductionProxy(thisgroup);
00202       reductionProxy[myParent].remoteUnregister(msg);
00203 #else
00204       CProxy_ReductionMgr(thisgroup).remoteUnregister(msg,myParent);
00205 #endif
00206     }
00207     delete set;
00208     reductionSets[setID] = 0;
00209   }
00210 }
00211 
00212 // register local submit
00213 SubmitReduction* ReductionMgr::willSubmit(int setID, int size) {
00214   ReductionSet *set = getSet(setID, size);
00215   ReductionSetData *data = set->getData(set->nextSequenceNumber);
00216   if ( data->submitsRecorded ) {
00217     NAMD_die("ReductionMgr::willSubmit called while reductions outstanding!");
00218   }
00219 
00220   set->submitsRegistered++;
00221 
00222   SubmitReduction *handle = new SubmitReduction;
00223   handle->reductionSetID = setID;
00224   handle->sequenceNumber = set->nextSequenceNumber;
00225   handle->master = this;
00226   handle->data = data->data;
00227 
00228   return handle;
00229 }
00230 
00231 // unregister local submit
00232 void ReductionMgr::remove(SubmitReduction* handle) {
00233   int setID = handle->reductionSetID;
00234   ReductionSet *set = reductionSets[setID];
00235   if ( set->getData(set->nextSequenceNumber)->submitsRecorded ) {
00236     NAMD_die("SubmitReduction deleted while reductions outstanding!");
00237   }
00238 
00239   set->submitsRegistered--;
00240 
00241   delSet(setID);
00242 }
00243 
00244 // local submit
00245 void ReductionMgr::submit(SubmitReduction* handle) {
00246   int setID = handle->reductionSetID;
00247   int seqNum = handle->sequenceNumber;
00248   ReductionSet *set = reductionSets[setID];
00249   ReductionSetData *data = set->getData(seqNum);
00250 
00251   data->submitsRecorded++;
00252   if ( data->submitsRecorded == set->submitsRegistered ) {
00253     mergeAndDeliver(set,seqNum);
00254   }
00255 
00256   handle->sequenceNumber = ++seqNum;
00257   handle->data = set->getData(seqNum)->data;
00258 }
00259 
00260 // register submit from child
00261 void ReductionMgr::remoteRegister(ReductionRegisterMsg *msg) {
00262 
00263   int setID = msg->reductionSetID;
00264   int size = msg->dataSize;
00265   ReductionSet *set = getSet(setID,size);
00266   if ( set->getData(set->nextSequenceNumber)->submitsRecorded ) {
00267     NAMD_die("ReductionMgr::remoteRegister called while reductions outstanding on parent!");
00268   }
00269 
00270   set->submitsRegistered++;
00271   set->addToRemoteSequenceNumber[msg->sourceNode - firstChild]
00272                                         = set->nextSequenceNumber;
00273   delete msg;
00274 }
00275 
00276 // unregister submit from child
00277 void ReductionMgr::remoteUnregister(ReductionRegisterMsg *msg) {
00278 
00279   int setID = msg->reductionSetID;
00280   ReductionSet *set = reductionSets[setID];
00281   if ( set->getData(set->nextSequenceNumber)->submitsRecorded ) {
00282     NAMD_die("SubmitReduction deleted while reductions outstanding on parent!");
00283   }
00284 
00285   set->submitsRegistered--;
00286 
00287   delSet(setID);
00288   delete msg;
00289 }
00290 
00291 // data submitted from child
00292 void ReductionMgr::remoteSubmit(ReductionSubmitMsg *msg) {
00293   int setID = msg->reductionSetID;
00294   ReductionSet *set = reductionSets[setID];
00295   int seqNum = msg->sequenceNumber
00296         + set->addToRemoteSequenceNumber[msg->sourceNode - firstChild];
00297 
00298 //iout << "seq " << seqNum << " from " << msg->sourceNode << " received on " << CkMyPe() << "\n" << endi;
00299   int size = msg->dataSize;
00300   if ( size != set->dataSize ) {
00301     NAMD_bug("ReductionMgr::remoteSubmit data sizes do not match.");
00302   }
00303 
00304   BigReal *newData = msg->data;
00305   ReductionSetData *data = set->getData(seqNum);
00306   BigReal *curData = data->data;
00307 #ifdef ARCH_POWERPC
00308 #pragma disjoint (*curData,  *newData)
00309 #pragma unroll(4)
00310 #endif
00311   for ( int i = 0; i < size; ++i ) {
00312     curData[i] += newData[i];
00313   }
00314   delete msg;
00315 
00316   data->submitsRecorded++;
00317   if ( data->submitsRecorded == set->submitsRegistered ) {
00318     mergeAndDeliver(set,seqNum);
00319   }
00320 }
00321 
00322 // common code for submission and delivery
00323 void ReductionMgr::mergeAndDeliver(ReductionSet *set, int seqNum) {
00324 
00325 //iout << "seq " << seqNum << " complete on " << CkMyPe() << "\n" << endi;
00326  
00327     set->nextSequenceNumber++; // should match all clients
00328 
00329     ReductionSetData *data = set->getData(seqNum);
00330     if ( data->submitsRecorded != set->submitsRegistered ) {
00331       NAMD_bug("ReductionMgr::mergeAndDeliver not ready to deliver.");
00332     }
00333 
00334     if ( isRoot() ) {
00335       if ( set->requireRegistered ) {
00336         if ( set->threadIsWaiting && set->waitingForSequenceNumber == seqNum) {
00337           // awaken the thread so it can take the data
00338           CthAwaken(set->waitingThread);
00339         }
00340       } else {
00341         NAMD_die("ReductionSet::deliver will never deliver data");
00342       }
00343     } else {
00344       // send data to parent
00345       int size = set->dataSize;
00346       ReductionSubmitMsg *msg = new(&size,1) ReductionSubmitMsg;
00347       msg->reductionSetID = set->reductionSetID;
00348       msg->sourceNode = CkMyPe();
00349       msg->sequenceNumber = seqNum;
00350       msg->dataSize = set->dataSize;
00351       for ( int i = 0; i < msg->dataSize; ++i ) {
00352         msg->data[i] = data->data[i];
00353       }
00354 #if CHARM_VERSION > 050402
00355       CProxy_ReductionMgr reductionProxy(thisgroup);
00356       reductionProxy[myParent].remoteSubmit(msg);
00357 #else
00358       CProxy_ReductionMgr(thisgroup).remoteSubmit(msg,myParent);
00359 #endif
00360       delete set->removeData(seqNum);
00361     }
00362 
00363 }
00364 
00365 // register require
00366 RequireReduction* ReductionMgr::willRequire(int setID, int size) {
00367   ReductionSet *set = getSet(setID,size);
00368   set->requireRegistered++;
00369   if ( set->getData(set->nextSequenceNumber)->submitsRecorded ) {
00370     NAMD_die("ReductionMgr::willRequire called while reductions outstanding!");
00371   }
00372 
00373   RequireReduction *handle = new RequireReduction;
00374   handle->reductionSetID = setID;
00375   handle->sequenceNumber = set->nextSequenceNumber;
00376   handle->master = this;
00377 
00378   return handle;
00379 }
00380 
00381 // unregister require
00382 void ReductionMgr::remove(RequireReduction* handle) {
00383   int setID = handle->reductionSetID;
00384   ReductionSet *set = reductionSets[setID];
00385   if ( set->getData(set->nextSequenceNumber)->submitsRecorded ) {
00386     NAMD_die("RequireReduction deleted while reductions outstanding!");
00387   }
00388 
00389   set->requireRegistered--;
00390 
00391   delSet(setID);
00392 }
00393 
00394 // require the data from a thread
00395 void ReductionMgr::require(RequireReduction* handle) {
00396   int setID = handle->reductionSetID;
00397   ReductionSet *set = reductionSets[setID];
00398   int seqNum = handle->sequenceNumber;
00399   ReductionSetData *data = set->getData(seqNum);
00400   if ( data->submitsRecorded < set->submitsRegistered ) {
00401     set->threadIsWaiting = 1;
00402     set->waitingForSequenceNumber = seqNum;
00403     set->waitingThread = CthSelf();
00404 //iout << "seq " << seqNum << " waiting\n" << endi;
00405     CthSuspend();
00406   }
00407   set->threadIsWaiting = 0;
00408 
00409 //iout << "seq " << seqNum << " consumed\n" << endi;
00410   delete handle->currentData;
00411   handle->currentData = set->removeData(seqNum);
00412   handle->data = handle->currentData->data;
00413   handle->sequenceNumber = ++seqNum;
00414 }
00415 
00416 
00417 #include "ReductionMgr.def.h"
00418 // nothing should be placed below here
00419 

Generated on Fri Sep 5 04:07:15 2008 for NAMD by  doxygen 1.3.9.1