00001
00007
00008
00009
00010
00011
00012
00013
00014 #include "InfoStream.h"
00015 #include "Node.h"
00016 #include "Rebalancer.h"
00017 #include "ProxyMgr.h"
00018 #include "PatchMap.h"
00019 #include "memusage.h"
00020 #include <iomanip>
00021
// Small load corrections used by the balancer (same units as the processor
// load values they are added to — presumably seconds; TODO confirm).
//
// ST_NODE_LOAD: added to / removed from load and backgroundLoad for each
// spanning-tree proxy entry by incrSTLoad() / decrSTLoad().
#define ST_NODE_LOAD 0.005
// PROXY_LOAD: charged (refunded) per proxy created (removed) by
// assign() / deAssign() when PROXY_CORRECTION is enabled and
// firstAssignInRefine is set.
#define PROXY_LOAD 0.001
// COMPUTE_LOAD: extra compute load charged per compute by assign() when
// COMPUTE_CORRECTION is enabled and firstAssignInRefine is set.
#define COMPUTE_LOAD 0.00005
00025
00026 Rebalancer::Rebalancer(computeInfo *computeArray, patchInfo *patchArray,
00027 processorInfo *processorArray, int nComps, int nPatches, int nPes)
00028 {
00029 bytesPerAtom = 32;
00030 strategyName = "None";
00031 computes = computeArray;
00032 patches = patchArray;
00033 processors = processorArray;
00034 numComputes = nComps;
00035 numPatches = nPatches;
00036 P = nPes;
00037 pes = NULL;
00038 computePairHeap = NULL;
00039 computeSelfHeap = NULL;
00040 computeBgPairHeap = NULL;
00041 computeBgSelfHeap = NULL;
00042 overLoad = 0.;
00043 numPesAvailable = 0;
00044 firstAssignInRefine = 0;
00045
00046 const int beginGroup = processors[0].Id;
00047 const int endGroup = beginGroup + P;
00048 #define INGROUP(PROC) ((PROC) >= beginGroup && (PROC) < endGroup)
00049
00050 int i;
00051 int index;
00052 for (i=0; i<P; i++)
00053 {
00054
00055
00056
00057 processors[i].load = processors[i].backgroundLoad;
00058 processors[i].computeLoad = 0;
00059 if (processors[i].available) {
00060 numPesAvailable += 1;
00061 }
00062 }
00063
00064 for (i=0; i<nPatches; i++) {
00065
00066 if INGROUP(patches[i].processor) {
00067 index = patches[i].processor - beginGroup;
00068 if (!patches[i].proxiesOn.find(&(processors[index]))) {
00069 patches[i].proxiesOn.unchecked_insert(&(processors[index]));
00070 processors[index].proxies.unchecked_insert(&(patches[i]));
00071 }
00072 processors[index].patchSet.unchecked_insert(&patches[i]);
00073 }
00074 }
00075
00076 InitProxyUsage();
00077
00078 for (i=0; i<numComputes; i++)
00079 computeArray[i].processor = -1;
00080
00081 for (i=0; i < numComputes; i++) {
00082
00083 if INGROUP(computes[i].oldProcessor) {
00084 index = computes[i].oldProcessor - beginGroup;
00085 processors[index].computeLoad += computes[i].load;
00086 }
00087 }
00088
00089
00090
00091 float *temploads = new float[P];
00092 for(i=0; i<P; i++)
00093 {
00094 temploads[i] = processors[i].load;
00095 processors[i].load += processors[i].computeLoad;
00096 }
00097
00098 origMaxLoad = computeMax();
00099
00100
00101 printLoads();
00102
00103 for(i=0;i<P; i++)
00104 {
00105 processors[i].load = temploads[i];
00106 processors[i].computeLoad = 0;
00107 }
00108
00109 delete [] temploads;
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129
00130
00131
00132
00133
00134
00135 }
00136
00137 Rebalancer::~Rebalancer()
00138 {
00139 if ( computeMax() > origMaxLoad ) {
00140 iout << "LDB:";
00141 if ( P != CkNumPes() ) {
00142 int w = 1;
00143 int maxinw = 10;
00144 while ( maxinw < CkNumPes() ) {
00145 ++w;
00146 maxinw = 10*maxinw;
00147 }
00148 iout << " PES " <<
00149 std::setw(w) << std::right << processors[0].Id << "-" <<
00150 std::setw(w) << std::left << processors[P-1].Id <<
00151 std::right;
00152 }
00153 iout << " Reverting to original mapping\n" << endi;
00154 fflush(stdout);
00155 const int beginGroup = processors[0].Id;
00156 const int endGroup = beginGroup + P;
00157 for (int i=0; i < numComputes; i++) {
00158
00159 if INGROUP(computes[i].oldProcessor) {
00160 computes[i].processor = computes[i].oldProcessor;
00161 }
00162 }
00163 }
00164
00165
00166
00167 delete pes;
00168 delete computePairHeap;
00169 delete computeSelfHeap;
00170 delete computeBgPairHeap;
00171 delete computeBgSelfHeap;
00172 }
00173
00174
00175
00176
00177 void Rebalancer::InitProxyUsage()
00178 {
00179 int i;
00180 numProxies = 0;
00181
00182 for(i=0; i<P; i++) {
00183
00184
00185
00186
00187
00188
00189 Iterator nextCompute;
00190 nextCompute.id = 0;
00191
00192 computeInfo *c = (computeInfo *)
00193 processors[i].computeSet.iterator((Iterator *)&nextCompute);
00194
00195 while(c)
00196 {
00197
00198 proxyUsage.increment (i, c->patch1);
00199
00200 proxyUsage.increment (i, c->patch2);
00201
00202
00203
00204
00205
00206
00207
00208
00209 nextCompute.id++;
00210 c = (computeInfo *) processors[i].computeSet.next((Iterator *)&nextCompute);
00211 }
00212 }
00213
00214 for (i=0; i<numPatches; i++)
00215 {
00216 numProxies += ( patches[i].proxiesOn.numElements() - 1 );
00217 Iterator nextProc;
00218 processorInfo *p = (processorInfo *)patches[i].proxiesOn.iterator((Iterator *)&nextProc);
00219 while (p) {
00220
00221 proxyUsage.increment (p->Id, i);
00222 p = (processorInfo *)patches[i].proxiesOn.next((Iterator*)&nextProc);
00223 }
00224 }
00225
00226 }
00227
00228
00229 void Rebalancer::strategy()
00230 {
00231 iout << iINFO << "Strategy not implemented for the base class.\n" << "\n";
00232 }
00233
00234 void Rebalancer::makeHeaps()
00235 {
00236 int i, j;
00237
00238 delete pes;
00239 pes = new minHeap(P+2);
00240 for (i=0; i<P; i++)
00241 pes->insert((InfoRecord *) &(processors[i]));
00242
00243 delete computePairHeap;
00244 delete computeSelfHeap;
00245 delete computeBgPairHeap;
00246 delete computeBgSelfHeap;
00247
00248 double bgLoadLimit = 0.5 * averageLoad;
00249
00250
00251
00252
00253
00254
00255
00256
00257
00258 int numSelfComputes, numPairComputes, numBgSelfComputes, numBgPairComputes;
00259
00260 while ( 1 ) {
00261 numSelfComputes = 0;
00262 numPairComputes = 0;
00263 numBgSelfComputes = 0;
00264 numBgPairComputes = 0;
00265 for (i=0; i<numComputes; i++) {
00266 int pa1 = computes[i].patch1;
00267 int pa2 = computes[i].patch2;
00268 if ( pa1 == pa2 ) {
00269 if ( processors[patches[pa1].processor].backgroundLoad > bgLoadLimit) {
00270 ++numBgSelfComputes;
00271 } else {
00272 ++numSelfComputes;
00273 }
00274 } else {
00275 if ( processors[patches[pa1].processor].backgroundLoad > bgLoadLimit
00276 || processors[patches[pa2].processor].backgroundLoad > bgLoadLimit) {
00277 ++numBgPairComputes;
00278 } else {
00279 ++numPairComputes;
00280 }
00281 }
00282 }
00283
00284 int numBgComputes = numBgPairComputes + numBgSelfComputes;
00285
00286
00287
00288
00289
00290
00291 if ( numBgComputes < 0.3 * numComputes ) break;
00292 else bgLoadLimit += 0.1 * averageLoad;
00293 }
00294
00295 computePairHeap = new maxHeap(numPairComputes+2);
00296 computeSelfHeap = new maxHeap(numSelfComputes+2);
00297 computeBgPairHeap = new maxHeap(numBgPairComputes+2);
00298 computeBgSelfHeap = new maxHeap(numBgSelfComputes+2);
00299
00300 for (i=0; i<numComputes; i++) {
00301 int pa1 = computes[i].patch1;
00302 int pa2 = computes[i].patch2;
00303 if ( pa1 == pa2 ) {
00304 if ( processors[patches[pa1].processor].backgroundLoad > bgLoadLimit) {
00305 computeBgSelfHeap->insert( (InfoRecord *) &(computes[i]));
00306 } else {
00307 computeSelfHeap->insert( (InfoRecord *) &(computes[i]));
00308 }
00309 } else {
00310 if ( processors[patches[pa1].processor].backgroundLoad > bgLoadLimit
00311 || processors[patches[pa2].processor].backgroundLoad > bgLoadLimit) {
00312 computeBgPairHeap->insert( (InfoRecord *) &(computes[i]));
00313 } else {
00314 computePairHeap->insert( (InfoRecord *) &(computes[i]));
00315 }
00316 }
00317 }
00318
00319
00320
00321
00322
00323
00324
00325
00326
00327
00328
00329
00330
00331
00332
00333
00334
00335
00336 }
00337
00338 void Rebalancer::makeTwoHeaps()
00339 {
00340 int i, j;
00341
00342 delete pes;
00343 pes = new minHeap(P+2);
00344 for (i=0; i<P; i++)
00345 pes->insert((InfoRecord *) &(processors[i]));
00346
00347 delete computePairHeap;
00348 delete computeSelfHeap;
00349 delete computeBgPairHeap;
00350 delete computeBgSelfHeap;
00351
00352 int numSelfComputes, numPairComputes;
00353
00354 numSelfComputes = 0;
00355 numPairComputes = 0;
00356 for (i=0; i<numComputes; i++) {
00357 int pa1 = computes[i].patch1;
00358 int pa2 = computes[i].patch2;
00359 if (pa1 == pa2)
00360 ++numSelfComputes;
00361 else
00362 ++numPairComputes;
00363 }
00364
00365 computePairHeap = new maxHeap(numPairComputes+2);
00366 computeSelfHeap = new maxHeap(numSelfComputes+2);
00367
00368 for (i=0; i<numComputes; i++) {
00369 int pa1 = computes[i].patch1;
00370 int pa2 = computes[i].patch2;
00371 if ( pa1 == pa2 )
00372 computeSelfHeap->insert( (InfoRecord *) &(computes[i]));
00373 else
00374 computePairHeap->insert( (InfoRecord *) &(computes[i]));
00375 }
00376 }
00377
00378
00379
00380
00381
00382
00383
00384 void Rebalancer::assign(computeInfo *c, processorInfo *p)
00385 {
00386 c->processor = p->Id;
00387 p->computeSet.unchecked_insert((InfoRecord *) c);
00388 #if COMPUTE_CORRECTION
00389 if(firstAssignInRefine)
00390 p->computeLoad += c->load + COMPUTE_LOAD;
00391 else
00392 #endif
00393 p->computeLoad += c->load;
00394
00395 p->load = p->computeLoad + p->backgroundLoad;
00396 patchInfo* patch1 = (patchInfo *) &(patches[c->patch1]);
00397 patchInfo* patch2 = (patchInfo *) &(patches[c->patch2]);
00398
00399 if (!patch1->proxiesOn.find(p)) {
00400 p->proxies.unchecked_insert(patch1);
00401 patch1->proxiesOn.unchecked_insert(p);
00402 numProxies++;
00403 #if PROXY_CORRECTION
00404 if(firstAssignInRefine) {
00405 processors[p->Id].load += PROXY_LOAD;
00406 processors[p->Id].backgroundLoad += PROXY_LOAD;
00407 }
00408 #endif
00409 }
00410
00411 if (!patch2->proxiesOn.find(p)) {
00412 p->proxies.unchecked_insert(patch2);
00413 patch2->proxiesOn.unchecked_insert(p);
00414 numProxies++;
00415 #if PROXY_CORRECTION
00416 if(firstAssignInRefine) {
00417 processors[p->Id].load += PROXY_LOAD;
00418 processors[p->Id].backgroundLoad += PROXY_LOAD;
00419 }
00420 #endif
00421 }
00422
00423
00424
00425
00426 proxyUsage.increment (p->Id, c->patch1);
00427
00428 proxyUsage.increment (p->Id, c->patch2);
00429
00430
00431
00432
00433
00434
00435
00436
00437 #if 0
00438 iout << "Assign " << c->Id << " patches " << c->patch1 << " " << c->patch2
00439 << " load " << c->load << " to " << p->Id << " new load "
00440 << p->load << " background " << p->backgroundLoad
00441 << " nPatches " << nPatches << " nProxies " << nProxies;
00442 if ( nPatches + nProxies < 2 ) iout << " addProxy";
00443 if ( badForComm ) iout << " badForComm";
00444 iout << "\n" << endi;
00445 #endif
00446 }
00447
00448 void Rebalancer::deAssign(computeInfo *c, processorInfo *p)
00449 {
00450 if (!p->computeSet.remove(c)) {
00451 iout << iINFO << "ERROR: Rebalancer tried to deAssign an object that is not on the processor.\n" << endi;
00452 return;
00453 }
00454
00455 double temp_load = 0.0;
00456
00457 c->processor = -1;
00458 p->computeLoad -= c->load;
00459 CmiAssert(p->computeLoad >= 0.0);
00460 temp_load = p->load - c->load;
00461 p->load = p->computeLoad + p->backgroundLoad;
00462 CmiAssert( fabs(temp_load - p->load) < 0.001 );
00463
00464
00465
00466
00467
00468
00469 proxyUsage.decrement (p->Id, c->patch1);
00470
00471 proxyUsage.decrement (p->Id, c->patch2);
00472
00473
00474
00475
00476
00477
00478
00479
00480 if(proxyUsage.getVal(p->Id, c->patch1) <= 0 && p->Id != patches[c->patch1].processor)
00481 {
00482
00483
00484
00485
00486 patchInfo* patch1 = (patchInfo *) &(patches[c->patch1]);
00487 p->proxies.remove(patch1);
00488 patch1->proxiesOn.remove(p);
00489 numProxies--;
00490 #if PROXY_CORRECTION
00491 if(firstAssignInRefine) {
00492 processors[p->Id].load -= PROXY_LOAD;
00493 processors[p->Id].backgroundLoad -= PROXY_LOAD;
00494 if(processors[p->Id].backgroundLoad < 0.0) {
00495 processors[p->Id].backgroundLoad = 0.0;
00496 processors[p->Id].load += PROXY_LOAD;
00497 }
00498 }
00499 #endif
00500 }
00501
00502
00503 if(proxyUsage.getVal(p->Id, c->patch2) <= 0 && p->Id != patches[c->patch2].processor)
00504 {
00505
00506
00507
00508
00509 patchInfo* patch2 = (patchInfo *) &(patches[c->patch2]);
00510 p->proxies.remove(patch2);
00511 patch2->proxiesOn.remove(p);
00512 numProxies--;
00513 #if PROXY_CORRECTION
00514 if(firstAssignInRefine) {
00515 processors[p->Id].load -= PROXY_LOAD;
00516 processors[p->Id].backgroundLoad -= PROXY_LOAD;
00517 if(processors[p->Id].backgroundLoad < 0.0) {
00518 processors[p->Id].backgroundLoad = 0.0;
00519 processors[p->Id].load += PROXY_LOAD;
00520 }
00521 }
00522 #endif
00523 }
00524 }
00525
00526 void Rebalancer::refine_togrid(pcgrid &grid, double thresholdLoad,
00527 processorInfo *p, computeInfo *c) {
00528
00529 if(p->available == false) return;
00530
00531 if ( c->load + p->load < thresholdLoad) {
00532 int nPatches, nProxies, badForComm;
00533 numAvailable(c,p,&nPatches,&nProxies,&badForComm);
00534
00535
00536
00537 pcpair *pair = &grid[nPatches][nProxies][badForComm];
00538
00539 if (! pair->c) {
00540 pair->c = c;
00541 pair->p = p;
00542 } else {
00543 double newval = p->load - c->load;
00544 if ( c->load + p->load < averageLoad ) {
00545 newval -= averageLoad;
00546 }
00547 double oldval = pair->p->load - pair->c->load;
00548 if ( pair->c->load + pair->p->load < averageLoad ) {
00549 oldval -= averageLoad;
00550 }
00551 if (newval < oldval) {
00552 pair->c = c;
00553 pair->p = p;
00554 }
00555 }
00556 }
00557 }
00558
/*
 * One refinement pass at the current overLoad factor.
 *
 * Processors with load > thresholdLoad (= overLoad * averageLoad) go into a
 * max-heap of "heavy" donors; the rest go into the lightProcessors set.
 * The loop repeatedly takes the heaviest donor that still owns computes and
 * moves one of its computes elsewhere. Candidate (compute, target) pairs
 * are collected with refine_togrid(), which keeps the best pair per
 * grid[nPatches][nProxies][badForComm] class. Classes are tried in the
 * order [0][2][0], [1][1][0], [2][0][0], [0][1][0], [1][0][0], [0][0][0];
 * badForComm == 1 candidates are never used.
 *
 * Pass 1 only considers processors that already hold one of c's patches
 * (home processors and processors in proxiesOn). If it finds nothing,
 * pass 2 also considers every light processor (filtered by topology when
 * USE_TOPOMAP is set).
 *
 * Returns 1 when the heavy heap runs empty, and 0 when some donor could not
 * be relieved. Moves made before a failure are not undone.
 */
