00001
00007
00008
00009
00010
00011
00012
00013
00014 #include <stdlib.h>
00015
00016 #include "InfoStream.h"
00017 #include "NamdCentLB.h"
00018 #include "NamdNborLB.h"
00019
00020 #include "HomePatch.h"
00021 #include "LdbCoordinator.decl.h"
00022 #include "LdbCoordinator.h"
00023 #include "NamdTypes.h"
00024 #include "Node.h"
00025 #include "SimParameters.h"
00026 #include "PatchMap.inl"
00027 #include "ComputeMap.h"
00028
00029 #define MIN_DEBUG_LEVEL 3
00030 #include "Debug.h"
00031 #include "Controller.h"
00032 #include "Sequencer.h"
00033 #include "RefineOnly.h"
00034 #include "ComputeMgr.h"
00035 #include "packmsg.h"
00036
00037 #include "elements.h"
00038 #include "ComputeMgr.decl.h"
00039
00040 #define DEBUG_LEVEL 4
00041
00042 #if CONVERSE_VERSION_ELAN
00043 extern "C" void enableBlockingReceives();
00044 extern "C" void disableBlockingReceives();
00045 #endif
00046
/**
 * Processor-level initialization hook for load balancing.
 *
 * With Charm++ 5.8.4 or newer this sets the LB period to 1.0e-5 through
 * LBSetPeriod(); with older Charm++ it does nothing.  (Presumably the tiny
 * period keeps the Charm++ LB timer from holding back NAMD-driven
 * balancing steps -- TODO confirm.)
 */
