Rebalancer.C

Go to the documentation of this file.
00001 
00007 /*****************************************************************************
00008  * $Source: /home/cvs/namd/cvsroot/namd2/src/Rebalancer.C,v $
00009  * $Author: jim $
00010  * $Date: 2013/08/30 21:43:01 $
00011  * $Revision: 1.100 $
00012  *****************************************************************************/
00013 
00014 #include "InfoStream.h"
00015 #include "Node.h"
00016 #include "Rebalancer.h"
00017 #include "ProxyMgr.h"
00018 #include "PatchMap.h"
00019 #include "LdbCoordinator.h"
00020 #include "memusage.h"
00021 #include <iomanip>
00022 
// Fixed load estimates (same units as the load fields they are added to).
// ST_NODE_LOAD: presumably a per-node overhead estimate used elsewhere in
// this file — confirm.
#define ST_NODE_LOAD            0.005
// PROXY_LOAD: with PROXY_CORRECTION enabled and firstAssignInRefine set,
// added to a processor's load and backgroundLoad for every proxy created on
// it by assign(), and subtracted again when deAssign() removes the proxy.
#define PROXY_LOAD              0.001
// COMPUTE_LOAD: with COMPUTE_CORRECTION enabled and firstAssignInRefine set,
// added to a processor's computeLoad for every compute assign() places on it.
#define COMPUTE_LOAD            0.00005
00026 
00027 Rebalancer::Rebalancer(computeInfo *computeArray, patchInfo *patchArray,
00028       processorInfo *processorArray, int nComps, int nPatches, int nPes)
00029 {
00030    bytesPerAtom = 32;
00031    strategyName = "None";
00032    computes = computeArray;
00033    patches =  patchArray;
00034    processors =  processorArray;
00035    numComputes = nComps;
00036    numPatches = nPatches;
00037    P = nPes;
00038    pes = NULL;
00039    computePairHeap = NULL;
00040    computeSelfHeap = NULL;
00041    computeBgPairHeap = NULL;
00042    computeBgSelfHeap = NULL;
00043    overLoad = 0.;
00044    numPesAvailable = 0;
00045    firstAssignInRefine = 0;
00046    collMsg = 0;
00047 
00048   const int beginGroup = processors[0].Id;
00049   const int endGroup = beginGroup + P;
00050 #define INGROUP(PROC) ((PROC) >= beginGroup && (PROC) < endGroup)
00051 
00052    int i;
00053    int index;
00054    for (i=0; i<P; i++)
00055    {
00056       // For testing only...
00057       // processors[i].backgroundLoad = 0;
00058       // End of test section
00059       processors[i].load = processors[i].backgroundLoad;
00060       processors[i].computeLoad = 0;
00061       if (processors[i].available) {
00062         numPesAvailable += 1;
00063       }
00064    }
00065 
00066    for (i=0; i<nPatches; i++) {
00067      // Only for those patches which are in my group (hierarchical case)
00068      if INGROUP(patches[i].processor) {
00069        index = patches[i].processor - beginGroup;
00070        if (!patches[i].proxiesOn.find(&(processors[index]))) {
00071          patches[i].proxiesOn.unchecked_insert(&(processors[index]));
00072          processors[index].proxies.unchecked_insert(&(patches[i]));
00073        }
00074        processors[index].patchSet.unchecked_insert(&patches[i]);
00075      }
00076    }                      
00077 
00078    InitProxyUsage();
00079 
00080    for (i=0; i<numComputes; i++)
00081       computeArray[i].processor = -1;
00082 
00083    for (i=0; i < numComputes; i++) {
00084      // Only for those computes which are in my group (hierarchical case)
00085      if INGROUP(computes[i].oldProcessor) {
00086        index = computes[i].oldProcessor - beginGroup;
00087        processors[index].computeLoad += computes[i].load;
00088      }
00089    }
00090 
00091    // Added 4-29-98: Temporarily adds the compute load to the background
00092    // load so that the correct value for the total load can be displayed.
00093    float *temploads = new float[P];
00094    for(i=0; i<P; i++)
00095    {
00096       temploads[i] = processors[i].load;
00097       processors[i].load += processors[i].computeLoad;
00098    }
00099 
00100    origMaxLoad = computeMax();
00101 
00102    // iout << iINFO << "Initial load" << "\n";
00103    printLoads(1);
00104 
00105    for(i=0;i<P; i++)
00106    {
00107       processors[i].load = temploads[i];
00108       processors[i].computeLoad = 0;
00109    }
00110    
00111    delete [] temploads;
00112 
00113    // int count1=0, count2=0;
00114    // for (i=0; i<nPatches; i++)
00115    // {
00116    //    if (patches[i].proxiesOn.numElements() <= 1)
00117    //    count1++;
00118    //    else count2++;
00119    // }                   
00120    // iout << iINFO << "Count1 = " << count1
00121    //      << "Count2 = " << count2
00122    //      << "\n" << std::endl;
00123    // 
00124    // for (i=0; i <P; i++) 
00125    // {
00126    //    iout << iINFO << "\n proxies on proc. " << i << " are for patches:";
00127    //    processorArray[i].proxies->print();
00128    // }
00129    // 
00130    // iout << iINFO <<"\n" << endi;
00131    // strategy();
00132 
00133    // for (i=0; i<nPatches; i++)
00134    // {
00135    //    iout << "patch " << i << " on processor " << patches[i].processor << "\n" << endi;
00136    // }
00137 }
00138 
00139 Rebalancer::~Rebalancer()
00140 {
00141   if ( computeMax() > origMaxLoad ) {
00142    if ( P == CkNumPes() ) {
00143    iout << "LDB:";
00144    if ( P != CkNumPes() ) {
00145      int w = 1;   
00146      int maxinw = 10;
00147      while ( maxinw < CkNumPes() ) {
00148        ++w;
00149        maxinw = 10*maxinw;
00150      }
00151      iout << " PES " <<
00152              std::setw(w) << std::right << processors[0].Id << "-" <<
00153              std::setw(w) << std::left  << processors[P-1].Id <<
00154              std::right;
00155    }
00156    iout << " Reverting to original mapping\n" << endi;
00157    fflush(stdout);
00158    } else {  // P != CkNumPes()
00159      if ( ! collMsg ) NAMD_bug("Rebalancer::~Rebalancer() collMsg null.");
00160      collMsg->finalAvgPeLoad = collMsg->initAvgPeLoad;
00161      collMsg->finalMaxPeLoad = collMsg->initMaxPeLoad;
00162      collMsg->finalTotalProxies = collMsg->initTotalProxies;
00163      collMsg->finalMaxPeProxies = collMsg->initMaxPeProxies;
00164      collMsg->finalMaxPatchProxies = collMsg->initMaxPatchProxies;
00165      collMsg->reverted = 1;
00166    }
00167    const int beginGroup = processors[0].Id;
00168    const int endGroup = beginGroup + P;
00169    for (int i=0; i < numComputes; i++) {
00170      // Only for those computes which are in my group (hierarchical case)
00171      if INGROUP(computes[i].oldProcessor) {
00172        computes[i].processor = computes[i].oldProcessor;
00173      }
00174    }
00175   }
00176 
00177   if ( P != CkNumPes() ) {
00178     if ( ! collMsg ) NAMD_bug("Rebalancer::~Rebalancer() collMsg null.");
00179     LdbCoordinator::Object()->sendCollectLoads(collMsg);
00180     collMsg = 0;
00181   }
00182 
00183   //for(int i=0; i<P; i++)
00184   //  delete [] processors[i].proxyUsage;
00185    delete pes;
00186    delete computePairHeap;
00187    delete computeSelfHeap;
00188    delete computeBgPairHeap;
00189    delete computeBgSelfHeap;
00190 }
00191 
00192 // Added 4-29-98: array proxyUsage on each processor keeps track of 
00193 // how many computes are accessing each proxy on the processor.  If
00194 // no computes are accessing it, the proxy can be removed in DeAssign
00195 void Rebalancer::InitProxyUsage()
00196 {
00197    int i;
00198    numProxies = 0;
00199 
00200    for(i=0; i<P; i++) {
00201      //processors[i].proxyUsage = new unsigned char [numPatches];
00202      //for(int j=0; j<numPatches; j++)
00203      //{
00204      //  processors[i].proxyUsage[j] = 0;
00205      //}
00206 
00207       Iterator nextCompute;
00208       nextCompute.id = 0;
00209 
00210       computeInfo *c = (computeInfo *)
00211          processors[i].computeSet.iterator((Iterator *)&nextCompute);
00212 
00213       while(c)
00214       {
00215         /* int n1 = */ //processors[i].proxyUsage[c->patch1]++;
00216         proxyUsage.increment (i, c->patch1); 
00217         /* int n2 = */ //processors[i].proxyUsage[c->patch2]++;
00218         proxyUsage.increment (i, c->patch2); 
00219 
00220          // iout << iINFO  
00221          // << "Assigning compute " << c->Id << " with work = " << c->load 
00222          // << " to processor " << processors[i].Id << "\n"
00223          // << "\tproxyUsage[" << c->patch1 << "]: " << n1 << " --> " << n1+1 << "\n";
00224          // << "\tproxyUsage[" << c->patch2 << "]: " << n2 << " --> " << n2+1 << "\n";
00225          // << std::endl;
00226 
00227          nextCompute.id++;
00228          c = (computeInfo *) processors[i].computeSet.next((Iterator *)&nextCompute);
00229       }
00230    }
00231 
00232   for (i=0; i<numPatches; i++)
00233   {
00234       numProxies += ( patches[i].proxiesOn.numElements() - 1 );
00235       Iterator nextProc;
00236       processorInfo *p = (processorInfo *)patches[i].proxiesOn.iterator((Iterator *)&nextProc);
00237       while (p) {
00238           //p->proxyUsage[i] += 1;
00239           proxyUsage.increment (p->Id, i);
00240           p = (processorInfo *)patches[i].proxiesOn.next((Iterator*)&nextProc);
00241       }
00242   }
00243 
00244 }
00245 
00246 
00247 void Rebalancer::strategy()
00248 {
00249    iout << iINFO << "Strategy not implemented for the base class.\n" << "\n";
00250 }
00251 
00252 void Rebalancer::makeHeaps()
00253 {
00254    int i, j;
00255 
00256    delete pes;
00257    pes = new minHeap(P+2);
00258    for (i=0; i<P; i++)
00259       pes->insert((InfoRecord *) &(processors[i]));
00260 
00261    delete computePairHeap;
00262    delete computeSelfHeap;
00263    delete computeBgPairHeap;
00264    delete computeBgSelfHeap;
00265 
00266    double bgLoadLimit = 0.5 * averageLoad;
00267    /*
00268    iout << iINFO << "Background load limit = " << bgLoadLimit << "\n";
00269    for (i=0; i<P; i++)
00270      if ( processors[i].backgroundLoad > bgLoadLimit )
00271        iout << iINFO << "Processor " << i << " background load = "
00272             << processors[i].backgroundLoad << "\n";
00273    iout << endi;
00274    */
00275 
00276    int numSelfComputes, numPairComputes, numBgSelfComputes, numBgPairComputes;
00277 
00278    while ( 1 ) {
00279     numSelfComputes = 0;
00280     numPairComputes = 0;
00281     numBgSelfComputes = 0;
00282     numBgPairComputes = 0;
00283     for (i=0; i<numComputes; i++) {
00284      int pa1 = computes[i].patch1;
00285      int pa2 = computes[i].patch2;
00286      if ( pa1 == pa2 ) {
00287         if ( processors[patches[pa1].processor].backgroundLoad > bgLoadLimit) {
00288           ++numBgSelfComputes;
00289         } else {
00290           ++numSelfComputes;
00291         }
00292      } else {
00293         if ( processors[patches[pa1].processor].backgroundLoad > bgLoadLimit
00294           || processors[patches[pa2].processor].backgroundLoad > bgLoadLimit) {
00295           ++numBgPairComputes;
00296         } else {
00297           ++numPairComputes;
00298         }
00299      }
00300     }
00301 
00302     int numBgComputes = numBgPairComputes + numBgSelfComputes;
00303 
00304     /*if ( numBgComputes ) {
00305         iout << iINFO << numBgComputes << " of " << numComputes
00306         << " computes have background load > " << bgLoadLimit << "\n" << endi;
00307     }*/
00308 
00309     if ( numBgComputes < 0.3 * numComputes ) break;
00310     else bgLoadLimit += 0.1 * averageLoad;
00311    }
00312 
00313    computePairHeap = new maxHeap(numPairComputes+2);
00314    computeSelfHeap = new maxHeap(numSelfComputes+2);
00315    computeBgPairHeap = new maxHeap(numBgPairComputes+2);
00316    computeBgSelfHeap = new maxHeap(numBgSelfComputes+2);
00317 
00318    for (i=0; i<numComputes; i++) {
00319      int pa1 = computes[i].patch1;
00320      int pa2 = computes[i].patch2;
00321      if ( pa1 == pa2 ) {
00322         if ( processors[patches[pa1].processor].backgroundLoad > bgLoadLimit) {
00323           computeBgSelfHeap->insert( (InfoRecord *) &(computes[i]));
00324         } else {
00325           computeSelfHeap->insert( (InfoRecord *) &(computes[i]));
00326         }
00327      } else {
00328         if ( processors[patches[pa1].processor].backgroundLoad > bgLoadLimit
00329           || processors[patches[pa2].processor].backgroundLoad > bgLoadLimit) {
00330           computeBgPairHeap->insert( (InfoRecord *) &(computes[i]));
00331         } else {
00332           computePairHeap->insert( (InfoRecord *) &(computes[i]));
00333         }
00334      }
00335    }
00336 
00337 /*
00338    delete computePairHeap;
00339    delete computeSelfHeap;
00340 
00341    int numSelfComputes = 0;
00342    for (i=0; i<numComputes; i++)
00343       if ( computes[i].patch1 == computes[i].patch2 ) ++numSelfComputes;
00344 
00345    computeSelfHeap = new maxHeap(numSelfComputes+2);
00346    computePairHeap = new maxHeap(numComputes-numSelfComputes+2);
00347 
00348    for (i=0; i<numComputes; i++)
00349       if ( computes[i].patch1 == computes[i].patch2 )
00350          computeSelfHeap->insert( (InfoRecord *) &(computes[i]));
00351       else
00352          computePairHeap->insert( (InfoRecord *) &(computes[i]));
00353 */
00354 }
00355 
00356 void Rebalancer::makeTwoHeaps()
00357 {
00358    int i, j;
00359 
00360    delete pes;
00361    pes = new minHeap(P+2);
00362    for (i=0; i<P; i++)
00363       pes->insert((InfoRecord *) &(processors[i]));
00364 
00365    delete computePairHeap;
00366    delete computeSelfHeap;
00367    delete computeBgPairHeap;
00368    delete computeBgSelfHeap;
00369 
00370    int numSelfComputes, numPairComputes;
00371 
00372    numSelfComputes = 0;
00373    numPairComputes = 0;
00374    for (i=0; i<numComputes; i++) {
00375      int pa1 = computes[i].patch1;
00376      int pa2 = computes[i].patch2;
00377      if (pa1 == pa2)
00378        ++numSelfComputes;
00379      else
00380        ++numPairComputes;
00381    }
00382 
00383    computePairHeap = new maxHeap(numPairComputes+2);
00384    computeSelfHeap = new maxHeap(numSelfComputes+2);
00385 
00386    for (i=0; i<numComputes; i++) {
00387      int pa1 = computes[i].patch1;
00388      int pa2 = computes[i].patch2;
00389      if ( pa1 == pa2 )
00390        computeSelfHeap->insert( (InfoRecord *) &(computes[i]));
00391      else
00392        computePairHeap->insert( (InfoRecord *) &(computes[i]));
00393    }
00394 }
00395 
00396 // not safe with hybrid balancer
00397 //void Rebalancer::assign(computeInfo *c, int processor)
00398 //{
00399 //   assign(c, &(processors[processor]));
00400 //}
00401 
00402 void Rebalancer::assign(computeInfo *c, processorInfo *p)
00403 {
00404    c->processor = p->Id;
00405    p->computeSet.unchecked_insert((InfoRecord *) c);
00406 #if COMPUTE_CORRECTION
00407    if(firstAssignInRefine)
00408      p->computeLoad += c->load + COMPUTE_LOAD;
00409    else
00410 #endif
00411      p->computeLoad += c->load;
00412      
00413    p->load = p->computeLoad + p->backgroundLoad;
00414    patchInfo* patch1 = (patchInfo *) &(patches[c->patch1]);
00415    patchInfo* patch2 = (patchInfo *) &(patches[c->patch2]);
00416 
00417    if (!patch1->proxiesOn.find(p)) {
00418      p->proxies.unchecked_insert(patch1); 
00419      patch1->proxiesOn.unchecked_insert(p); 
00420      numProxies++;
00421 #if PROXY_CORRECTION
00422      if(firstAssignInRefine) {
00423        processors[p->Id].load += PROXY_LOAD;
00424        processors[p->Id].backgroundLoad += PROXY_LOAD;
00425      }
00426 #endif
00427    }
00428 
00429    if (!patch2->proxiesOn.find(p)) {
00430      p->proxies.unchecked_insert(patch2); 
00431      patch2->proxiesOn.unchecked_insert(p);
00432      numProxies++;
00433 #if PROXY_CORRECTION
00434      if(firstAssignInRefine) {
00435        processors[p->Id].load += PROXY_LOAD;
00436        processors[p->Id].backgroundLoad += PROXY_LOAD;
00437      }
00438 #endif
00439    }
00440    
00441    // 4-29-98: Added the following code to keep track of how many proxies
00442    // on each processor are being used by a compute on that processor
00443    /* int n1 = */ //p->proxyUsage[c->patch1]++;
00444    proxyUsage.increment (p->Id, c->patch1);
00445    /* int n2 = */ //p->proxyUsage[c->patch2]++;
00446    proxyUsage.increment (p->Id, c->patch2);
00447 
00448    // iout << iINFO  
00449    // << "Assigning compute " << c->Id << " with work = " << c->load 
00450    // << " to processor " << p->Id << "\n"
00451    // << "\tproxyUsage[" << c->patch1 << "]: " << n1 << " --> " << n1+1 << "\n"
00452    // << "\tproxyUsage[" << c->patch2 << "]: " << n2 << " --> " << n2+1 << "\n"
00453    // << std::endl;
00454 
00455 #if 0
00456    iout << "Assign " << c->Id << " patches " << c->patch1 << " " << c->patch2
00457         << " load " << c->load << " to " << p->Id << " new load "
00458         << p->load << " background " << p->backgroundLoad
00459         << " nPatches " << nPatches << " nProxies " << nProxies;
00460    if ( nPatches + nProxies < 2 ) iout << " addProxy";
00461    if ( badForComm ) iout << " badForComm";
00462    iout << "\n" << endi;
00463 #endif
00464 }
00465 
00466 void  Rebalancer::deAssign(computeInfo *c, processorInfo *p)
00467 {
00468    if (!p->computeSet.remove(c))  {
00469       iout << iINFO << "ERROR: Rebalancer tried to deAssign an object that is not on the processor.\n" << endi;
00470       return;
00471    }
00472 
00473    double temp_load = 0.0;
00474 
00475    c->processor = -1;
00476    p->computeLoad -= c->load;
00477    CmiAssert(p->computeLoad >= 0.0);
00478    temp_load = p->load - c->load;
00479    p->load = p->computeLoad + p->backgroundLoad;
00480    CmiAssert( fabs(temp_load - p->load) < 0.001 );
00481 
00482    // 4-29-98: Added the following code to keep track of how many proxies 
00483    // on each processor are being used by a compute on that processor.
00484    // If no computes are using the proxy, it should be removed if it is not
00485    // on the processor that its patch is on.
00486    /* int n1 = */ //p->proxyUsage[c->patch1]--;
00487    proxyUsage.decrement (p->Id, c->patch1);
00488    /* int n2 = */ //p->proxyUsage[c->patch2]--;
00489    proxyUsage.decrement (p->Id, c->patch2);
00490 
00491    // iout << iINFO
00492    // << "De-assigning compute " << c->Id << " from processor " << p->Id << "\n"
00493    // << "\tproxyUsage[" << c->patch1 << "]: " << n1 << " --> " << n1-1 << "\n"
00494    // << "\tproxyUsage[" << c->patch2 << "]: " << n2 << " --> " << n2-1 << "\n"
00495    // << std::endl;
00496 
00497    //if(p->proxyUsage[c->patch1] <= 0 && p->Id != patches[c->patch1].processor)
00498    if(proxyUsage.getVal(p->Id, c->patch1) <= 0 && p->Id != patches[c->patch1].processor)
00499    {
00500       // iout << iINFO 
00501       // << "REMOVING PROXY " << c->patch1 << " FROM PROCESSOR " << p->Id 
00502       // << std::endl << endl;
00503 
00504       patchInfo* patch1 = (patchInfo *) &(patches[c->patch1]);
00505       p->proxies.remove(patch1);
00506       patch1->proxiesOn.remove(p);
00507       numProxies--;
00508 #if PROXY_CORRECTION
00509       if(firstAssignInRefine) {
00510         processors[p->Id].load -= PROXY_LOAD;
00511         processors[p->Id].backgroundLoad -= PROXY_LOAD;
00512         if(processors[p->Id].backgroundLoad < 0.0) {
00513           processors[p->Id].backgroundLoad = 0.0;
00514           processors[p->Id].load += PROXY_LOAD;
00515         }
00516       }
00517 #endif
00518    }
00519    
00520    //if(p->proxyUsage[c->patch2] <= 0 && p->Id != patches[c->patch2].processor)
00521    if(proxyUsage.getVal(p->Id, c->patch2) <= 0 && p->Id != patches[c->patch2].processor)
00522    {
00523       // iout << iINFO
00524       // << "REMOVING PROXY " << c->patch1 << " FROM PROCESSOR " << p->Id 
00525       // << std::endl << endl;
00526 
00527       patchInfo* patch2 = (patchInfo *) &(patches[c->patch2]);
00528       p->proxies.remove(patch2);
00529       patch2->proxiesOn.remove(p);
00530       numProxies--;
00531 #if PROXY_CORRECTION
00532       if(firstAssignInRefine) {
00533         processors[p->Id].load -= PROXY_LOAD;
00534         processors[p->Id].backgroundLoad -= PROXY_LOAD;
00535         if(processors[p->Id].backgroundLoad < 0.0) {
00536           processors[p->Id].backgroundLoad = 0.0;
00537           processors[p->Id].load += PROXY_LOAD;
00538         }
00539       }
00540 #endif
00541    }
00542 }
00543 
00544 void Rebalancer::refine_togrid(pcgrid &grid, double thresholdLoad,
00545                         processorInfo *p, computeInfo *c) {
00546 
00547   if(p->available == false) return;
00548 
00549   if ( c->load + p->load < thresholdLoad) {
00550     int nPatches, nProxies, badForComm;
00551     numAvailable(c,p,&nPatches,&nProxies,&badForComm);
00552 
00553     // if ( badForComm ) return;
00554 
00555     pcpair *pair = &grid[nPatches][nProxies][badForComm];
00556 
00557     if (! pair->c) {
00558       pair->c = c;
00559       pair->p = p;
00560     } else {
00561       double newval = p->load - c->load;
00562       if ( c->load + p->load < averageLoad ) {
00563          newval -= averageLoad;
00564       }
00565       double oldval = pair->p->load - pair->c->load;
00566       if ( pair->c->load + pair->p->load < averageLoad ) {
00567          oldval -= averageLoad;
00568       }
00569       if (newval < oldval) {
00570         pair->c = c;
00571         pair->p = p;
00572       }
00573     }
00574   }
00575 }
00576 
// Performs one refinement pass at the current overLoad factor.
//
// thresholdLoad = overLoad * averageLoad.  Processors above it go into a
// max-heap of "heavy" processors; the rest go into the lightProcessors set.
// Each iteration pops the heaviest processor that still has computes (the
// donor) and moves ONE of its computes to a processor whose load would stay
// below thresholdLoad (see refine_togrid()):
//   1. first trying only the home and proxy processors of the compute's
//      two patches;
//   2. otherwise ("the long way") trying every processor in lightProcessors.
// Candidate moves are bucketed in a pcgrid by the values numAvailable()
// reports, and the REASSIGN chains below fix the order in which buckets
// are preferred.  After a move the donor is re-inserted into the heavy
// heap if still above thresholdLoad, otherwise into lightProcessors.
//
// Returns 1 when the heavy heap runs out of donors that have computes, and
// 0 when a donor is left with no acceptable move.  Moves already made are
// kept in either case (nothing is undone here).
int Rebalancer::refine()
{
   int finish = 1;
   int no_new_proxies = 0;  // set to true if new proxies are futile
   // Both containers are owned by this function and freed before returning.
   maxHeap *heavyProcessors = new maxHeap(P);

   IRSet *lightProcessors = new IRSet();
   int i;
   double thresholdLoad = overLoad * averageLoad;
   for (i=0; i<P; i++)
   {
      // iout << iINFO << "\n Computes on processor " << i << " ";
      // processors[i].computeSet->print();
      // iout << iINFO << "\n" << endi;
      if (processors[i].load > thresholdLoad)
         heavyProcessors->insert((InfoRecord *) &(processors[i]));
      else lightProcessors->insert((InfoRecord *) &(processors[i]));
   }

#if LDB_DEBUG
   iout << "\nBefore Refinement Summary" << "\n";
   printSummary();
#endif

   // `done` is never set; the loop is left only through the breaks below.
   int done = 0;
   while (!done)
   {
      // processorInfo *donor = (processorInfo *) heavyProcessors->deleteMax();
      /* Keep selecting new donors, until we find one with some compute to
       * migrate
       */
/*
      computeInfo* c=0;
      while (donor && !c) {
        Iterator nextCompute;
        nextCompute.id = 0;
        c = (computeInfo *) donor->
            computeSet.iterator((Iterator *)&nextCompute);
        if (!c) {
          iout << iINFO << "Ignoring donor " << donor->Id
               << " because no computes\n" << endi;
          donor = (processorInfo*)heavyProcessors->deleteMax();
        }
      };
*/

      // Pop heavy processors until one has computes to give away.  A heavy
      // processor with an empty computeSet is dropped from the heap (nothing
      // can be moved off it), and from then on no_new_proxies is set.
      processorInfo *donor;
      while (donor = (processorInfo*)heavyProcessors->deleteMax()) {
        if (donor->computeSet.hasElements()) break;
        if ( ! no_new_proxies ) {
          /*
          iout << iINFO << "Most-loaded processor " << donor->Id
               << " (" << donor->patchSet.numElements() << " patches, "
               << donor->proxies.numElements() << " proxies)"
               << " has no migratable work.\n" << endi;
          */
          no_new_proxies = 1;  // New proxies would not improve load balance.
        }
      }
  
      if (!donor) break;  // No donors found at all! Give up 

      // Filled by refine_togrid() as grid[nPatches][nProxies][badForComm].
      // Assumed to start with every slot empty (c == NULL) — confirm in the
      // pcgrid definition.
      pcgrid grid;
      // REASSIGN moves the slot's compute from donor to the slot's processor
      // and records that processor in bestP.  It expands to a bare if-block,
      // so "REASSIGN(a) else REASSIGN(b) ..." forms a priority list.
#define REASSIGN(GRID) if (GRID.c) { \
           deAssign(GRID.c, donor); \
           assign(GRID.c, GRID.p); \
           bestP = GRID.p; \
        }

      // try for at least one proxy
      // Pass 1: candidate targets are the home processors of the compute's
      // patches and every processor in each patch's proxiesOn set.
      {
        Iterator nextCompute;
        nextCompute.id = 0;
        computeInfo *c = (computeInfo *) 
           donor->computeSet.iterator((Iterator *)&nextCompute);
        while (c)
        {
          Iterator nextProc;
          processorInfo *p;

          p = &processors[patches[c->patch1].processor];
          refine_togrid(grid, thresholdLoad, p, c);

          if (c->patch1 != c->patch2)
          {
          p = &processors[patches[c->patch2].processor];
          refine_togrid(grid, thresholdLoad, p, c);
          }

          p = (processorInfo *)patches[c->patch1].
                                proxiesOn.iterator((Iterator *)&nextProc);
          while (p) {
            refine_togrid(grid, thresholdLoad, p, c);
            p = (processorInfo *)patches[c->patch1].
                                proxiesOn.next((Iterator*)&nextProc);
          }

          if (c->patch1 != c->patch2) 
          {
          p = (processorInfo *)patches[c->patch2].
                                proxiesOn.iterator((Iterator *)&nextProc);
          while (p) {
            refine_togrid(grid, thresholdLoad, p, c);
            p = (processorInfo *)patches[c->patch2].
                                proxiesOn.next((Iterator*)&nextProc);
          }
          }

          nextCompute.id++;
          c = (computeInfo *) donor->computeSet.
            next((Iterator *)&nextCompute);
        }
        processorInfo* bestP = 0;
        // prefer proxies to home patches
        // When no_new_proxies is set and none of the first three buckets
        // has a move, refinement fails: this `break` leaves the outer
        // while loop with finish = 0.
        REASSIGN(grid[0][2][0])
        else REASSIGN(grid[1][1][0])
        else REASSIGN(grid[2][0][0])
        else if ( no_new_proxies ) { finish = 0; break; }
        else REASSIGN(grid[0][1][0])
        else REASSIGN(grid[1][0][0])
        else REASSIGN(grid[0][0][0])
        // else REASSIGN(grid[0][1][1])
        // else REASSIGN(grid[1][0][1])
        // else REASSIGN(grid[0][0][1])
        if (bestP) {
          // The receiver stops being "light" once above the average load.
          if (bestP->load > averageLoad) lightProcessors->remove(bestP);
          if (donor->load > thresholdLoad)
                heavyProcessors->insert((InfoRecord *) donor);
          else lightProcessors->insert((InfoRecord *) donor);
          continue;
        }
      }

      // Sanity check: with no_new_proxies set, the chain above either made
      // a move (continue) or broke out of the loop, so this should never fire.
      if ( no_new_proxies ) iout << iINFO
         << "ERROR: Rebalancer::refine() algorithm is broken.\n" << endi;

      // no luck, do it the long way

      //find the best pair (c,receiver)
      // Pass 2: every light processor is a candidate target.  The grid is
      // not cleared, so any pass-1 candidates (e.g. badForComm slots) remain.
      Iterator nextProcessor;
      processorInfo *p = (processorInfo *) 
      lightProcessors->iterator((Iterator *) &nextProcessor);

      while (p)
      {
         Iterator nextCompute;
         nextCompute.id = 0;
         computeInfo *c = (computeInfo *) 
            donor->computeSet.iterator((Iterator *)&nextCompute);
         while (c)
         {
#if USE_TOPOMAP
           // Presumably restricts p to PEs topologically close (limit 8) to
           // both patches' home PEs — confirm tmgr.areNeighbors() semantics.
           int flag = tmgr.areNeighbors(p->Id, patches[c->patch1].processor, 
                                     patches[c->patch2].processor, 8);
           if(flag)
#endif
             {       
               refine_togrid(grid, thresholdLoad, p, c);
             }
           nextCompute.id++;
           c = (computeInfo *) donor->computeSet.
             next((Iterator *)&nextCompute);
         }
         p = (processorInfo *) 
           lightProcessors->next((Iterator *) &nextProcessor);
      }

      //we have narrowed the choice to 6 candidates.
      // prefer proxies to home patches
      {
        processorInfo* bestP = 0;
        REASSIGN(grid[0][2][0])
        else REASSIGN(grid[1][1][0])
        else REASSIGN(grid[2][0][0])
        else REASSIGN(grid[0][1][0])
        else REASSIGN(grid[1][0][0])
        else REASSIGN(grid[0][0][0])
        // else REASSIGN(grid[0][1][1])
        // else REASSIGN(grid[1][0][1])
        // else REASSIGN(grid[0][0][1])
        // No acceptable move for this donor: refinement fails at overLoad.
        else { finish = 0; break; }
        if (bestP->load > averageLoad) lightProcessors->remove(bestP);
        if (donor->load > thresholdLoad)
                heavyProcessors->insert((InfoRecord *) donor);
        else lightProcessors->insert((InfoRecord *) donor);
      }

   }  