int Rebalancer::refine()
{
  int finish = 1;
  int no_new_proxies = 0;
  maxHeap *heavyProcessors = new maxHeap(P);

  IRSet *lightProcessors = new IRSet();
  int i;
  double thresholdLoad = overLoad * averageLoad;
  for (i=0; i<P; i++)
  {
    // Above the threshold a processor must shed work (donor); otherwise it
    // is a potential recipient.
    if (processors[i].load > thresholdLoad)
      heavyProcessors->insert((InfoRecord *) &(processors[i]));
    else lightProcessors->insert((InfoRecord *) &(processors[i]));
  }

#if LDB_DEBUG
  iout << "\nBefore Refinement Summary" << "\n";
  printSummary();
#endif

  // 'done' is never set; the loop ends only through the breaks below.
  int done = 0;
  while (!done)
  {
    // Pop the heaviest donor that still owns computes; donors without
    // computes are dropped.
    processorInfo *donor;
    while (donor = (processorInfo*)heavyProcessors->deleteMax()) {
      if (donor->computeSet.hasElements()) break;
      if ( ! no_new_proxies ) {
        // A heavy processor with no computes cannot be helped by moving
        // work. From now on pass 1 fails (finish = 0) unless it finds a move
        // whose target already holds both patches.
        no_new_proxies = 1;
      }
    }

    // Heap exhausted: nothing left above the threshold that can be moved.
    if (!donor) break;

    // Best candidate per class for this donor; presumably starts empty
    // (all c == 0) — TODO confirm pcgrid's constructor.
    pcgrid grid;
    // Move GRID.c from donor to GRID.p if the slot is filled. Expands to an
    // if-statement so uses can be chained with 'else'; needs 'bestP' in scope.
#define REASSIGN(GRID) if (GRID.c) { \
      deAssign(GRID.c, donor); \
      assign(GRID.c, GRID.p); \
      bestP = GRID.p; \
    }

    // Pass 1: targets are the home processors of c's patches and the
    // processors already holding proxies of them.
    {
      Iterator nextCompute;
      nextCompute.id = 0;
      computeInfo *c = (computeInfo *)
        donor->computeSet.iterator((Iterator *)&nextCompute);
      while (c)
      {
        Iterator nextProc;
        processorInfo *p;

        p = &processors[patches[c->patch1].processor];
        refine_togrid(grid, thresholdLoad, p, c);

        if (c->patch1 != c->patch2)
        {
          p = &processors[patches[c->patch2].processor];
          refine_togrid(grid, thresholdLoad, p, c);
        }

        p = (processorInfo *)patches[c->patch1].
          proxiesOn.iterator((Iterator *)&nextProc);
        while (p) {
          refine_togrid(grid, thresholdLoad, p, c);
          p = (processorInfo *)patches[c->patch1].
            proxiesOn.next((Iterator*)&nextProc);
        }

        if (c->patch1 != c->patch2)
        {
          p = (processorInfo *)patches[c->patch2].
            proxiesOn.iterator((Iterator *)&nextProc);
          while (p) {
            refine_togrid(grid, thresholdLoad, p, c);
            p = (processorInfo *)patches[c->patch2].
              proxiesOn.next((Iterator*)&nextProc);
          }
        }

        nextCompute.id++;
        c = (computeInfo *) donor->computeSet.
          next((Iterator *)&nextCompute);
      }
      processorInfo* bestP = 0;

      // Prefer targets that already hold both patches; only those are
      // acceptable once no_new_proxies is set.
      REASSIGN(grid[0][2][0])
      else REASSIGN(grid[1][1][0])
      else REASSIGN(grid[2][0][0])
      else if ( no_new_proxies ) { finish = 0; break; }
      else REASSIGN(grid[0][1][0])
      else REASSIGN(grid[1][0][0])
      else REASSIGN(grid[0][0][0])

      if (bestP) {
        // A recipient pushed above averageLoad leaves the light set; the
        // donor returns to the heavy heap if still above the threshold.
        if (bestP->load > averageLoad) lightProcessors->remove(bestP);
        if (donor->load > thresholdLoad)
          heavyProcessors->insert((InfoRecord *) donor);
        else lightProcessors->insert((InfoRecord *) donor);
        continue;
      }
    }

    // Should be unreachable: with no_new_proxies set, pass 1 either moved a
    // compute (continue) or broke out of the loop.
    if ( no_new_proxies ) iout << iINFO
      << "ERROR: Rebalancer::refine() algorithm is broken.\n" << endi;

    // Pass 2: also consider every light processor as a target. 'grid' is not
    // cleared, so it still holds whatever pass 1 recorded.
    Iterator nextProcessor;
    processorInfo *p = (processorInfo *)
      lightProcessors->iterator((Iterator *) &nextProcessor);

    while (p)
    {
      Iterator nextCompute;
      nextCompute.id = 0;
      computeInfo *c = (computeInfo *)
        donor->computeSet.iterator((Iterator *)&nextCompute);
      while (c)
      {
#if USE_TOPOMAP
        int flag = tmgr.areNeighbors(p->Id, patches[c->patch1].processor,
                                     patches[c->patch2].processor, 8);
        if(flag)
#endif
        {
          refine_togrid(grid, thresholdLoad, p, c);
        }
        nextCompute.id++;
        c = (computeInfo *) donor->computeSet.
          next((Iterator *)&nextCompute);
      }
      p = (processorInfo *)
        lightProcessors->next((Iterator *) &nextProcessor);
    }

    {
      processorInfo* bestP = 0;
      REASSIGN(grid[0][2][0])
      else REASSIGN(grid[1][1][0])
      else REASSIGN(grid[2][0][0])
      else REASSIGN(grid[0][1][0])
      else REASSIGN(grid[1][0][0])
      else REASSIGN(grid[0][0][0])
      // No acceptable move for this donor: give up at this overLoad.
      else { finish = 0; break; }
      if (bestP->load > averageLoad) lightProcessors->remove(bestP);
      if (donor->load > thresholdLoad)
        heavyProcessors->insert((InfoRecord *) donor);
      else lightProcessors->insert((InfoRecord *) donor);
    }

  }