void LdbCoordinator_initproc()
{
#if CHARM_VERSION >= 50804
  const double minimalLbPeriod = 1.0e-5;
  LBSetPeriod(minimalLbPeriod);
#endif
}
00057
00058 void LdbCoordinator::staticMigrateFn(LDObjHandle handle, int dest)
00059 {
00060 #if CHARM_VERSION > 050606
00061 LdbCoordinator *ldbCoordinator = (LdbCoordinator *)LDOMUserData(handle.omhandle);
00062 ldbCoordinator->Migrate(handle,dest);
00063 #else
00064 ((LdbCoordinator*)handle.omhandle.user_ptr)->Migrate(handle,dest);
00065 #endif
00066 }
00067
00068 void LdbCoordinator::Migrate(LDObjHandle handle, int dest)
00069 {
00070 LdbMigrateMsg* msg = new LdbMigrateMsg;
00071 msg->handle = handle;
00072 msg->from = CkMyPe();
00073 msg->to = dest;
00074 #if CHARM_VERSION > 050402
00075 CProxy_LdbCoordinator ldbProxy(thisgroup);
00076 ldbProxy[CkMyPe()].RecvMigrate(msg);
00077 #else
00078 CProxy_LdbCoordinator(thisgroup).RecvMigrate(msg,CkMyPe());
00079 #endif
00080 }
00081
00082 void LdbCoordinator::staticStatsFn(LDOMHandle h, int state)
00083 {
00084 CkPrintf("I'm supposed to set stats\n");
00085 }
00086
00087 void LdbCoordinator::staticQueryEstLoadFn(LDOMHandle h)
00088 {
00089 CkPrintf("I'm supposed to query load\n");
00090 }
00091
00092 void LdbCoordinator::staticReceiveAtSync(void* data)
00093 {
00094
00095 #if CONVERSE_VERSION_ELAN
00096
00097 #endif
00098
00099 ((LdbCoordinator*)data)->ReceiveAtSync();
00100 }
00101
00102 void LdbCoordinator::ReceiveAtSync()
00103 {
00104 theLbdb->RegisteringObjects(myHandle);
00105 }
00106
00107 void LdbCoordinator::staticResumeFromSync(void* data)
00108 {
00109 ((LdbCoordinator*)data)->ResumeFromSync();
00110 }
00111
00112 void LdbCoordinator::ResumeFromSync()
00113 {
00114 theLbdb->DoneRegisteringObjects(myHandle);
00115 #if CMK_PERSISTENT_COMM
00116 if (takingLdbData) {
00117
00118
00119 HomePatchList *hpl = PatchMap::Object()->homePatchList();
00120 ResizeArrayIter<HomePatchElem> ai(*hpl);
00121 for (ai=ai.begin(); ai != ai.end(); ai++) {
00122 HomePatch *patch = (*ai).patch;
00123 patch->destoryPersistComm();
00124 }
00125 }
00126 #endif
00127 CProxy_LdbCoordinator cl(thisgroup);
00128 #if CHARM_VERSION > 050402
00129 cl[0].nodeDone();
00130 #else
00131 cl.nodeDone(0);
00132 #endif
00133 }
00134
00135 LdbCoordinator::LdbCoordinator()
00136 {
00137 if (CkpvAccess(LdbCoordinator_instance) == NULL) {
00138 CkpvAccess(LdbCoordinator_instance) = this;
00139 } else {
00140 iout << iFILE << iERROR << iPE
00141 << "LdbCoordinator instanced twice on same node!" << endi;
00142 CkExit();
00143 }
00144
00145 #if 0
00146
00147 if (CkMyPe() == 0) {
00148
00149 CreateNamdCentLB();
00150
00151 }
00152 #endif
00153
00154 ldbCycleNum = 1;
00155 takingLdbData = 1;
00156 totalStepsDone = 0;
00157 nLocalComputes = nLocalPatches = 0;
00158 patchNAtoms = (int *) NULL;
00159 sequencerThreads = (Sequencer **) NULL;
00160 ldbStatsFP = NULL;
00161 computeArray = NULL;
00162 patchArray = NULL;
00163 processorArray = NULL;
00164
00165 nodesDone = 0;
00166
00167
00168 #if CHARM_VERSION > 050610
00169 theLbdb = LBDatabase::Object();
00170 #else
00171 theLbdb = CProxy_LBDatabase::ckLocalBranch(lbdb);
00172 #endif
00173
00174
00175
00176
00177
00178
00179
00180
00181 theLbdb->SetLBPeriod(1.0e-5);
00182
00183 #if CHARM_VERSION > 050403
00184 myOMid.id.idx = 1;
00185 #else
00186 myOMid.id = 1;
00187 #endif
00188 LDCallbacks cb = { (LDMigrateFn)staticMigrateFn,
00189 (LDStatsFn)staticStatsFn,
00190 (LDQueryEstLoadFn)staticQueryEstLoadFn
00191 };
00192 myHandle = theLbdb->RegisterOM(myOMid,(void*)this,cb);
00193
00194
00195
00196 theLbdb->AddLocalBarrierReceiver((LDBarrierFn)staticReceiveAtSync,
00197 (void*)this);;
00198
00199
00200 ldBarrierHandle = theLbdb->
00201 AddLocalBarrierClient((LDResumeFn)staticResumeFromSync,
00202 (void*)this);
00203 objHandles = 0;
00204 reg_all_objs = 1;
00205
00206 }
00207
00208 LdbCoordinator::~LdbCoordinator(void)
00209 {
00210 delete [] patchNAtoms;
00211 delete [] sequencerThreads;
00212 delete [] objHandles;
00213 if (CkMyPe() == 0)
00214 {
00215 delete [] computeArray;
00216 delete [] patchArray;
00217 delete [] processorArray;
00218 }
00219 if (ldbStatsFP)
00220 fclose(ldbStatsFP);
00221
00222 }
00223
00224 void LdbCoordinator::createLoadBalancer()
00225 {
00226 if (CkMyPe()==0)
00227 iout << "LDB: Measuring processor speeds ..." << endi;
00228 const SimParameters *simParams = Node::Object()->simParameters;
00229
00230
00231
00232 CreateNamdCentLB();
00233 if (CkMyPe()==0)
00234 iout << " Done.\n" << endi;
00235 }
00236
00237 void LdbCoordinator::initialize(PatchMap *pMap, ComputeMap *cMap, int reinit)
00238 {
00239 const SimParameters *simParams = Node::Object()->simParameters;
00240
00241 #if 0
00242 static int lbcreated = 0;
00243
00244 if (CkMyPe() == 0 && !lbcreated) {
00245 if (simParams->ldbStrategy == LDBSTRAT_ALGNBOR)
00246 CreateNamdNborLB();
00247 else {
00248
00249 CreateNamdCentLB();
00250 }
00251 lbcreated = 1;
00252 }
00253 #endif
00254
00255
00256 stepsPerLdbCycle = simParams->ldbPeriod;
00257 firstLdbStep = simParams->firstLdbStep;
00258 int lastLdbStep = simParams->lastLdbStep;
00259
00260 computeMap = cMap;
00261 patchMap = pMap;
00262
00263
00264
00265 nStatsMessagesExpected = Node::Object()->numNodes();
00266 nStatsMessagesReceived = 0;
00267
00268 if (patchNAtoms)
00269 delete [] patchNAtoms;
00270 nPatches = patchMap->numPatches();
00271 patchNAtoms = new int[nPatches];
00272
00273 typedef Sequencer *seqPtr;
00274
00275 if ( ! reinit ) {
00276 delete [] sequencerThreads;
00277 sequencerThreads = new seqPtr[nPatches];
00278 }
00279
00280 nLocalPatches=0;
00281
00282 int i;
00283 for(i=0;i<nPatches;i++)
00284 {
00285 if (patchMap->node(i) == Node::Object()->myid())
00286 {
00287 nLocalPatches++;
00288 patchNAtoms[i]=0;
00289 } else {
00290 patchNAtoms[i]=-1;
00291 }
00292 if ( ! reinit ) sequencerThreads[i]=NULL;
00293 }
00294 if ( ! reinit ) controllerThread = NULL;
00295 if (nLocalPatches != patchMap->numHomePatches())
00296 NAMD_die("Disaggreement in patchMap data.\n");
00297
00298 nLocalComputes = 0;
00299 numComputes = cMap->numComputes();
00300
00301 for(i=0;i<numComputes;i++) {
00302 if ( (computeMap->node(i) == Node::Object()->myid())
00303 && ( (computeMap->type(i) == computeNonbondedPairType)
00304 || (computeMap->type(i) == computeSelfBondsType)
00305 || (computeMap->type(i) == computeSelfAnglesType)
00306 || (computeMap->type(i) == computeSelfDihedralsType)
00307 || (computeMap->type(i) == computeSelfImpropersType)
00308 || (computeMap->type(i) == computeSelfCrosstermsType)
00309 || (computeMap->type(i) == computeNonbondedSelfType) ) ) {
00310 nLocalComputes++;
00311 }
00312 }
00313
00314
00315
00316
00317
00318
00319
00320 if (reg_all_objs) {
00321
00322
00323 theLbdb->RegisteringObjects(myHandle);
00324
00325 patchHandles = new LDObjHandle[nLocalPatches];
00326 int patch_count=0;
00327 int i;
00328 for(i=0;i<nPatches;i++)
00329 if (patchMap->node(i) == Node::Object()->myid()) {
00330 LDObjid elemID;
00331 elemID.id[0] = i;
00332 elemID.id[1] = elemID.id[2] = elemID.id[3] = -2;
00333
00334 if (patch_count >= nLocalPatches) {
00335 iout << iFILE << iERROR << iPE
00336 << "LdbCoordinator found too many local patches!" << endi;
00337 CkExit();
00338 }
00339 patchHandles[patch_count]
00340 = theLbdb->RegisterObj(myHandle,elemID,0,0);
00341 patch_count++;
00342 }
00343
00344
00345 if (objHandles == 0) {
00346 objHandles = new LDObjHandle[numComputes];
00347 for(i=0; i<numComputes; i++)
00348 objHandles[i].id.id[0] = -1;
00349
00350
00351 for(i=0; i<numComputes; i++) {
00352 if ( (computeMap->node(i) == Node::Object()->myid())
00353 && ( (computeMap->type(i) == computeNonbondedPairType)
00354 || (computeMap->type(i) == computeSelfBondsType)
00355 || (computeMap->type(i) == computeSelfAnglesType)
00356 || (computeMap->type(i) == computeSelfDihedralsType)
00357 || (computeMap->type(i) == computeSelfImpropersType)
00358 || (computeMap->type(i) == computeSelfCrosstermsType)
00359 || (computeMap->type(i) == computeNonbondedSelfType) ) ) {
00360
00361
00362 LDObjid elemID;
00363 elemID.id[0] = i;
00364
00365 if (cMap->numPids(i) > 2)
00366 elemID.id[3] = cMap->pid(i,2);
00367 else elemID.id[3] = -1;
00368
00369 if (cMap->numPids(i) > 1)
00370 elemID.id[2] = cMap->pid(i,1);
00371 else elemID.id[2] = -1;
00372
00373 if (cMap->numPids(i) > 0)
00374 elemID.id[1] = cMap->pid(i,0);
00375 else elemID.id[1] = -1;
00376
00377 objHandles[i] = theLbdb->RegisterObj(myHandle,elemID,0,1);
00378 }
00379 }
00380 }
00381 theLbdb->DoneRegisteringObjects(myHandle);
00382 reg_all_objs = 0;
00383 }
00384
00385
00386
00387
00388
00389
00390 if (ldbCycleNum==1)
00391 {
00392 totalStepsDone += firstLdbStep;
00393 numStepsToRun = firstLdbStep;
00394 takingLdbData = 0;
00395 theLbdb->CollectStatsOff();
00396 }
00397 else if ( (ldbCycleNum <= 4) || !takingLdbData )
00398 {
00399 totalStepsDone += firstLdbStep;
00400 if(lastLdbStep != -1 && totalStepsDone > lastLdbStep) {
00401 numStepsToRun = -1;
00402 takingLdbData = 0;
00403 theLbdb->CollectStatsOff();
00404 } else {
00405 numStepsToRun = firstLdbStep;
00406 takingLdbData = 1;
00407 theLbdb->CollectStatsOn();
00408 }
00409 }
00410 else
00411 {
00412 totalStepsDone += stepsPerLdbCycle - firstLdbStep;
00413 if(lastLdbStep != -1 && totalStepsDone > lastLdbStep) {
00414 numStepsToRun = -1;
00415 takingLdbData = 0;
00416 theLbdb->CollectStatsOff();
00417 } else {
00418 numStepsToRun = stepsPerLdbCycle - firstLdbStep;
00419 takingLdbData = 0;
00420 theLbdb->CollectStatsOff();
00421 }
00422 }
00423
00424
00425
00426
00427
00428
00429
00430
00431
00432
00433
00434
00435
00436
00437
00438
00439
00440
00441
00442
00443
00444
00445
00446
00447
00448
00449
00450
00451
00452 #if CHARM_VERSION >= 050606
00453 if (traceAvailable()) {
00454 static int specialTracing = 0;
00455 if (ldbCycleNum == 1 && traceIsOn() == 0) specialTracing = 1;
00456 if (specialTracing) {
00457 if (ldbCycleNum == 4) traceBegin();
00458 if (ldbCycleNum == 8) traceEnd();
00459 }
00460 }
00461 #endif
00462
00463 nPatchesReported = 0;
00464 nPatchesExpected = nLocalPatches;
00465 nComputesReported = 0;
00466 nComputesExpected = nLocalComputes * numStepsToRun;
00467 controllerReported = 0;
00468 controllerExpected = ! CkMyPe();
00469
00470 if (CkMyPe() == 0)
00471 {
00472 if (computeArray == NULL)
00473 computeArray = new computeInfo[numComputes];
00474 if (patchArray == NULL)
00475 patchArray = new patchInfo[nPatches];
00476 if (processorArray == NULL)
00477 processorArray = new processorInfo[CkNumPes()];
00478 }
00479
00480 theLbdb->ClearLoads();
00481 }
00482
00483 void LdbCoordinator::patchLoad(PatchID id, int nAtoms, int )
00484 {
00485 CmiAssert( id >=0 && id < nPatches);
00486 if (patchNAtoms[id] != -1) {
00487 patchNAtoms[id] = nAtoms;
00488 nPatchesReported++;
00489 } else {
00490 DebugM(10, "::patchLoad() Unexpected patch reporting in\n");
00491 }
00492 }
00493
00494 void LdbCoordinator::startWork(ComputeID id, int )
00495 {
00496 CmiAssert(id >=0 && id < numComputes);
00497 theLbdb->ObjectStart(objHandles[id]);
00498 }
00499
00500 void LdbCoordinator::endWork(ComputeID id, int )
00501 {
00502 CmiAssert(id >=0 && id < numComputes);
00503 theLbdb->ObjectStop(objHandles[id]);
00504 nComputesReported++;
00505 }
00506
00507 void LdbCoordinator::rebalance(Sequencer *seq, PatchID pid)
00508 {
00509 if (Node::Object()->simParameters->ldbStrategy == LDBSTRAT_NONE)
00510 return;
00511
00512 sequencerThreads[pid] = seq;
00513 seq->suspend();
00514 }
00515
00516 void LdbCoordinator::rebalance(Controller *c)
00517 {
00518 if (Node::Object()->simParameters->ldbStrategy == LDBSTRAT_NONE)
00519 return;
00520
00521 iout << "LDB: ============= START OF LOAD BALANCING ============== " << CmiWallTimer() << "\n" << endi;
00522 DebugM(3, "Controller reached load balance barrier.\n");
00523 controllerReported = 1;
00524 controllerThread = c;
00525
00526 CProxy_LdbCoordinator(thisgroup).barrier();
00527
00528 CthSuspend();
00529 }
00530
00531 void LdbCoordinator::barrier(void)
00532 {
00533 if ( (nPatchesReported != nPatchesExpected)
00534 || (nComputesReported != nComputesExpected)
00535 || (controllerReported != controllerExpected) )
00536 {
00537 NAMD_bug("Load balancer received wrong number of events.\n");
00538 }
00539
00540 theLbdb->AtLocalBarrier(ldBarrierHandle);
00541 }
00542
00543 void LdbCoordinator::nodeDone(void)
00544 {
00545 nodesDone++;
00546
00547 if (nodesDone==Node::Object()->numNodes()) {
00548 iout << "LDB: ============== END OF LOAD BALANCING =============== " << CmiWallTimer() << "\n\n" << endi;
00549 nodesDone=0;
00550 ExecuteMigrations();
00551 }
00552 }
00553
00554 void LdbCoordinator::ExecuteMigrations(void)
00555 {
00556
00557
00558
00559
00560 CProxy_ComputeMgr cm(CkpvAccess(BOCclass_group).computeMgr);
00561 ComputeMgr *computeMgr = cm.ckLocalBranch();
00562 #if CHARM_VERSION > 050402
00563 computeMgr->updateComputes(CkIndex_LdbCoordinator::
00564 updateComputesReady(),thisgroup);
00565 #else
00566 computeMgr->updateComputes(CProxy_LdbCoordinator::
00567 ckIdx_updateComputesReady(),thisgroup);
00568 #endif
00569 }
00570
00571 void LdbCoordinator::RecvMigrate(LdbMigrateMsg* m)
00572 {
00573
00574
00575 const int id = m->handle.id.id[0];
00576
00577 theLbdb->UnregisterObj(objHandles[id]);
00578 objHandles[id].id.id[0] = -1;
00579
00580 #if CHARM_VERSION > 050402
00581 CProxy_LdbCoordinator ldbProxy(thisgroup);
00582 ldbProxy[m->to].ExpectMigrate(m);
00583 #else
00584 CProxy_LdbCoordinator(thisgroup).ExpectMigrate(m,m->to);
00585 #endif
00586 }
00587
00588 void LdbCoordinator::ExpectMigrate(LdbMigrateMsg* m)
00589 {
00590 objHandles[m->handle.id.id[0]]
00591 = theLbdb->RegisterObj(myHandle,m->handle.id,0,1);
00592
00593 theLbdb->Migrated(objHandles[m->handle.id.id[0]]);
00594
00595 delete m;
00596 }
00597
00598 void LdbCoordinator::updateComputesReady() {
00599 DebugM(3,"updateComputesReady()\n");
00600
00601 CProxy_LdbCoordinator(thisgroup).resume();
00602 #if CHARM_VERSION > 050402
00603 CkStartQD(CkIndex_LdbCoordinator::resumeReady((CkQdMsg*)0),&thishandle);
00604 #else
00605 CkStartQD(CProxy_LdbCoordinator::ckIdx_resumeReady((CkQdMsg*)0),&thishandle);
00606 #endif
00607 }
00608
00609 void LdbCoordinator::resume(void)
00610 {
00611 DebugM(3,"resume()\n");
00612
00613
00614
00615 ldbCycleNum++;
00616 initialize(PatchMap::Object(),ComputeMap::Object(),1);
00617 }
00618
00619 void LdbCoordinator::resumeReady(CkQdMsg *msg) {
00620
00621 DebugM(3,"resumeReady()\n");
00622 delete msg;
00623
00624 CProxy_LdbCoordinator(thisgroup).resume2();
00625 }
00626
00627 void LdbCoordinator::resume2(void)
00628 {
00629 DebugM(3,"resume2()\n");
00630
00631 #if CONVERSE_VERSION_ELAN
00632
00633 #endif
00634
00635 awakenSequencers();
00636 }
00637
00638 void LdbCoordinator::awakenSequencers()
00639 {
00640 if (controllerThread)
00641 {
00642 controllerThread->awaken();
00643 controllerThread = NULL;
00644 }
00645 for(int i=0; i < patchMap->numPatches(); i++)
00646 {
00647 if (sequencerThreads[i])
00648 {
00649 sequencerThreads[i]->awaken();
00650 }
00651 sequencerThreads[i]= NULL;
00652 }
00653 }
00654
00655
00656
00657
00658
00659 int LdbCoordinator::requiredProxies(PatchID id, int neighborNodes[])
00660 {
00661 enum proxyHere { No, Yes };
00662 int numNodes = CkNumPes();
00663 proxyHere *proxyNodes = new proxyHere[numNodes];
00664 int nProxyNodes;
00665 int i;
00666
00667
00668 for ( i = 0; i < numNodes; ++i )
00669 {
00670 proxyNodes[i] = No;
00671 }
00672 nProxyNodes=0;
00673
00674
00675
00676
00677 PatchID neighbors[1 + PatchMap::MaxOneAway + PatchMap::MaxTwoAway];
00678
00679 int myNode = patchMap->node(id);
00680 neighbors[0] = id;
00681 int numNeighbors = 1 + patchMap->downstreamNeighbors(id,neighbors+1);
00682 for ( i = 0; i < numNeighbors; ++i )
00683 {
00684 const int proxyNode = patchMap->basenode(neighbors[i]);
00685 if (proxyNode != myNode)
00686 if (proxyNodes[proxyNode] == No)
00687 {
00688 proxyNodes[proxyNode] = Yes;
00689 neighborNodes[nProxyNodes] = proxyNode;
00690 nProxyNodes++;
00691 }
00692 }
00693
00694 delete [] proxyNodes;
00695 return nProxyNodes;
00696 }
00697
00698 void LdbCoordinator::printLocalLdbReport(void)
00699 {
00700 char outputBuf[255];
00701 char *curLoc;
00702
00703 CkPrintf("%d:Patch report:\n",CkMyPe());
00704
00705 curLoc = outputBuf;
00706 int i,j=0;
00707 for(i=0; i<patchMap->numPatches(); i++)
00708 {
00709 if (patchNAtoms[i] != -1)
00710 {
00711 curLoc += sprintf(curLoc,"%5d: %5d ",i,patchNAtoms[i]);
00712 j++;
00713 }
00714 if (((j % 4) == 0) && j)
00715 {
00716 curLoc = outputBuf;
00717 CkPrintf("[%d]%s\n",CkMyPe(),outputBuf);
00718 j=0;
00719 }
00720 }
00721
00722 CkPrintf("%d:Compute report:\n",CkMyPe());
00723
00724 curLoc = outputBuf;
00725 j=0;
00726 }
00727
00728 void LdbCoordinator::printRequiredProxies(PatchID id, FILE *fp)
00729 {
00730
00731
00732
00733 int neighborNodes[PatchMap::MaxOneAway + PatchMap::MaxTwoAway];
00734 const int nProxyNodes = requiredProxies(id,neighborNodes);
00735
00736 fprintf(fp,"%4d ",nProxyNodes);
00737
00738 for(int i=0;i<nProxyNodes;i++)
00739 fprintf(fp,"%4d ",neighborNodes[i]);
00740 }
00741
00742 #include "LdbCoordinator.def.h"