00001
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include <stdlib.h>
00024 #include <stdio.h>
00025
00026 #include "InfoStream.h"
00027 #include "PatchMap.h"
00028
00029 #include "Node.h"
00030 #include "SimParameters.h"
00031
00032 #include "ReductionMgr.decl.h"
00033 #include "ReductionMgr.h"
00034
00035
00036 #define MIN_DEBUG_LEVEL 4
00037 #include "Debug.h"
00038
00039
00040 class ReductionRegisterMsg : public CMessage_ReductionRegisterMsg {
00041 public:
00042 int reductionSetID;
00043 int dataSize;
00044 int sourceNode;
00045 };
00046
00047
00048 class ReductionSubmitMsg : public CMessage_ReductionSubmitMsg {
00049 public:
00050 int reductionSetID;
00051 int sourceNode;
00052 int sequenceNumber;
00053 int dataSize;
00054 BigReal data[1];
00055
00056 static void *alloc(int msgnum, int size, int *array, int priobits) {
00057 int totalsize = size + array[0]*sizeof(BigReal);
00058 ReductionSubmitMsg *newMsg = (ReductionSubmitMsg*)
00059 CkAllocMsg(msgnum,totalsize,priobits);
00060
00061 return (void*)newMsg;
00062 }
00063
00064 static void *pack(ReductionSubmitMsg *in) {
00065
00066 return (void*)in;
00067 }
00068
00069 static ReductionSubmitMsg *unpack(void *in) {
00070 ReductionSubmitMsg *me = (ReductionSubmitMsg*)in;
00071
00072 return me;
00073 }
00074
00075 };
00076
00077 ReductionSet::ReductionSet(int setID, int size, int numChildren) {
00078 if ( setID == REDUCTIONS_BASIC ) {
00079 if ( size != -1 ) {
00080 NAMD_bug("ReductionSet size specified for REDUCTIONS_BASIC.");
00081 }
00082 size = REDUCTION_MAX_RESERVED;
00083 }
00084 if ( size == -1 ) NAMD_bug("ReductionSet size not specified.");
00085 dataSize = size;
00086 reductionSetID = setID;
00087 nextSequenceNumber = 0;
00088 submitsRegistered = 0;
00089 dataQueue = 0;
00090 requireRegistered = 0;
00091 threadIsWaiting = 0;
00092 addToRemoteSequenceNumber = new int[numChildren];
00093 }
00094
00095 ReductionSet::~ReductionSet() {
00096
00097 ReductionSetData *current = dataQueue;
00098
00099 while ( current ) {
00100 ReductionSetData *next = current->next;
00101 delete current;
00102 current = next;
00103 }
00104 delete [] addToRemoteSequenceNumber;
00105 }
00106
00107
00108 ReductionSetData* ReductionSet::getData(int seqNum) {
00109
00110 ReductionSetData **current = &dataQueue;
00111
00112 while ( *current ) {
00113 if ( (*current)->sequenceNumber == seqNum ) return *current;
00114 current = &((*current)->next);
00115 }
00116
00117
00118 *current = new ReductionSetData(seqNum, dataSize);
00119 return *current;
00120 }
00121
00122
00123 ReductionSetData* ReductionSet::removeData(int seqNum) {
00124
00125 ReductionSetData **current = &dataQueue;
00126
00127 while ( *current ) {
00128 if ( (*current)->sequenceNumber == seqNum ) break;
00129 current = &((*current)->next);
00130 }
00131
00132 if ( ! *current ) { NAMD_die("ReductionSet::removeData on missing seqNum"); }
00133
00134 ReductionSetData *toremove = *current;
00135 *current = (*current)->next;
00136 return toremove;
00137 }
00138
00139 void ReductionMgr::buildSpanTree(const int pe,
00140 const int max_intranode_children,
00141 const int max_internode_children,
00142 int* parent,
00143 int* num_children,
00144 int** children)
00145 {
00146
00147
00148
00149
00150
00151 #if CHARM_VERSION < 60103
00152 #define CmiPhysicalNodeID(X) X
00153 #define CmiNumPesOnPhysicalNode(X) 1
00154 #define CmiGetFirstPeOnPhysicalNode(X) (X)
00155 #define CmiPeOnSamePhysicalNode(X,Y) ((X)==(Y))
00156 #endif
00157
00158 const int num_pes = CkNumPes();
00159 const int num_node_pes = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(pe));
00160 int *node_pes = new int[num_node_pes];
00161 int pe_index;
00162 const int first_pe = CmiGetFirstPeOnPhysicalNode(CmiPhysicalNodeID(pe));
00163 int num_nodes = 0;
00164 int *node_ids = new int[num_pes];
00165 int first_pe_index;
00166 int my_parent_index;
00167
00168
00169 if (pe == 0 && first_pe != pe) {
00170 NAMD_die("PE 0 is not the first physical node. This shouldn't happen");
00171 }
00172
00173 int i;
00174 int node_pe_count=0;
00175 for (i = 0; i < num_pes; i++) {
00176
00177 if (CmiGetFirstPeOnPhysicalNode(CmiPhysicalNodeID(i)) == i) {
00178 node_ids[num_nodes] = i;
00179 if (i == first_pe)
00180 first_pe_index = num_nodes;
00181 num_nodes++;
00182 }
00183
00184
00185 const int i1 = (i + first_pe) % num_pes;
00186 if (CmiPeOnSamePhysicalNode(first_pe,i1)) {
00187 node_pes[node_pe_count] = i1;
00188 if (pe == i1)
00189 pe_index = node_pe_count;
00190 node_pe_count++;
00191 }
00192 }
00193
00194
00195
00196
00197 int first_loc_child_index = pe_index * max_intranode_children + 1;
00198 int last_loc_child_index
00199 = first_loc_child_index + max_intranode_children - 1;
00200 if (first_loc_child_index > num_node_pes) {
00201 first_loc_child_index = num_node_pes;
00202 last_loc_child_index = num_node_pes;
00203 } else {
00204 if (last_loc_child_index >= num_node_pes)
00205 last_loc_child_index = num_node_pes-1;
00206 }
00207
00208
00209
00210
00211 int first_rem_child_index = num_nodes;
00212 int last_rem_child_index = num_nodes;
00213
00214 if (first_pe != pe) {
00215
00216
00217 my_parent_index = (pe_index-1)/max_intranode_children;
00218 *parent = node_pes[my_parent_index];
00219 } else {
00220
00221
00222
00223 if (pe == 0) {
00224 my_parent_index = -1;
00225 *parent = -1;
00226 } else {
00227 my_parent_index = (first_pe_index-1)/max_internode_children;
00228 *parent = node_ids[my_parent_index];
00229
00230
00231 }
00232 first_rem_child_index = first_pe_index * max_internode_children + 1;
00233 last_rem_child_index = first_rem_child_index + max_internode_children -1;
00234 if (first_rem_child_index > num_nodes) {
00235 first_rem_child_index = num_nodes;
00236 last_rem_child_index = num_nodes;
00237 } else {
00238 if (last_rem_child_index >= num_nodes)
00239 last_rem_child_index = num_nodes-1;
00240 }
00241
00242
00243
00244 }
00245
00246 *num_children = 0;
00247
00248
00249 int loc_children=0;
00250 if (first_loc_child_index != num_node_pes) {
00251 loc_children = last_loc_child_index - first_loc_child_index + 1;
00252 *num_children += loc_children;
00253
00254
00255
00256 }
00257
00258 int rem_children=0;
00259 if (first_rem_child_index != num_nodes) {
00260 rem_children = last_rem_child_index - first_rem_child_index + 1;
00261 *num_children += rem_children;
00262
00263
00264
00265 }
00266 if (*num_children == 0)
00267 *children = 0;
00268 else {
00269 *children = new int[*num_children];
00270
00271 int k;
00272 int child=0;
00273 if (loc_children > 0) {
00274 for(k=first_loc_child_index; k <= last_loc_child_index; k++) {
00275
00276 (*children)[child++]=node_pes[k];
00277 }
00278 }
00279 if (rem_children > 0) {
00280 for(k=first_rem_child_index; k <= last_rem_child_index; k++) {
00281
00282 (*children)[child++]=node_ids[k];
00283 }
00284 }
00285 }
00286 delete [] node_ids;
00287 delete [] node_pes;
00288 }
00289
00290
00291 ReductionMgr::ReductionMgr() {
00292 if (CkpvAccess(ReductionMgr_instance) == 0) {
00293 CkpvAccess(ReductionMgr_instance) = this;
00294 } else {
00295 DebugM(1, "ReductionMgr::ReductionMgr() - another instance exists!\n");
00296 }
00297
00298 buildSpanTree(CkMyPe(),REDUCTION_MAX_CHILDREN,REDUCTION_MAX_CHILDREN,
00299 &myParent,&numChildren,&children);
00300
00301
00302
00303
00304
00305
00306
00307
00308
00309
00310 #if 0 // Old spanning tree
00311 if (CkMyPe() == 0) {
00312 myParent = -1;
00313 } else {
00314 myParent = (CkMyPe()-1)/REDUCTION_MAX_CHILDREN;
00315 }
00316 firstChild = CkMyPe()*REDUCTION_MAX_CHILDREN + 1;
00317 if (firstChild > CkNumPes()) firstChild = CkNumPes();
00318 lastChild = firstChild + REDUCTION_MAX_CHILDREN;
00319 if (lastChild > CkNumPes()) lastChild = CkNumPes();
00320 #endif
00321
00322
00323 for(int i=0; i<REDUCTION_MAX_SET_ID; i++) {
00324 reductionSets[i] = 0;
00325 }
00326
00327 DebugM(1,"ReductionMgr() instantiated.\n");
00328 }
00329
00330
00331 ReductionMgr::~ReductionMgr() {
00332 if (children != 0)
00333 delete [] children;
00334 for(int i=0; i<REDUCTION_MAX_SET_ID; i++) {
00335 delete reductionSets[i];
00336 }
00337
00338 }
00339
00340
00341 ReductionSet* ReductionMgr::getSet(int setID, int size) {
00342 if ( reductionSets[setID] == 0 ) {
00343 reductionSets[setID] = new ReductionSet(setID,size,numChildren);
00344 if ( ! isRoot() ) {
00345 ReductionRegisterMsg *msg = new ReductionRegisterMsg;
00346 msg->reductionSetID = setID;
00347 msg->dataSize = size;
00348 msg->sourceNode = CkMyPe();
00349 CProxy_ReductionMgr reductionProxy(thisgroup);
00350 reductionProxy[myParent].remoteRegister(msg);
00351 }
00352 }
00353 return reductionSets[setID];
00354 }
00355
00356
00357 void ReductionMgr::delSet(int setID) {
00358 ReductionSet *set = reductionSets[setID];
00359 if ( set && ! set->submitsRegistered & ! set->requireRegistered ) {
00360 if ( ! isRoot() ) {
00361 ReductionRegisterMsg *msg = new ReductionRegisterMsg;
00362 msg->reductionSetID = setID;
00363 msg->sourceNode = CkMyPe();
00364 CProxy_ReductionMgr reductionProxy(thisgroup);
00365 reductionProxy[myParent].remoteUnregister(msg);
00366 }
00367 delete set;
00368 reductionSets[setID] = 0;
00369 }
00370 }
00371
00372
00373 SubmitReduction* ReductionMgr::willSubmit(int setID, int size) {
00374 ReductionSet *set = getSet(setID, size);
00375 ReductionSetData *data = set->getData(set->nextSequenceNumber);
00376 if ( data->submitsRecorded ) {
00377 NAMD_die("ReductionMgr::willSubmit called while reductions outstanding!");
00378 }
00379
00380 set->submitsRegistered++;
00381
00382 SubmitReduction *handle = new SubmitReduction;
00383 handle->reductionSetID = setID;
00384 handle->sequenceNumber = set->nextSequenceNumber;
00385 handle->master = this;
00386 handle->data = data->data;
00387
00388 return handle;
00389 }
00390
00391
00392 void ReductionMgr::remove(SubmitReduction* handle) {
00393 int setID = handle->reductionSetID;
00394 ReductionSet *set = reductionSets[setID];
00395 if ( set->getData(set->nextSequenceNumber)->submitsRecorded ) {
00396 NAMD_die("SubmitReduction deleted while reductions outstanding!");
00397 }
00398
00399 set->submitsRegistered--;
00400
00401 delSet(setID);
00402 }
00403
00404
00405 void ReductionMgr::submit(SubmitReduction* handle) {
00406 int setID = handle->reductionSetID;
00407 int seqNum = handle->sequenceNumber;
00408 ReductionSet *set = reductionSets[setID];
00409 ReductionSetData *data = set->getData(seqNum);
00410
00411 data->submitsRecorded++;
00412 if ( data->submitsRecorded == set->submitsRegistered ) {
00413 mergeAndDeliver(set,seqNum);
00414 }
00415
00416 handle->sequenceNumber = ++seqNum;
00417 handle->data = set->getData(seqNum)->data;
00418 }
00419
00420
00421 void ReductionMgr::remoteRegister(ReductionRegisterMsg *msg) {
00422
00423 int setID = msg->reductionSetID;
00424 int size = msg->dataSize;
00425 ReductionSet *set = getSet(setID,size);
00426 if ( set->getData(set->nextSequenceNumber)->submitsRecorded ) {
00427 NAMD_die("ReductionMgr::remoteRegister called while reductions outstanding on parent!");
00428 }
00429
00430 set->submitsRegistered++;
00431 set->addToRemoteSequenceNumber[childIndex(msg->sourceNode)]
00432 = set->nextSequenceNumber;
00433
00434
00435
00436 delete msg;
00437 }
00438
00439
00440 void ReductionMgr::remoteUnregister(ReductionRegisterMsg *msg) {
00441
00442 int setID = msg->reductionSetID;
00443 ReductionSet *set = reductionSets[setID];
00444 if ( set->getData(set->nextSequenceNumber)->submitsRecorded ) {
00445 NAMD_die("SubmitReduction deleted while reductions outstanding on parent!");
00446 }
00447
00448 set->submitsRegistered--;
00449
00450 delSet(setID);
00451 delete msg;
00452 }
00453
00454
00455 void ReductionMgr::remoteSubmit(ReductionSubmitMsg *msg) {
00456 int setID = msg->reductionSetID;
00457 ReductionSet *set = reductionSets[setID];
00458 int seqNum = msg->sequenceNumber
00459 + set->addToRemoteSequenceNumber[childIndex(msg->sourceNode)];
00460
00461
00462 int size = msg->dataSize;
00463 if ( size != set->dataSize ) {
00464 NAMD_bug("ReductionMgr::remoteSubmit data sizes do not match.");
00465 }
00466
00467 BigReal *newData = msg->data;
00468 ReductionSetData *data = set->getData(seqNum);
00469 BigReal *curData = data->data;
00470 #ifdef ARCH_POWERPC
00471 #pragma disjoint (*curData, *newData)
00472 #pragma unroll(4)
00473 #endif
00474 for ( int i = 0; i < size; ++i ) {
00475 curData[i] += newData[i];
00476 }
00477
00478
00479 delete msg;
00480
00481 data->submitsRecorded++;
00482 if ( data->submitsRecorded == set->submitsRegistered ) {
00483 mergeAndDeliver(set,seqNum);
00484 }
00485 }
00486
00487
00488 void ReductionMgr::mergeAndDeliver(ReductionSet *set, int seqNum) {
00489
00490
00491
00492 set->nextSequenceNumber++;
00493
00494 ReductionSetData *data = set->getData(seqNum);
00495 if ( data->submitsRecorded != set->submitsRegistered ) {
00496 NAMD_bug("ReductionMgr::mergeAndDeliver not ready to deliver.");
00497 }
00498
00499 if ( isRoot() ) {
00500 if ( set->requireRegistered ) {
00501 if ( set->threadIsWaiting && set->waitingForSequenceNumber == seqNum) {
00502
00503 CthAwaken(set->waitingThread);
00504 }
00505 } else {
00506 NAMD_die("ReductionSet::deliver will never deliver data");
00507 }
00508 } else {
00509
00510 int size = set->dataSize;
00511 ReductionSubmitMsg *msg = new(&size,1) ReductionSubmitMsg;
00512 msg->reductionSetID = set->reductionSetID;
00513 msg->sourceNode = CkMyPe();
00514 msg->sequenceNumber = seqNum;
00515 msg->dataSize = set->dataSize;
00516 for ( int i = 0; i < msg->dataSize; ++i ) {
00517 msg->data[i] = data->data[i];
00518 }
00519 CProxy_ReductionMgr reductionProxy(thisgroup);
00520 reductionProxy[myParent].remoteSubmit(msg);
00521 delete set->removeData(seqNum);
00522 }
00523
00524 }
00525
00526
00527 RequireReduction* ReductionMgr::willRequire(int setID, int size) {
00528 ReductionSet *set = getSet(setID,size);
00529 set->requireRegistered++;
00530 if ( set->getData(set->nextSequenceNumber)->submitsRecorded ) {
00531 NAMD_die("ReductionMgr::willRequire called while reductions outstanding!");
00532 }
00533
00534 RequireReduction *handle = new RequireReduction;
00535 handle->reductionSetID = setID;
00536 handle->sequenceNumber = set->nextSequenceNumber;
00537 handle->master = this;
00538
00539 return handle;
00540 }
00541
00542
00543 void ReductionMgr::remove(RequireReduction* handle) {
00544 int setID = handle->reductionSetID;
00545 ReductionSet *set = reductionSets[setID];
00546 if ( set->getData(set->nextSequenceNumber)->submitsRecorded ) {
00547 NAMD_die("RequireReduction deleted while reductions outstanding!");
00548 }
00549
00550 set->requireRegistered--;
00551
00552 delSet(setID);
00553 }
00554
00555
00556 void ReductionMgr::require(RequireReduction* handle) {
00557 int setID = handle->reductionSetID;
00558 ReductionSet *set = reductionSets[setID];
00559 int seqNum = handle->sequenceNumber;
00560 ReductionSetData *data = set->getData(seqNum);
00561 if ( data->submitsRecorded < set->submitsRegistered ) {
00562 set->threadIsWaiting = 1;
00563 set->waitingForSequenceNumber = seqNum;
00564 set->waitingThread = CthSelf();
00565
00566 CthSuspend();
00567 }
00568 set->threadIsWaiting = 0;
00569
00570
00571 delete handle->currentData;
00572 handle->currentData = set->removeData(seqNum);
00573 handle->data = handle->currentData->data;
00574 handle->sequenceNumber = ++seqNum;
00575 }
00576
00577
00578 #include "ReductionMgr.def.h"
00579
00580