#if LDB_DEBUG
  iout << "After Refinement Summary" << "\n";
  printSummary();

  if (!finish) {
    iout << iINFO << "Refine: No solution found for overLoad = "
         << overLoad << "\n" << endi;
  }
#endif

  delete heavyProcessors;
  delete lightProcessors;

  return finish;
}
00763
00764
00765
00766 void Rebalancer::multirefine(double overload_start)
00767 {
00768
00769
00770
00771
00772 double avg = computeAverage();
00773 double max = computeMax();
00774
00775 #if LDB_DEBUG
00776 iout << "******** Processors with background load > average load ********" << "\n";
00777 #endif
00778
00779 int numOverloaded = 0;
00780 for (int ip=0; ip<P; ip++) {
00781 if ( processors[ip].backgroundLoad > averageLoad ) {
00782 ++numOverloaded;
00783 #if LDB_DEBUG
00784 iout << iINFO << "Info about proc " << ip << ": Load: " << processors[ip].load << " Bg Load: " << processors[ip].backgroundLoad << " Compute Load: " << processors[ip].computeLoad << " No of computes: " << processors[ip].computeSet.numElements() << "\n";
00785 #endif
00786 }
00787 }
00788 if ( numOverloaded ) {
00789 iout << iWARN << numOverloaded
00790 << " processors are overloaded due to high background load.\n" << endi;
00791 }
00792 #if LDB_DEBUG
00793 iout << "******** Processor List Ends ********" << "\n\n";
00794 #endif
00795
00796 const double overloadStep = 0.01;
00797 const double overloadStart = overload_start;
00798 double dCurOverload = max / avg;
00799
00800 int minOverload = 0;
00801 int maxOverload = (int)((dCurOverload - overloadStart)/overloadStep + 1);
00802 double dMinOverload = minOverload * overloadStep + overloadStart;
00803 double dMaxOverload = maxOverload * overloadStep + overloadStart;
00804
00805 #if LDB_DEBUG
00806 iout << iINFO
00807 << "Balancing from " << minOverload << " = " << dMinOverload
00808 << " to " << maxOverload << "=" << dMaxOverload
00809 << " dCurOverload=" << dCurOverload << " max=" << max << " avg=" << avg
00810 << "\n" << endi;
00811 #endif
00812
00813 int curOverload;
00814 int refineDone = 0;
00815
00816 overLoad = dMinOverload;
00817 if (refine())
00818 refineDone = 1;
00819 else {
00820 overLoad = dMaxOverload;
00821 if (!refine()) {
00822 iout << iINFO << "ERROR: Could not refine at max overload\n" << endi;
00823 refineDone = 1;
00824 }
00825 }
00826
00827
00828 while (!refineDone) {
00829 if (maxOverload - minOverload <= 1)
00830 refineDone = 1;
00831 else {
00832 curOverload = (maxOverload + minOverload ) / 2;
00833
00834 overLoad = curOverload * overloadStep + overloadStart;
00835 #if LDB_DEBUG
00836 iout << iINFO << "Testing curOverload " << curOverload
00837 << "=" << overLoad << " [min,max]="
00838 << minOverload << ", " << maxOverload
00839 << "\n" << endi;
00840 #endif
00841 if (refine())
00842 maxOverload = curOverload;
00843 else
00844 minOverload = curOverload;
00845 }
00846 }
00847
00848 }
00849
00850 void Rebalancer::printResults()
00851 {
00852 iout << iINFO << "ready to print result \n" << "\n";
00853 }
00854
00855
00856 void Rebalancer::printLoads()
00857 {
00858
00859 int i, total = 0, numBytes = 0;
00860 double max;
00861 int maxproxies = 0;
00862 int maxpatchproxies = 0;
00863 double avgBgLoad =0.0;
00864
00865 for (i=0; i<P; i++) {
00866 int nproxies = processors[i].proxies.numElements() -
00867 processors[i].patchSet.numElements();
00868 total += nproxies;
00869 if ( nproxies > maxproxies ) maxproxies = nproxies;
00870 avgBgLoad += processors[i].backgroundLoad;
00871 Iterator p;
00872 int count = 0;
00873
00874 patchInfo *patch = (patchInfo *) processors[i].patchSet.iterator(&p);
00875 while (patch)
00876 {
00877 int myProxies;
00878 myProxies = patch->proxiesOn.numElements()-1;
00879 if ( myProxies > maxpatchproxies ) maxpatchproxies = myProxies;
00880 numBytes += myProxies *patch->numAtoms*bytesPerAtom;
00881 count += myProxies;
00882 patch = (patchInfo *)processors[i].patchSet.next(&p);
00883 }
00884 }
00885
00886 avgBgLoad /= P;
00887 computeAverage();
00888 max = computeMax();
00889
00890 iout << "LDB:";
00891 if ( P != CkNumPes() ) {
00892 int w = 1;
00893 int maxinw = 10;
00894 while ( maxinw < CkNumPes() ) {
00895 ++w;
00896 maxinw = 10*maxinw;
00897 }
00898 iout << " PES " <<
00899 std::setw(w) << std::right << processors[0].Id << "-" <<
00900 std::setw(w) << std::left << processors[P-1].Id <<
00901 std::right;
00902 }
00903 iout << " TIME " << CmiWallTimer() << " LOAD: AVG " << averageLoad
00904 << " MAX " << max << " PROXIES: TOTAL " << total << " MAXPE " <<
00905 maxproxies << " MAXPATCH " << maxpatchproxies << " " << strategyName
00906 << " MEM: " << memusage_MB() << " MB\n" << endi;
00907 fflush(stdout);
00908
00909 }
00910
00911 void Rebalancer::printSummary()
00912 {
00913 int i;
00914
00915 double total = processors[0].load;
00916 double min = processors[0].load;
00917 int min_proc = 0;
00918 double max = processors[0].load;
00919 int max_proc = 0;
00920 for (i=1; i<P; i++) {
00921 total += processors[i].load;
00922 if (processors[i].load < min) {
00923 min = processors[i].load;
00924 min_proc = i;
00925 }
00926 if (processors[i].load > max) {
00927 max = processors[i].load;
00928 max_proc = i;
00929 }
00930 }
00931 iout << iINFO << " min = " << min << " processor " << min_proc << "\n";
00932 iout << iINFO << " max = " << max << " processor " << max_proc << "\n";
00933 iout << iINFO << " total = " << total << " average = " << total/P << "\n";
00934 iout << iINFO << "Info about most overloaded processor " << max_proc << ": Load: " << processors[max_proc].load << " Bg Load: " << processors[max_proc].backgroundLoad << " Compute Load: " << processors[max_proc].computeLoad << " No of computes: " << processors[max_proc].computeSet.numElements() << " No. of proxies: " << processors[max_proc].proxies.numElements() << "\n" << endi;
00935 }
00936
00937 double Rebalancer::computeAverage()
00938 {
00939 int i;
00940 double total = 0.;
00941 for (i=0; i<numComputes; i++)
00942 total += computes[i].load;
00943
00944 for (i=0; i<P; i++) {
00945 if (processors[i].available) {
00946 total += processors[i].backgroundLoad;
00947 }
00948 }
00949
00950 if (numPesAvailable == 0) {
00951 CmiPrintf("Warning: no processors available for load balancing!\n");
00952 averageLoad = 0.0;
00953 }
00954 else
00955 averageLoad = total/numPesAvailable;
00956 return averageLoad;
00957 }
00958
00959 void Rebalancer::adjustBackgroundLoadAndComputeAverage()
00960 {
00961
00962
00963 if (numPesAvailable == 0) {
00964 computeAverage();
00965 return;
00966 }
00967
00968 int i;
00969 double bgtotal = 0.;
00970 for (i=0; i<P; i++) {
00971 if (processors[i].available) {
00972 bgtotal += processors[i].backgroundLoad;
00973 }
00974 }
00975 double bgavg = bgtotal / numPesAvailable;
00976
00977 int nadjusted = 0;
00978 for (i=0; i<P; i++) {
00979 if (processors[i].available) {
00980 double bgload = processors[i].backgroundLoad;
00981 if ( bgload < bgavg ) {
00982 processors[i].backgroundLoad = bgavg;
00983 ++nadjusted;
00984 }
00985 }
00986 }
00987
00988
00989
00990 computeAverage();
00991 }
00992
00993 double Rebalancer::computeMax()
00994 {
00995 int i;
00996 double max = processors[0].load;
00997 for (i=1; i<P; i++)
00998 {
00999 if (processors[i].load > max)
01000 max = processors[i].load;
01001 }
01002 return max;
01003 }
01004
/*
 * Whether processor p appears in patch->proxiesOn. The constructor also adds
 * home processors of in-group patches to that set, so this covers both the
 * home and existing proxies. Returns patch->proxiesOn.find(p) converted to
 * int; nonzero presumably means "found", matching how find() is tested
 * elsewhere in this file.
 */
