00001
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include <stdlib.h>
00024 #include <stdio.h>
00025
00026 #include "InfoStream.h"
00027 #include "PatchMap.h"
00028
00029 #include "Node.h"
00030 #include "SimParameters.h"
00031
00032 #include "ReductionMgr.decl.h"
00033 #include "ReductionMgr.h"
00034
00035
00036 #define MIN_DEBUG_LEVEL 4
00037 #include "Debug.h"
00038
00039
00040 class ReductionRegisterMsg : public CMessage_ReductionRegisterMsg {
00041 public:
00042 int reductionSetID;
00043 int dataSize;
00044 int sourceNode;
00045 };
00046
00047
00048 class ReductionSubmitMsg : public CMessage_ReductionSubmitMsg {
00049 public:
00050 int reductionSetID;
00051 int sourceNode;
00052 int sequenceNumber;
00053 int dataSize;
00054 BigReal data[1];
00055
00056 static void *alloc(int msgnum, int size, int *array, int priobits) {
00057 int totalsize = size + array[0]*sizeof(BigReal);
00058 ReductionSubmitMsg *newMsg = (ReductionSubmitMsg*)
00059 CkAllocMsg(msgnum,totalsize,priobits);
00060
00061 return (void*)newMsg;
00062 }
00063
00064 static void *pack(ReductionSubmitMsg *in) {
00065
00066 return (void*)in;
00067 }
00068
00069 static ReductionSubmitMsg *unpack(void *in) {
00070 ReductionSubmitMsg *me = (ReductionSubmitMsg*)in;
00071
00072 return me;
00073 }
00074
00075 };
00076
00077 ReductionSet::ReductionSet(int setID, int size) {
00078 if ( setID == REDUCTIONS_BASIC ) {
00079 if ( size != -1 ) {
00080 NAMD_bug("ReductionSet size specified for REDUCTIONS_BASIC.");
00081 }
00082 size = REDUCTION_MAX_RESERVED;
00083 }
00084 if ( size == -1 ) NAMD_bug("ReductionSet size not specified.");
00085 dataSize = size;
00086 reductionSetID = setID;
00087 nextSequenceNumber = 0;
00088 submitsRegistered = 0;
00089 dataQueue = 0;
00090 requireRegistered = 0;
00091 threadIsWaiting = 0;
00092 }
00093
00094 ReductionSet::~ReductionSet() {
00095
00096 ReductionSetData *current = dataQueue;
00097
00098 while ( current ) {
00099 ReductionSetData *next = current->next;
00100 delete current;
00101 current = next;
00102 }
00103 }
00104
00105
00106 ReductionSetData* ReductionSet::getData(int seqNum) {
00107
00108 ReductionSetData **current = &dataQueue;
00109
00110 while ( *current ) {
00111 if ( (*current)->sequenceNumber == seqNum ) return *current;
00112 current = &((*current)->next);
00113 }
00114
00115
00116 *current = new ReductionSetData(seqNum, dataSize);
00117 return *current;
00118 }
00119
00120
00121 ReductionSetData* ReductionSet::removeData(int seqNum) {
00122
00123 ReductionSetData **current = &dataQueue;
00124
00125 while ( *current ) {
00126 if ( (*current)->sequenceNumber == seqNum ) break;
00127 current = &((*current)->next);
00128 }
00129
00130 if ( ! *current ) { NAMD_die("ReductionSet::removeData on missing seqNum"); }
00131
00132 ReductionSetData *toremove = *current;
00133 *current = (*current)->next;
00134 return toremove;
00135 }
00136
00137
00138 ReductionMgr::ReductionMgr() {
00139 if (CkpvAccess(ReductionMgr_instance) == 0) {
00140 CkpvAccess(ReductionMgr_instance) = this;
00141 } else {
00142 DebugM(1, "ReductionMgr::ReductionMgr() - another instance exists!\n");
00143 }
00144
00145
00146 if (CkMyPe() == 0) {
00147 myParent = -1;
00148 } else {
00149 myParent = (CkMyPe()-1)/REDUCTION_MAX_CHILDREN;
00150 }
00151 firstChild = CkMyPe()*REDUCTION_MAX_CHILDREN + 1;
00152 if (firstChild > CkNumPes()) firstChild = CkNumPes();
00153 lastChild = firstChild + REDUCTION_MAX_CHILDREN;
00154 if (lastChild > CkNumPes()) lastChild = CkNumPes();
00155
00156
00157 for(int i=0; i<REDUCTION_MAX_SET_ID; i++) {
00158 reductionSets[i] = 0;
00159 }
00160
00161 DebugM(1,"ReductionMgr() instantiated.\n");
00162 }
00163
00164
00165 ReductionMgr::~ReductionMgr() {
00166 for(int i=0; i<REDUCTION_MAX_SET_ID; i++) {
00167 delete reductionSets[i];
00168 }
00169
00170 }
00171
00172
00173 ReductionSet* ReductionMgr::getSet(int setID, int size) {
00174 if ( reductionSets[setID] == 0 ) {
00175 reductionSets[setID] = new ReductionSet(setID,size);
00176 if ( ! isRoot() ) {
00177 ReductionRegisterMsg *msg = new ReductionRegisterMsg;
00178 msg->reductionSetID = setID;
00179 msg->dataSize = size;
00180 msg->sourceNode = CkMyPe();
00181 #if CHARM_VERSION > 050402
00182 CProxy_ReductionMgr reductionProxy(thisgroup);
00183 reductionProxy[myParent].remoteRegister(msg);
00184 #else
00185 CProxy_ReductionMgr(thisgroup).remoteRegister(msg,myParent);
00186 #endif
00187 }
00188 }
00189 return reductionSets[setID];
00190 }
00191
00192
00193 void ReductionMgr::delSet(int setID) {
00194 ReductionSet *set = reductionSets[setID];
00195 if ( set && ! set->submitsRegistered & ! set->requireRegistered ) {
00196 if ( ! isRoot() ) {
00197 ReductionRegisterMsg *msg = new ReductionRegisterMsg;
00198 msg->reductionSetID = setID;
00199 msg->sourceNode = CkMyPe();
00200 #if CHARM_VERSION > 050402
00201 CProxy_ReductionMgr reductionProxy(thisgroup);
00202 reductionProxy[myParent].remoteUnregister(msg);
00203 #else
00204 CProxy_ReductionMgr(thisgroup).remoteUnregister(msg,myParent);
00205 #endif
00206 }
00207 delete set;
00208 reductionSets[setID] = 0;
00209 }
00210 }
00211
00212
00213 SubmitReduction* ReductionMgr::willSubmit(int setID, int size) {
00214 ReductionSet *set = getSet(setID, size);
00215 ReductionSetData *data = set->getData(set->nextSequenceNumber);
00216 if ( data->submitsRecorded ) {
00217 NAMD_die("ReductionMgr::willSubmit called while reductions outstanding!");
00218 }
00219
00220 set->submitsRegistered++;
00221
00222 SubmitReduction *handle = new SubmitReduction;
00223 handle->reductionSetID = setID;
00224 handle->sequenceNumber = set->nextSequenceNumber;
00225 handle->master = this;
00226 handle->data = data->data;
00227
00228 return handle;
00229 }
00230
00231
00232 void ReductionMgr::remove(SubmitReduction* handle) {
00233 int setID = handle->reductionSetID;
00234 ReductionSet *set = reductionSets[setID];
00235 if ( set->getData(set->nextSequenceNumber)->submitsRecorded ) {
00236 NAMD_die("SubmitReduction deleted while reductions outstanding!");
00237 }
00238
00239 set->submitsRegistered--;
00240
00241 delSet(setID);
00242 }
00243
00244
00245 void ReductionMgr::submit(SubmitReduction* handle) {
00246 int setID = handle->reductionSetID;
00247 int seqNum = handle->sequenceNumber;
00248 ReductionSet *set = reductionSets[setID];
00249 ReductionSetData *data = set->getData(seqNum);
00250
00251 data->submitsRecorded++;
00252 if ( data->submitsRecorded == set->submitsRegistered ) {
00253 mergeAndDeliver(set,seqNum);
00254 }
00255
00256 handle->sequenceNumber = ++seqNum;
00257 handle->data = set->getData(seqNum)->data;
00258 }
00259
00260
00261 void ReductionMgr::remoteRegister(ReductionRegisterMsg *msg) {
00262
00263 int setID = msg->reductionSetID;
00264 int size = msg->dataSize;
00265 ReductionSet *set = getSet(setID,size);
00266 if ( set->getData(set->nextSequenceNumber)->submitsRecorded ) {
00267 NAMD_die("ReductionMgr::remoteRegister called while reductions outstanding on parent!");
00268 }
00269
00270 set->submitsRegistered++;
00271 set->addToRemoteSequenceNumber[msg->sourceNode - firstChild]
00272 = set->nextSequenceNumber;
00273 delete msg;
00274 }
00275
00276
00277 void ReductionMgr::remoteUnregister(ReductionRegisterMsg *msg) {
00278
00279 int setID = msg->reductionSetID;
00280 ReductionSet *set = reductionSets[setID];
00281 if ( set->getData(set->nextSequenceNumber)->submitsRecorded ) {
00282 NAMD_die("SubmitReduction deleted while reductions outstanding on parent!");
00283 }
00284
00285 set->submitsRegistered--;
00286
00287 delSet(setID);
00288 delete msg;
00289 }
00290
00291
00292 void ReductionMgr::remoteSubmit(ReductionSubmitMsg *msg) {
00293 int setID = msg->reductionSetID;
00294 ReductionSet *set = reductionSets[setID];
00295 int seqNum = msg->sequenceNumber
00296 + set->addToRemoteSequenceNumber[msg->sourceNode - firstChild];
00297
00298
00299 int size = msg->dataSize;
00300 if ( size != set->dataSize ) {
00301 NAMD_bug("ReductionMgr::remoteSubmit data sizes do not match.");
00302 }
00303
00304 BigReal *newData = msg->data;
00305 ReductionSetData *data = set->getData(seqNum);
00306 BigReal *curData = data->data;
00307 #ifdef ARCH_POWERPC
00308 #pragma disjoint (*curData, *newData)
00309 #pragma unroll(4)
00310 #endif
00311 for ( int i = 0; i < size; ++i ) {
00312 curData[i] += newData[i];
00313 }
00314 delete msg;
00315
00316 data->submitsRecorded++;
00317 if ( data->submitsRecorded == set->submitsRegistered ) {
00318 mergeAndDeliver(set,seqNum);
00319 }
00320 }
00321
00322
00323 void ReductionMgr::mergeAndDeliver(ReductionSet *set, int seqNum) {
00324
00325
00326
00327 set->nextSequenceNumber++;
00328
00329 ReductionSetData *data = set->getData(seqNum);
00330 if ( data->submitsRecorded != set->submitsRegistered ) {
00331 NAMD_bug("ReductionMgr::mergeAndDeliver not ready to deliver.");
00332 }
00333
00334 if ( isRoot() ) {
00335 if ( set->requireRegistered ) {
00336 if ( set->threadIsWaiting && set->waitingForSequenceNumber == seqNum) {
00337
00338 CthAwaken(set->waitingThread);
00339 }
00340 } else {
00341 NAMD_die("ReductionSet::deliver will never deliver data");
00342 }
00343 } else {
00344
00345 int size = set->dataSize;
00346 ReductionSubmitMsg *msg = new(&size,1) ReductionSubmitMsg;
00347 msg->reductionSetID = set->reductionSetID;
00348 msg->sourceNode = CkMyPe();
00349 msg->sequenceNumber = seqNum;
00350 msg->dataSize = set->dataSize;
00351 for ( int i = 0; i < msg->dataSize; ++i ) {
00352 msg->data[i] = data->data[i];
00353 }
00354 #if CHARM_VERSION > 050402
00355 CProxy_ReductionMgr reductionProxy(thisgroup);
00356 reductionProxy[myParent].remoteSubmit(msg);
00357 #else
00358 CProxy_ReductionMgr(thisgroup).remoteSubmit(msg,myParent);
00359 #endif
00360 delete set->removeData(seqNum);
00361 }
00362
00363 }
00364
00365
00366 RequireReduction* ReductionMgr::willRequire(int setID, int size) {
00367 ReductionSet *set = getSet(setID,size);
00368 set->requireRegistered++;
00369 if ( set->getData(set->nextSequenceNumber)->submitsRecorded ) {
00370 NAMD_die("ReductionMgr::willRequire called while reductions outstanding!");
00371 }
00372
00373 RequireReduction *handle = new RequireReduction;
00374 handle->reductionSetID = setID;
00375 handle->sequenceNumber = set->nextSequenceNumber;
00376 handle->master = this;
00377
00378 return handle;
00379 }
00380
00381
00382 void ReductionMgr::remove(RequireReduction* handle) {
00383 int setID = handle->reductionSetID;
00384 ReductionSet *set = reductionSets[setID];
00385 if ( set->getData(set->nextSequenceNumber)->submitsRecorded ) {
00386 NAMD_die("RequireReduction deleted while reductions outstanding!");
00387 }
00388
00389 set->requireRegistered--;
00390
00391 delSet(setID);
00392 }
00393
00394
00395 void ReductionMgr::require(RequireReduction* handle) {
00396 int setID = handle->reductionSetID;
00397 ReductionSet *set = reductionSets[setID];
00398 int seqNum = handle->sequenceNumber;
00399 ReductionSetData *data = set->getData(seqNum);
00400 if ( data->submitsRecorded < set->submitsRegistered ) {
00401 set->threadIsWaiting = 1;
00402 set->waitingForSequenceNumber = seqNum;
00403 set->waitingThread = CthSelf();
00404
00405 CthSuspend();
00406 }
00407 set->threadIsWaiting = 0;
00408
00409
00410 delete handle->currentData;
00411 handle->currentData = set->removeData(seqNum);
00412 handle->data = handle->currentData->data;
00413 handle->sequenceNumber = ++seqNum;
00414 }
00415
00416
00417 #include "ReductionMgr.def.h"
00418
00419