#if LDB_DEBUG
   iout << "After Refinement Summary" << "\n";
   printSummary();

   if (!finish) {
     iout << iINFO << "Refine: No solution found for overLoad = " 
          << overLoad << "\n" << endi;
   }
#endif

   delete heavyProcessors;
   delete lightProcessors;

   return finish;
}
00781 
00782 // this binary search refinement procedure assume you already assigned computes
00783 // to their processors before calling this!!
00784 void Rebalancer::multirefine(double overload_start)
00785 {
00786   // The New refinement procedure.  This is identical to the code in
00787   // RefineOnly.C, and probably should be merged with that code to form
00788   // a binary-search function
00789 
00790   double avg = computeAverage();
00791   double max = computeMax();
00792 
00793 #if LDB_DEBUG
00794   iout << "******** Processors with background load > average load ********" << "\n";
00795 #endif
00796 
00797   int numOverloaded = 0;
00798   for (int ip=0; ip<P; ip++) {
00799     if ( processors[ip].backgroundLoad > averageLoad ) {
00800       ++numOverloaded;
00801 #if LDB_DEBUG
00802       iout << iINFO << "Info about proc " << ip << ": Load: " << processors[ip].load << " Bg Load: " << processors[ip].backgroundLoad << " Compute Load: " << processors[ip].computeLoad << " No of computes: " << processors[ip].computeSet.numElements() << "\n";
00803 #endif
00804     }
00805   }
00806   if ( numOverloaded ) {
00807     iout << iWARN << numOverloaded
00808       << " processors are overloaded due to high background load.\n" << endi;
00809   }
00810 #if LDB_DEBUG
00811   iout << "******** Processor List Ends ********" << "\n\n";
00812 #endif
00813 
00814   const double overloadStep = 0.01;
00815   const double overloadStart = overload_start;       //1.05;
00816   double dCurOverload = max / avg;
00817   
00818   int minOverload = 0;   //Min overload should be 1.05 ?
00819   int maxOverload = (int)((dCurOverload - overloadStart)/overloadStep + 1);
00820   double dMinOverload = minOverload * overloadStep + overloadStart;
00821   double dMaxOverload = maxOverload * overloadStep + overloadStart;
00822 
00823 #if LDB_DEBUG 
00824   iout << iINFO
00825        << "Balancing from " << minOverload << " = " << dMinOverload 
00826        << " to " << maxOverload << "=" << dMaxOverload 
00827        << " dCurOverload=" << dCurOverload << " max=" << max << " avg=" << avg
00828        << "\n" << endi;
00829 #endif
00830 
00831   int curOverload;
00832   int refineDone = 0;
00833 
00834   overLoad = dMinOverload;
00835   if (refine())
00836     refineDone = 1;
00837   else {
00838     overLoad = dMaxOverload;
00839     if (!refine()) {
00840       iout << iINFO << "ERROR: Could not refine at max overload\n" << endi;
00841       refineDone = 1;
00842     }
00843   }
00844 
00845   // Scan up, until we find a refine that works
00846   while (!refineDone) {
00847     if (maxOverload - minOverload <= 1)
00848       refineDone = 1;
00849     else {
00850       curOverload = (maxOverload + minOverload ) / 2;
00851 
00852       overLoad = curOverload * overloadStep + overloadStart;
00853 #if LDB_DEBUG 
00854       iout << iINFO << "Testing curOverload " << curOverload 
00855            << "=" << overLoad << " [min,max]=" 
00856            << minOverload << ", " << maxOverload
00857            << "\n" << endi;
00858 #endif     
00859       if (refine())
00860         maxOverload = curOverload;
00861       else
00862         minOverload = curOverload;
00863     }
00864   }
00865 
00866 }
00867 
00868 void Rebalancer::printResults()
00869 {
00870   iout << iINFO << "ready to print result \n" << "\n";
00871 }
00872 
00873 
00874 void Rebalancer::printLoads(int phase)  // 0=nocollective, 1=initial, 2=proxies, 3=final
00875 {
00876 
00877    int i, total = 0, numBytes = 0;
00878    double max;
00879    int maxproxies = 0;
00880    int maxpatchproxies = 0;
00881    double avgBgLoad =0.0;
00882 
00883    for (i=0; i<P; i++) {
00884       int nproxies = processors[i].proxies.numElements() - 
00885                         processors[i].patchSet.numElements();
00886       total += nproxies;
00887       if ( nproxies > maxproxies ) maxproxies = nproxies;
00888       avgBgLoad += processors[i].backgroundLoad;
00889       Iterator p;
00890       int count = 0;
00891     
00892       patchInfo *patch = (patchInfo *) processors[i].patchSet.iterator(&p);
00893       while (patch)
00894       {
00895          int myProxies;
00896          myProxies = patch->proxiesOn.numElements()-1;
00897          if ( myProxies > maxpatchproxies ) maxpatchproxies = myProxies;
00898          numBytes += myProxies *patch->numAtoms*bytesPerAtom;
00899          count += myProxies;
00900          patch = (patchInfo *)processors[i].patchSet.next(&p);
00901       }
00902    }
00903 
00904    avgBgLoad /= P;
00905    computeAverage();
00906    max = computeMax();
00907 
00908   if ( P == CkNumPes() ) {
00909    iout << "LDB:";
00910    if ( P != CkNumPes() ) {
00911      int w = 1;   
00912      int maxinw = 10;
00913      while ( maxinw < CkNumPes() ) {
00914        ++w;
00915        maxinw = 10*maxinw;
00916      }
00917      iout << " PES " <<
00918              std::setw(w) << std::right << processors[0].Id << "-" <<
00919              std::setw(w) << std::left  << processors[P-1].Id <<
00920              std::right;
00921    }
00922    iout << " TIME " << CmiWallTimer() << " LOAD: AVG " << averageLoad 
00923      << " MAX " << max << "  PROXIES: TOTAL " << total << " MAXPE " << 
00924      maxproxies << " MAXPATCH " << maxpatchproxies << " " << strategyName 
00925      << " MEM: " << memusage_MB() << " MB\n" << endi;
00926    fflush(stdout);
00927   }
00928 
00929    if ( P != CkNumPes() ) {  // collect stats on pe 0
00930      switch ( phase ) {
00931      case 0:  // no collective
00932        NAMD_bug("Rebalancer::printLoads(0) called with hybrid balancer.");
00933      break;
00934      case 1:  // initial
00935        if ( collMsg ) NAMD_bug("Rebalancer::printLoads(1) collMsg not null.");
00936        collMsg = new CollectLoadsMsg;
00937        collMsg->reverted = 0;
00938        collMsg->firstPe = processors[0].Id;
00939        collMsg->lastPe = processors[P-1].Id;
00940        collMsg->initTime = CmiWallTimer();
00941        collMsg->initMemory = memusage_MB();
00942        collMsg->initAvgPeLoad = averageLoad;
00943        collMsg->initMaxPeLoad = max;
00944        collMsg->initTotalProxies = total;
00945        collMsg->initMaxPeProxies = maxproxies;
00946        collMsg->initMaxPatchProxies = maxpatchproxies;
00947      break;
00948      case 2:  // proxies (optional)
00949        if ( ! collMsg ) NAMD_bug("Rebalancer::printLoads(2) collMsg null.");
00950        collMsg->initAvgPeLoad = averageLoad;
00951        collMsg->initMaxPeLoad = max;
00952        collMsg->initTotalProxies = total;
00953        collMsg->initMaxPeProxies = maxproxies;
00954        collMsg->initMaxPatchProxies = maxpatchproxies;
00955      break;
00956      case 3:  // final
00957        if ( ! collMsg ) NAMD_bug("Rebalancer::printLoads(3) collMsg null.");
00958        collMsg->finalTime = CmiWallTimer();
00959        collMsg->finalMemory = memusage_MB();
00960        collMsg->finalAvgPeLoad = averageLoad;
00961        collMsg->finalMaxPeLoad = max;
00962        collMsg->finalTotalProxies = total;
00963        collMsg->finalMaxPeProxies = maxproxies;
00964        collMsg->finalMaxPatchProxies = maxpatchproxies;
00965        strncpy(collMsg->strategyName,strategyName,15);
00966        collMsg->strategyName[15] = 0;
00967      break;
00968      default:
00969        NAMD_bug("Rebalancer::printLoads() called with unknown phase.");
00970      }
00971    }
00972 
00973 }
00974 
00975 void Rebalancer::printSummary()
00976 {
00977    int i;
00978    // After refining, compute min, max and avg processor load
00979    double total = processors[0].load;
00980    double min = processors[0].load;
00981    int min_proc = 0;
00982    double max = processors[0].load;
00983    int max_proc = 0;
00984    for (i=1; i<P; i++) {
00985      total += processors[i].load;
00986      if (processors[i].load < min) {
00987        min = processors[i].load;
00988        min_proc = i;
00989      }
00990      if (processors[i].load > max) {
00991        max = processors[i].load;
00992        max_proc = i;
00993      }
00994    }
00995    iout << iINFO << "  min = " << min << " processor " << min_proc << "\n";
00996    iout << iINFO << "  max = " << max << " processor " << max_proc << "\n";
00997    iout << iINFO << "  total = " << total << " average = " << total/P << "\n";
00998    iout << iINFO << "Info about most overloaded processor " << max_proc << ": Load: " << processors[max_proc].load << " Bg Load: " << processors[max_proc].backgroundLoad << " Compute Load: " << processors[max_proc].computeLoad << " No of computes: " << processors[max_proc].computeSet.numElements() << " No. of proxies: " << processors[max_proc].proxies.numElements() << "\n" << endi; 
00999 }
01000 
01001 double Rebalancer::computeAverage()
01002 {
01003    int i;
01004    double total = 0.;
01005    for (i=0; i<numComputes; i++)
01006       total += computes[i].load;
01007 
01008    for (i=0; i<P; i++) {
01009       if (processors[i].available) {
01010         total += processors[i].backgroundLoad;
01011       }
01012    }
01013   
01014    if (numPesAvailable == 0) {
01015      CmiPrintf("Warning: no processors available for load balancing!\n");
01016      averageLoad = 0.0;
01017    }
01018    else 
01019      averageLoad = total/numPesAvailable;
01020    return averageLoad;
01021 }
01022 
01023 void Rebalancer::adjustBackgroundLoadAndComputeAverage()
01024 {
01025   // useful for AlgSeven when some loads start out as zero
01026 
01027    if (numPesAvailable == 0) {
01028      computeAverage();  // because otherwise someone will forget
01029      return;
01030    }
01031   
01032    int i;
01033    double bgtotal = 0.;
01034    for (i=0; i<P; i++) {
01035       if (processors[i].available) {
01036         bgtotal += processors[i].backgroundLoad;
01037       }
01038    }
01039    double bgavg = bgtotal / numPesAvailable;
01040 
01041    int nadjusted = 0;
01042    for (i=0; i<P; i++) {
01043       if (processors[i].available) {
01044         double bgload = processors[i].backgroundLoad;
01045         if ( bgload < bgavg ) {
01046           processors[i].backgroundLoad = bgavg;
01047           ++nadjusted;
01048         }
01049       }
01050    }
01051    // iout << iINFO << "Adjusted background load on " << nadjusted
01052    //     << " nodes.\n" << endi;
01053 
01054    computeAverage();  // because otherwise someone will forget
01055 }
01056 
01057 double Rebalancer::computeMax()
01058 {
01059    int i;
01060    double max = processors[0].load;
01061    for (i=1; i<P; i++)
01062    {
01063       if (processors[i].load > max) 
01064          max = processors[i].load;
01065    }
01066    return max;
01067 }
01068 
01069 int Rebalancer::isAvailableOn(patchInfo *patch, processorInfo *p)
01070 {
01071    return  patch->proxiesOn.find(p);
01072 }
01073 
01074 void Rebalancer::numAvailable(computeInfo *c, processorInfo *p,
01075            int *nPatches, int *nProxies, int *isBadForCommunication)
01076 {
01077    // return the number of proxy/home patches available on p for c (0, 1, 2)
01078    int realPe, index;
01079    int patch_count = 0;
01080    int proxy_count = 0;
01081 
01082   const int beginGroup = processors[0].Id;
01083   const int endGroup = beginGroup + P;
01084 
01085    patchInfo &pa1 = patches[c->patch1];
01086    patchInfo &pa2 = patches[c->patch2];
01087    int pa1_avail = 1;
01088    int pa2_avail = 1;
01089 
01090    if (pa1.processor == p->Id) {
01091      patch_count++;
01092    } else if ( pa1.proxiesOn.find(p) ) {
01093      proxy_count++;
01094    } else {
01095      pa1_avail = 0;
01096    }
01097 
01098    // self computes get one patch for free here
01099    if (c->patch1 == c->patch2 || pa2.processor == p->Id) {
01100      patch_count++;
01101    } else if ( pa2.proxiesOn.find(p) ) {
01102      proxy_count++;
01103    } else {
01104      pa2_avail = 0;
01105    }
01106 
01107    *nPatches = patch_count;
01108    *nProxies = proxy_count;
01109 
01110   if ( isBadForCommunication ) {  // skip work if pointer is null
01111    int bad = 0;
01112 
01113    if ( patch_count + proxy_count < 2 ) {
01114      double bgLoadLimit = 1.2 * averageLoad;
01115      if ( p->backgroundLoad > bgLoadLimit ) bad = 1;
01116      else {
01117        int proxiesPerPeLimit = numProxies / numPesAvailable + 3;
01118        if ( proxiesPerPeLimit < 6 ) proxiesPerPeLimit = 6;
01119 
01120        if ( p->proxies.numElements() > proxiesPerPeLimit ) bad = 1;
01121 
01122        int proxiesPerPatchLimit = numProxies / numPatches + 3;
01123        if ( proxiesPerPatchLimit < 6 ) proxiesPerPatchLimit = 6;
01124 
01125        if ( ! bad && ! pa1_avail ) {
01126          // HYBRID check for range in local group
01127          realPe = pa1.processor;
01128          if INGROUP(realPe) {
01129            index = realPe - beginGroup;
01130            //BACKUP if ( processors[pa1.processor].backgroundLoad > bgLoadLimit) bad = 1;
01131            if (processors[index].backgroundLoad > bgLoadLimit) bad = 1;
01132            else if ( pa1.proxiesOn.numElements() > proxiesPerPatchLimit ) bad = 1;
01133          } else bad = 1;  // patch has proxies we don't know about
01134        }
01135 
01136        if ( ! bad && ! pa2_avail ) {
01137          // HYBRID check for range in local group
01138          realPe = pa2.processor;
01139          if INGROUP(realPe) {
01140            index = realPe - beginGroup;
01141            // BACKUP if ( processors[pa2.processor].backgroundLoad > bgLoadLimit) bad = 1;
01142            if ( processors[index].backgroundLoad > bgLoadLimit) bad = 1;
01143            else if ( pa2.proxiesOn.numElements() > proxiesPerPatchLimit ) bad = 1;
01144          } else bad = 1;  // patch has proxies we don't know about
01145        }
01146 
01147      }
01148    }
01149 
01150    *isBadForCommunication = bad;
01151   }
01152 }
01153 
01154 void Rebalancer::createSpanningTree() {
01155   ProxyTree &pt = ProxyMgr::Object()->getPtree();
01156   Iterator nextP;
01157   processorInfo *p;
01158 #ifndef NODEAWARE_PROXY_SPANNINGTREE
01159   if(pt.sizes==NULL)
01160     pt.sizes = new int[numPatches];
01161 #endif
01162  
01163   if (pt.proxylist == NULL)
01164     pt.proxylist = new NodeIDList[numPatches];
01165   for(int i=0; i<numPatches; i++)
01166   {
01167     pt.proxylist[i].resize(patches[i].proxiesOn.numElements());
01168     nextP.id = 0;
01169     p = (processorInfo *)(patches[i].proxiesOn.iterator((Iterator *)&nextP));
01170     int j = 0;
01171     while(p) {
01172       //if (p->Id < 0)
01173       //  printf ("Inserting proxy on -ve processor %d for patch %d\n", p->Id, i);
01174 
01175       if (p->Id == (PatchMap::Object()->node(i))) {
01176         p = (processorInfo *)(patches[i].proxiesOn.next((Iterator *)&nextP));
01177         continue;
01178       }
01179 
01180       pt.proxylist[i][j] = p->Id;
01181       nextP.id++;
01182       p = (processorInfo *)(patches[i].proxiesOn.next((Iterator *)&nextP));
01183       j++;
01184     }
01185     pt.proxylist[i].resize(j);
01186   }
01187   CkPrintf("Done intialising\n");
01188 #ifdef NODEAWARE_PROXY_SPANNINGTREE
01189   ProxyMgr::Object()->buildNodeAwareSpanningTree0();
01190 #else
01191   ProxyMgr::Object()->buildSpanningTree0();
01192 #endif
01193 }
01194 
01195 void Rebalancer::decrSTLoad() {
01196   int pe;
01197   ProxyTree &pt = ProxyMgr::Object()->getPtree();
01198   for(int i=0; i<numPatches; i++)
01199     for(int j=1; j<pt.proxylist[i].size() && j<proxySpanDim; j++) {
01200       pe = pt.proxylist[i][j];
01201       processors[pe].load -= ST_NODE_LOAD;
01202       processors[pe].backgroundLoad -= ST_NODE_LOAD;
01203       if(processors[pe].load < 0.0)
01204         processors[pe].load = 0.0;
01205       if(processors[pe].backgroundLoad < 0.0)
01206         processors[pe].backgroundLoad = 0.0;
01207     }
01208 }
01209 
01210 void Rebalancer::incrSTLoad() {
01211   int pe;
01212   ProxyTree &pt = ProxyMgr::Object()->getPtree();
01213   for(int i=0; i<numPatches; i++)
01214     for(int j=1; j<pt.proxylist[i].size() && j<proxySpanDim; j++) {
01215       pe = pt.proxylist[i][j];
01216       processors[pe].load += ST_NODE_LOAD;
01217       processors[pe].backgroundLoad += ST_NODE_LOAD;
01218     }
01219 }
01220 

Generated on Fri Sep 22 01:17:14 2017 for NAMD by  doxygen 1.4.7