int Rebalancer::isAvailableOn(patchInfo *patch, processorInfo *p)
{
  return patch->proxiesOn.find(p);
}
01009
01010 void Rebalancer::numAvailable(computeInfo *c, processorInfo *p,
01011 int *nPatches, int *nProxies, int *isBadForCommunication)
01012 {
01013
01014 int realPe, index;
01015 int patch_count = 0;
01016 int proxy_count = 0;
01017
01018 const int beginGroup = processors[0].Id;
01019 const int endGroup = beginGroup + P;
01020
01021 patchInfo &pa1 = patches[c->patch1];
01022 patchInfo &pa2 = patches[c->patch2];
01023 int pa1_avail = 1;
01024 int pa2_avail = 1;
01025
01026 if (pa1.processor == p->Id) {
01027 patch_count++;
01028 } else if ( pa1.proxiesOn.find(p) ) {
01029 proxy_count++;
01030 } else {
01031 pa1_avail = 0;
01032 }
01033
01034
01035 if (c->patch1 == c->patch2 || pa2.processor == p->Id) {
01036 patch_count++;
01037 } else if ( pa2.proxiesOn.find(p) ) {
01038 proxy_count++;
01039 } else {
01040 pa2_avail = 0;
01041 }
01042
01043 *nPatches = patch_count;
01044 *nProxies = proxy_count;
01045
01046 if ( isBadForCommunication ) {
01047 int bad = 0;
01048
01049 if ( patch_count + proxy_count < 2 ) {
01050 double bgLoadLimit = 1.2 * averageLoad;
01051 if ( p->backgroundLoad > bgLoadLimit ) bad = 1;
01052 else {
01053 int proxiesPerPeLimit = numProxies / numPesAvailable + 3;
01054 if ( proxiesPerPeLimit < 6 ) proxiesPerPeLimit = 6;
01055
01056 if ( p->proxies.numElements() > proxiesPerPeLimit ) bad = 1;
01057
01058 int proxiesPerPatchLimit = numProxies / numPatches + 3;
01059 if ( proxiesPerPatchLimit < 6 ) proxiesPerPatchLimit = 6;
01060
01061 if ( ! bad && ! pa1_avail ) {
01062
01063 realPe = pa1.processor;
01064 if INGROUP(realPe) {
01065 index = realPe - beginGroup;
01066
01067 if (processors[index].backgroundLoad > bgLoadLimit) bad = 1;
01068 else if ( pa1.proxiesOn.numElements() > proxiesPerPatchLimit ) bad = 1;
01069 } else bad = 1;
01070 }
01071
01072 if ( ! bad && ! pa2_avail ) {
01073
01074 realPe = pa2.processor;
01075 if INGROUP(realPe) {
01076 index = realPe - beginGroup;
01077
01078 if ( processors[index].backgroundLoad > bgLoadLimit) bad = 1;
01079 else if ( pa2.proxiesOn.numElements() > proxiesPerPatchLimit ) bad = 1;
01080 } else bad = 1;
01081 }
01082
01083 }
01084 }
01085
01086 *isBadForCommunication = bad;
01087 }
01088 }
01089
01090 void Rebalancer::createSpanningTree() {
01091 ProxyTree &pt = ProxyMgr::Object()->getPtree();
01092 Iterator nextP;
01093 processorInfo *p;
01094 #ifndef NODEAWARE_PROXY_SPANNINGTREE
01095 if(pt.sizes==NULL)
01096 pt.sizes = new int[numPatches];
01097 #endif
01098
01099 if (pt.proxylist == NULL)
01100 pt.proxylist = new NodeIDList[numPatches];
01101 for(int i=0; i<numPatches; i++)
01102 {
01103 pt.proxylist[i].resize(patches[i].proxiesOn.numElements());
01104 nextP.id = 0;
01105 p = (processorInfo *)(patches[i].proxiesOn.iterator((Iterator *)&nextP));
01106 int j = 0;
01107 while(p) {
01108
01109
01110
01111 if (p->Id == (PatchMap::Object()->node(i))) {
01112 p = (processorInfo *)(patches[i].proxiesOn.next((Iterator *)&nextP));
01113 continue;
01114 }
01115
01116 pt.proxylist[i][j] = p->Id;
01117 nextP.id++;
01118 p = (processorInfo *)(patches[i].proxiesOn.next((Iterator *)&nextP));
01119 j++;
01120 }
01121 pt.proxylist[i].resize(j);
01122 }
01123 CkPrintf("Done intialising\n");
01124 #ifdef NODEAWARE_PROXY_SPANNINGTREE
01125 ProxyMgr::Object()->buildNodeAwareSpanningTree0();
01126 #else
01127 ProxyMgr::Object()->buildSpanningTree0();
01128 #endif
01129 }
01130
01131 void Rebalancer::decrSTLoad() {
01132 int pe;
01133 ProxyTree &pt = ProxyMgr::Object()->getPtree();
01134 for(int i=0; i<numPatches; i++)
01135 for(int j=1; j<pt.proxylist[i].size() && j<proxySpanDim; j++) {
01136 pe = pt.proxylist[i][j];
01137 processors[pe].load -= ST_NODE_LOAD;
01138 processors[pe].backgroundLoad -= ST_NODE_LOAD;
01139 if(processors[pe].load < 0.0)
01140 processors[pe].load = 0.0;
01141 if(processors[pe].backgroundLoad < 0.0)
01142 processors[pe].backgroundLoad = 0.0;
01143 }
01144 }
01145
01146 void Rebalancer::incrSTLoad() {
01147 int pe;
01148 ProxyTree &pt = ProxyMgr::Object()->getPtree();
01149 for(int i=0; i<numPatches; i++)
01150 for(int j=1; j<pt.proxylist[i].size() && j<proxySpanDim; j++) {
01151 pe = pt.proxylist[i][j];
01152 processors[pe].load += ST_NODE_LOAD;
01153 processors[pe].backgroundLoad += ST_NODE_LOAD;
01154 }
01155 }
01156