Main Page | Namespace List | Class Hierarchy | Alphabetical List | Class List | File List | Namespace Members | Class Members | File Members

Rebalancer.C

Go to the documentation of this file.
00001 
00007 /*****************************************************************************
00008  * $Source: /home/cvs/namd/cvsroot/namd2/src/Rebalancer.C,v $
00009  * $Author: jim $
00010  * $Date: 2013/06/07 22:34:38 $
00011  * $Revision: 1.98 $
00012  *****************************************************************************/
00013 
00014 #include "InfoStream.h"
00015 #include "Node.h"
00016 #include "Rebalancer.h"
00017 #include "ProxyMgr.h"
00018 #include "PatchMap.h"
00019 #include "memusage.h"
00020 #include <iomanip>
00021 
00022 #define ST_NODE_LOAD            0.005
00023 #define PROXY_LOAD              0.001
00024 #define COMPUTE_LOAD            0.00005
00025 
00026 Rebalancer::Rebalancer(computeInfo *computeArray, patchInfo *patchArray,
00027       processorInfo *processorArray, int nComps, int nPatches, int nPes)
00028 {
00029    bytesPerAtom = 32;
00030    strategyName = "None";
00031    computes = computeArray;
00032    patches =  patchArray;
00033    processors =  processorArray;
00034    numComputes = nComps;
00035    numPatches = nPatches;
00036    P = nPes;
00037    pes = NULL;
00038    computePairHeap = NULL;
00039    computeSelfHeap = NULL;
00040    computeBgPairHeap = NULL;
00041    computeBgSelfHeap = NULL;
00042    overLoad = 0.;
00043    numPesAvailable = 0;
00044    firstAssignInRefine = 0;
00045 
00046   const int beginGroup = processors[0].Id;
00047   const int endGroup = beginGroup + P;
00048 #define INGROUP(PROC) ((PROC) >= beginGroup && (PROC) < endGroup)
00049 
00050    int i;
00051    int index;
00052    for (i=0; i<P; i++)
00053    {
00054       // For testing only...
00055       // processors[i].backgroundLoad = 0;
00056       // End of test section
00057       processors[i].load = processors[i].backgroundLoad;
00058       processors[i].computeLoad = 0;
00059       if (processors[i].available) {
00060         numPesAvailable += 1;
00061       }
00062    }
00063 
00064    for (i=0; i<nPatches; i++) {
00065      // Only for those patches which are in my group (hierarchical case)
00066      if INGROUP(patches[i].processor) {
00067        index = patches[i].processor - beginGroup;
00068        if (!patches[i].proxiesOn.find(&(processors[index]))) {
00069          patches[i].proxiesOn.unchecked_insert(&(processors[index]));
00070          processors[index].proxies.unchecked_insert(&(patches[i]));
00071        }
00072        processors[index].patchSet.unchecked_insert(&patches[i]);
00073      }
00074    }                      
00075 
00076    InitProxyUsage();
00077 
00078    for (i=0; i<numComputes; i++)
00079       computeArray[i].processor = -1;
00080 
00081    for (i=0; i < numComputes; i++) {
00082      // Only for those computes which are in my group (hierarchical case)
00083      if INGROUP(computes[i].oldProcessor) {
00084        index = computes[i].oldProcessor - beginGroup;
00085        processors[index].computeLoad += computes[i].load;
00086      }
00087    }
00088 
00089    // Added 4-29-98: Temporarily adds the compute load to the background
00090    // load so that the correct value for the total load can be displayed.
00091    float *temploads = new float[P];
00092    for(i=0; i<P; i++)
00093    {
00094       temploads[i] = processors[i].load;
00095       processors[i].load += processors[i].computeLoad;
00096    }
00097 
00098    origMaxLoad = computeMax();
00099 
00100    // iout << iINFO << "Initial load" << "\n";
00101    printLoads();
00102 
00103    for(i=0;i<P; i++)
00104    {
00105       processors[i].load = temploads[i];
00106       processors[i].computeLoad = 0;
00107    }
00108    
00109    delete [] temploads;
00110 
00111    // int count1=0, count2=0;
00112    // for (i=0; i<nPatches; i++)
00113    // {
00114    //    if (patches[i].proxiesOn.numElements() <= 1)
00115    //    count1++;
00116    //    else count2++;
00117    // }                   
00118    // iout << iINFO << "Count1 = " << count1
00119    //      << "Count2 = " << count2
00120    //      << "\n" << std::endl;
00121    // 
00122    // for (i=0; i <P; i++) 
00123    // {
00124    //    iout << iINFO << "\n proxies on proc. " << i << " are for patches:";
00125    //    processorArray[i].proxies->print();
00126    // }
00127    // 
00128    // iout << iINFO <<"\n" << endi;
00129    // strategy();
00130 
00131    // for (i=0; i<nPatches; i++)
00132    // {
00133    //    iout << "patch " << i << " on processor " << patches[i].processor << "\n" << endi;
00134    // }
00135 }
00136 
00137 Rebalancer::~Rebalancer()
00138 {
00139   if ( computeMax() > origMaxLoad ) {
00140    iout << "LDB:";
00141    if ( P != CkNumPes() ) {
00142      int w = 1;   
00143      int maxinw = 10;
00144      while ( maxinw < CkNumPes() ) {
00145        ++w;
00146        maxinw = 10*maxinw;
00147      }
00148      iout << " PES " <<
00149              std::setw(w) << std::right << processors[0].Id << "-" <<
00150              std::setw(w) << std::left  << processors[P-1].Id <<
00151              std::right;
00152    }
00153    iout << " Reverting to original mapping\n" << endi;
00154    fflush(stdout);
00155    const int beginGroup = processors[0].Id;
00156    const int endGroup = beginGroup + P;
00157    for (int i=0; i < numComputes; i++) {
00158      // Only for those computes which are in my group (hierarchical case)
00159      if INGROUP(computes[i].oldProcessor) {
00160        computes[i].processor = computes[i].oldProcessor;
00161      }
00162    }
00163   }
00164 
00165   //for(int i=0; i<P; i++)
00166   //  delete [] processors[i].proxyUsage;
00167    delete pes;
00168    delete computePairHeap;
00169    delete computeSelfHeap;
00170    delete computeBgPairHeap;
00171    delete computeBgSelfHeap;
00172 }
00173 
00174 // Added 4-29-98: array proxyUsage on each processor keeps track of 
00175 // how many computes are accessing each proxy on the processor.  If
00176 // no computes are accessing it, the proxy can be removed in DeAssign
00177 void Rebalancer::InitProxyUsage()
00178 {
00179    int i;
00180    numProxies = 0;
00181 
00182    for(i=0; i<P; i++) {
00183      //processors[i].proxyUsage = new unsigned char [numPatches];
00184      //for(int j=0; j<numPatches; j++)
00185      //{
00186      //  processors[i].proxyUsage[j] = 0;
00187      //}
00188 
00189       Iterator nextCompute;
00190       nextCompute.id = 0;
00191 
00192       computeInfo *c = (computeInfo *)
00193          processors[i].computeSet.iterator((Iterator *)&nextCompute);
00194 
00195       while(c)
00196       {
00197         /* int n1 = */ //processors[i].proxyUsage[c->patch1]++;
00198         proxyUsage.increment (i, c->patch1); 
00199         /* int n2 = */ //processors[i].proxyUsage[c->patch2]++;
00200         proxyUsage.increment (i, c->patch2); 
00201 
00202          // iout << iINFO  
00203          // << "Assigning compute " << c->Id << " with work = " << c->load 
00204          // << " to processor " << processors[i].Id << "\n"
00205          // << "\tproxyUsage[" << c->patch1 << "]: " << n1 << " --> " << n1+1 << "\n";
00206          // << "\tproxyUsage[" << c->patch2 << "]: " << n2 << " --> " << n2+1 << "\n";
00207          // << std::endl;
00208 
00209          nextCompute.id++;
00210          c = (computeInfo *) processors[i].computeSet.next((Iterator *)&nextCompute);
00211       }
00212    }
00213 
00214   for (i=0; i<numPatches; i++)
00215   {
00216       numProxies += ( patches[i].proxiesOn.numElements() - 1 );
00217       Iterator nextProc;
00218       processorInfo *p = (processorInfo *)patches[i].proxiesOn.iterator((Iterator *)&nextProc);
00219       while (p) {
00220           //p->proxyUsage[i] += 1;
00221           proxyUsage.increment (p->Id, i);
00222           p = (processorInfo *)patches[i].proxiesOn.next((Iterator*)&nextProc);
00223       }
00224   }
00225 
00226 }
00227 
00228 
00229 void Rebalancer::strategy()
00230 {
00231    iout << iINFO << "Strategy not implemented for the base class.\n" << "\n";
00232 }
00233 
00234 void Rebalancer::makeHeaps()
00235 {
00236    int i, j;
00237 
00238    delete pes;
00239    pes = new minHeap(P+2);
00240    for (i=0; i<P; i++)
00241       pes->insert((InfoRecord *) &(processors[i]));
00242 
00243    delete computePairHeap;
00244    delete computeSelfHeap;
00245    delete computeBgPairHeap;
00246    delete computeBgSelfHeap;
00247 
00248    double bgLoadLimit = 0.5 * averageLoad;
00249    /*
00250    iout << iINFO << "Background load limit = " << bgLoadLimit << "\n";
00251    for (i=0; i<P; i++)
00252      if ( processors[i].backgroundLoad > bgLoadLimit )
00253        iout << iINFO << "Processor " << i << " background load = "
00254             << processors[i].backgroundLoad << "\n";
00255    iout << endi;
00256    */
00257 
00258    int numSelfComputes, numPairComputes, numBgSelfComputes, numBgPairComputes;
00259 
00260    while ( 1 ) {
00261     numSelfComputes = 0;
00262     numPairComputes = 0;
00263     numBgSelfComputes = 0;
00264     numBgPairComputes = 0;
00265     for (i=0; i<numComputes; i++) {
00266      int pa1 = computes[i].patch1;
00267      int pa2 = computes[i].patch2;
00268      if ( pa1 == pa2 ) {
00269         if ( processors[patches[pa1].processor].backgroundLoad > bgLoadLimit) {
00270           ++numBgSelfComputes;
00271         } else {
00272           ++numSelfComputes;
00273         }
00274      } else {
00275         if ( processors[patches[pa1].processor].backgroundLoad > bgLoadLimit
00276           || processors[patches[pa2].processor].backgroundLoad > bgLoadLimit) {
00277           ++numBgPairComputes;
00278         } else {
00279           ++numPairComputes;
00280         }
00281      }
00282     }
00283 
00284     int numBgComputes = numBgPairComputes + numBgSelfComputes;
00285 
00286     /*if ( numBgComputes ) {
00287         iout << iINFO << numBgComputes << " of " << numComputes
00288         << " computes have background load > " << bgLoadLimit << "\n" << endi;
00289     }*/
00290 
00291     if ( numBgComputes < 0.3 * numComputes ) break;
00292     else bgLoadLimit += 0.1 * averageLoad;
00293    }
00294 
00295    computePairHeap = new maxHeap(numPairComputes+2);
00296    computeSelfHeap = new maxHeap(numSelfComputes+2);
00297    computeBgPairHeap = new maxHeap(numBgPairComputes+2);
00298    computeBgSelfHeap = new maxHeap(numBgSelfComputes+2);
00299 
00300    for (i=0; i<numComputes; i++) {
00301      int pa1 = computes[i].patch1;
00302      int pa2 = computes[i].patch2;
00303      if ( pa1 == pa2 ) {
00304         if ( processors[patches[pa1].processor].backgroundLoad > bgLoadLimit) {
00305           computeBgSelfHeap->insert( (InfoRecord *) &(computes[i]));
00306         } else {
00307           computeSelfHeap->insert( (InfoRecord *) &(computes[i]));
00308         }
00309      } else {
00310         if ( processors[patches[pa1].processor].backgroundLoad > bgLoadLimit
00311           || processors[patches[pa2].processor].backgroundLoad > bgLoadLimit) {
00312           computeBgPairHeap->insert( (InfoRecord *) &(computes[i]));
00313         } else {
00314           computePairHeap->insert( (InfoRecord *) &(computes[i]));
00315         }
00316      }
00317    }
00318 
00319 /*
00320    delete computePairHeap;
00321    delete computeSelfHeap;
00322 
00323    int numSelfComputes = 0;
00324    for (i=0; i<numComputes; i++)
00325       if ( computes[i].patch1 == computes[i].patch2 ) ++numSelfComputes;
00326 
00327    computeSelfHeap = new maxHeap(numSelfComputes+2);
00328    computePairHeap = new maxHeap(numComputes-numSelfComputes+2);
00329 
00330    for (i=0; i<numComputes; i++)
00331       if ( computes[i].patch1 == computes[i].patch2 )
00332          computeSelfHeap->insert( (InfoRecord *) &(computes[i]));
00333       else
00334          computePairHeap->insert( (InfoRecord *) &(computes[i]));
00335 */
00336 }
00337 
00338 void Rebalancer::makeTwoHeaps()
00339 {
00340    int i, j;
00341 
00342    delete pes;
00343    pes = new minHeap(P+2);
00344    for (i=0; i<P; i++)
00345       pes->insert((InfoRecord *) &(processors[i]));
00346 
00347    delete computePairHeap;
00348    delete computeSelfHeap;
00349    delete computeBgPairHeap;
00350    delete computeBgSelfHeap;
00351 
00352    int numSelfComputes, numPairComputes;
00353 
00354    numSelfComputes = 0;
00355    numPairComputes = 0;
00356    for (i=0; i<numComputes; i++) {
00357      int pa1 = computes[i].patch1;
00358      int pa2 = computes[i].patch2;
00359      if (pa1 == pa2)
00360        ++numSelfComputes;
00361      else
00362        ++numPairComputes;
00363    }
00364 
00365    computePairHeap = new maxHeap(numPairComputes+2);
00366    computeSelfHeap = new maxHeap(numSelfComputes+2);
00367 
00368    for (i=0; i<numComputes; i++) {
00369      int pa1 = computes[i].patch1;
00370      int pa2 = computes[i].patch2;
00371      if ( pa1 == pa2 )
00372        computeSelfHeap->insert( (InfoRecord *) &(computes[i]));
00373      else
00374        computePairHeap->insert( (InfoRecord *) &(computes[i]));
00375    }
00376 }
00377 
00378 // not safe with hybrid balancer
00379 //void Rebalancer::assign(computeInfo *c, int processor)
00380 //{
00381 //   assign(c, &(processors[processor]));
00382 //}
00383 
00384 void Rebalancer::assign(computeInfo *c, processorInfo *p)
00385 {
00386    c->processor = p->Id;
00387    p->computeSet.unchecked_insert((InfoRecord *) c);
00388 #if COMPUTE_CORRECTION
00389    if(firstAssignInRefine)
00390      p->computeLoad += c->load + COMPUTE_LOAD;
00391    else
00392 #endif
00393      p->computeLoad += c->load;
00394      
00395    p->load = p->computeLoad + p->backgroundLoad;
00396    patchInfo* patch1 = (patchInfo *) &(patches[c->patch1]);
00397    patchInfo* patch2 = (patchInfo *) &(patches[c->patch2]);
00398 
00399    if (!patch1->proxiesOn.find(p)) {
00400      p->proxies.unchecked_insert(patch1); 
00401      patch1->proxiesOn.unchecked_insert(p); 
00402      numProxies++;
00403 #if PROXY_CORRECTION
00404      if(firstAssignInRefine) {
00405        processors[p->Id].load += PROXY_LOAD;
00406        processors[p->Id].backgroundLoad += PROXY_LOAD;
00407      }
00408 #endif
00409    }
00410 
00411    if (!patch2->proxiesOn.find(p)) {
00412      p->proxies.unchecked_insert(patch2); 
00413      patch2->proxiesOn.unchecked_insert(p);
00414      numProxies++;
00415 #if PROXY_CORRECTION
00416      if(firstAssignInRefine) {
00417        processors[p->Id].load += PROXY_LOAD;
00418        processors[p->Id].backgroundLoad += PROXY_LOAD;
00419      }
00420 #endif
00421    }
00422    
00423    // 4-29-98: Added the following code to keep track of how many proxies
00424    // on each processor are being used by a compute on that processor
00425    /* int n1 = */ //p->proxyUsage[c->patch1]++;
00426    proxyUsage.increment (p->Id, c->patch1);
00427    /* int n2 = */ //p->proxyUsage[c->patch2]++;
00428    proxyUsage.increment (p->Id, c->patch2);
00429 
00430    // iout << iINFO  
00431    // << "Assigning compute " << c->Id << " with work = " << c->load 
00432    // << " to processor " << p->Id << "\n"
00433    // << "\tproxyUsage[" << c->patch1 << "]: " << n1 << " --> " << n1+1 << "\n"
00434    // << "\tproxyUsage[" << c->patch2 << "]: " << n2 << " --> " << n2+1 << "\n"
00435    // << std::endl;
00436 
00437 #if 0
00438    iout << "Assign " << c->Id << " patches " << c->patch1 << " " << c->patch2
00439         << " load " << c->load << " to " << p->Id << " new load "
00440         << p->load << " background " << p->backgroundLoad
00441         << " nPatches " << nPatches << " nProxies " << nProxies;
00442    if ( nPatches + nProxies < 2 ) iout << " addProxy";
00443    if ( badForComm ) iout << " badForComm";
00444    iout << "\n" << endi;
00445 #endif
00446 }
00447 
00448 void  Rebalancer::deAssign(computeInfo *c, processorInfo *p)
00449 {
00450    if (!p->computeSet.remove(c))  {
00451       iout << iINFO << "ERROR: Rebalancer tried to deAssign an object that is not on the processor.\n" << endi;
00452       return;
00453    }
00454 
00455    double temp_load = 0.0;
00456 
00457    c->processor = -1;
00458    p->computeLoad -= c->load;
00459    CmiAssert(p->computeLoad >= 0.0);
00460    temp_load = p->load - c->load;
00461    p->load = p->computeLoad + p->backgroundLoad;
00462    CmiAssert( fabs(temp_load - p->load) < 0.001 );
00463 
00464    // 4-29-98: Added the following code to keep track of how many proxies 
00465    // on each processor are being used by a compute on that processor.
00466    // If no computes are using the proxy, it should be removed if it is not
00467    // on the processor that its patch is on.
00468    /* int n1 = */ //p->proxyUsage[c->patch1]--;
00469    proxyUsage.decrement (p->Id, c->patch1);
00470    /* int n2 = */ //p->proxyUsage[c->patch2]--;
00471    proxyUsage.decrement (p->Id, c->patch2);
00472 
00473    // iout << iINFO
00474    // << "De-assigning compute " << c->Id << " from processor " << p->Id << "\n"
00475    // << "\tproxyUsage[" << c->patch1 << "]: " << n1 << " --> " << n1-1 << "\n"
00476    // << "\tproxyUsage[" << c->patch2 << "]: " << n2 << " --> " << n2-1 << "\n"
00477    // << std::endl;
00478 
00479    //if(p->proxyUsage[c->patch1] <= 0 && p->Id != patches[c->patch1].processor)
00480    if(proxyUsage.getVal(p->Id, c->patch1) <= 0 && p->Id != patches[c->patch1].processor)
00481    {
00482       // iout << iINFO 
00483       // << "REMOVING PROXY " << c->patch1 << " FROM PROCESSOR " << p->Id 
00484       // << std::endl << endl;
00485 
00486       patchInfo* patch1 = (patchInfo *) &(patches[c->patch1]);
00487       p->proxies.remove(patch1);
00488       patch1->proxiesOn.remove(p);
00489       numProxies--;
00490 #if PROXY_CORRECTION
00491       if(firstAssignInRefine) {
00492         processors[p->Id].load -= PROXY_LOAD;
00493         processors[p->Id].backgroundLoad -= PROXY_LOAD;
00494         if(processors[p->Id].backgroundLoad < 0.0) {
00495           processors[p->Id].backgroundLoad = 0.0;
00496           processors[p->Id].load += PROXY_LOAD;
00497         }
00498       }
00499 #endif
00500    }
00501    
00502    //if(p->proxyUsage[c->patch2] <= 0 && p->Id != patches[c->patch2].processor)
00503    if(proxyUsage.getVal(p->Id, c->patch2) <= 0 && p->Id != patches[c->patch2].processor)
00504    {
00505       // iout << iINFO
00506       // << "REMOVING PROXY " << c->patch1 << " FROM PROCESSOR " << p->Id 
00507       // << std::endl << endl;
00508 
00509       patchInfo* patch2 = (patchInfo *) &(patches[c->patch2]);
00510       p->proxies.remove(patch2);
00511       patch2->proxiesOn.remove(p);
00512       numProxies--;
00513 #if PROXY_CORRECTION
00514       if(firstAssignInRefine) {
00515         processors[p->Id].load -= PROXY_LOAD;
00516         processors[p->Id].backgroundLoad -= PROXY_LOAD;
00517         if(processors[p->Id].backgroundLoad < 0.0) {
00518           processors[p->Id].backgroundLoad = 0.0;
00519           processors[p->Id].load += PROXY_LOAD;
00520         }
00521       }
00522 #endif
00523    }
00524 }
00525 
00526 void Rebalancer::refine_togrid(pcgrid &grid, double thresholdLoad,
00527                         processorInfo *p, computeInfo *c) {
00528 
00529   if(p->available == false) return;
00530 
00531   if ( c->load + p->load < thresholdLoad) {
00532     int nPatches, nProxies, badForComm;
00533     numAvailable(c,p,&nPatches,&nProxies,&badForComm);
00534 
00535     // if ( badForComm ) return;
00536 
00537     pcpair *pair = &grid[nPatches][nProxies][badForComm];
00538 
00539     if (! pair->c) {
00540       pair->c = c;
00541       pair->p = p;
00542     } else {
00543       double newval = p->load - c->load;
00544       if ( c->load + p->load < averageLoad ) {
00545          newval -= averageLoad;
00546       }
00547       double oldval = pair->p->load - pair->c->load;
00548       if ( pair->c->load + pair->p->load < averageLoad ) {
00549          oldval -= averageLoad;
00550       }
00551       if (newval < oldval) {
00552         pair->c = c;
00553         pair->p = p;
00554       }
00555     }
00556   }
00557 }
00558 
// One refinement pass at threshold thresholdLoad = overLoad * averageLoad.
//
// Processors above the threshold go into a max-heap of donors; the rest form
// the set of light processors.  Each iteration:
//  1. Takes the heaviest donor that still has computes.
//  2. Moves one of its computes to a processor whose load plus the
//     compute's load stays below the threshold (see refine_togrid()).
//     Candidates are first the home processors and existing proxy holders
//     of the compute's two patches; if none of the preferred grid cells is
//     filled, every light processor is tried.
//  3. Puts the donor back into the heap or the light set according to its
//     new load.
//
// Returns 1 when the donor heap is exhausted.  Donors that have no computes
// are dropped from the heap without being brought under the threshold.
// Returns 0 when a donor is reached for which no qualifying move exists.
// Moves made before a 0 return are not undone.
int Rebalancer::refine()
{
   int finish = 1;
   int no_new_proxies = 0;  // set to true if new proxies are futile
   maxHeap *heavyProcessors = new maxHeap(P);

   IRSet *lightProcessors = new IRSet();
   int i;
   double thresholdLoad = overLoad * averageLoad;
   for (i=0; i<P; i++)
   {
      if (processors[i].load > thresholdLoad)
         heavyProcessors->insert((InfoRecord *) &(processors[i]));
      else lightProcessors->insert((InfoRecord *) &(processors[i]));
   }

#if LDB_DEBUG
   iout << "\nBefore Refinement Summary" << "\n";
   printSummary();
#endif

   // done is never set; the loop ends only through the breaks below.
   int done = 0;
   while (!done)
   {
      // Keep selecting new donors until one has some compute to migrate.
      // A heavy donor with nothing to migrate means the imbalance comes
      // from non-migratable load, so creating new proxies is pointless from
      // here on (no_new_proxies).
      processorInfo *donor;
      while (donor = (processorInfo*)heavyProcessors->deleteMax()) {
        if (donor->computeSet.hasElements()) break;
        if ( ! no_new_proxies ) {
          no_new_proxies = 1;  // New proxies would not improve load balance.
        }
      }
  
      if (!donor) break;  // No donors found at all! Give up 

      // grid[nPatches][nProxies][badForComm] keeps the best candidate move
      // per cell, as classified by numAvailable().  Presumably nPatches and
      // nProxies count how many of the compute's patches the receiver
      // already has as home patches and as proxies; cells with a sum of 2
      // would then need no new proxy -- TODO confirm.
      pcgrid grid;
#define REASSIGN(GRID) if (GRID.c) { \
           deAssign(GRID.c, donor); \
           assign(GRID.c, GRID.p); \
           bestP = GRID.p; \
        }

      // try for at least one proxy
      {
        Iterator nextCompute;
        nextCompute.id = 0;
        computeInfo *c = (computeInfo *) 
           donor->computeSet.iterator((Iterator *)&nextCompute);
        while (c)
        {
          Iterator nextProc;
          processorInfo *p;

          // Home processors of the compute's patches.
          p = &processors[patches[c->patch1].processor];
          refine_togrid(grid, thresholdLoad, p, c);

          if (c->patch1 != c->patch2)
          {
          p = &processors[patches[c->patch2].processor];
          refine_togrid(grid, thresholdLoad, p, c);
          }

          // Processors that already hold a proxy of either patch.
          p = (processorInfo *)patches[c->patch1].
                                proxiesOn.iterator((Iterator *)&nextProc);
          while (p) {
            refine_togrid(grid, thresholdLoad, p, c);
            p = (processorInfo *)patches[c->patch1].
                                proxiesOn.next((Iterator*)&nextProc);
          }

          if (c->patch1 != c->patch2) 
          {
          p = (processorInfo *)patches[c->patch2].
                                proxiesOn.iterator((Iterator *)&nextProc);
          while (p) {
            refine_togrid(grid, thresholdLoad, p, c);
            p = (processorInfo *)patches[c->patch2].
                                proxiesOn.next((Iterator*)&nextProc);
          }
          }

          nextCompute.id++;
          c = (computeInfo *) donor->computeSet.
            next((Iterator *)&nextCompute);
        }
        processorInfo* bestP = 0;
        // prefer proxies to home patches
        // Once no_new_proxies is set, only the first three cells are
        // acceptable; otherwise give up and report failure (finish = 0).
        // Cells with badForComm == 1 are never used.
        REASSIGN(grid[0][2][0])
        else REASSIGN(grid[1][1][0])
        else REASSIGN(grid[2][0][0])
        else if ( no_new_proxies ) { finish = 0; break; }
        else REASSIGN(grid[0][1][0])
        else REASSIGN(grid[1][0][0])
        else REASSIGN(grid[0][0][0])
        if (bestP) {
          // A receiver now above average stops being offered as light.
          if (bestP->load > averageLoad) lightProcessors->remove(bestP);
          if (donor->load > thresholdLoad)
                heavyProcessors->insert((InfoRecord *) donor);
          else lightProcessors->insert((InfoRecord *) donor);
          continue;
        }
      }

      // With no_new_proxies set, the block above either moved a compute or
      // broke out of the loop, so reaching this point means the logic is
      // inconsistent.
      if ( no_new_proxies ) iout << iINFO
         << "ERROR: Rebalancer::refine() algorithm is broken.\n" << endi;

      // no luck, do it the long way

      //find the best pair (c,receiver)
      // Every (light processor, donor compute) pair is offered to the same
      // grid, which still holds whatever the first phase recorded.
      Iterator nextProcessor;
      processorInfo *p = (processorInfo *) 
      lightProcessors->iterator((Iterator *) &nextProcessor);

      while (p)
      {
         Iterator nextCompute;
         nextCompute.id = 0;
         computeInfo *c = (computeInfo *) 
            donor->computeSet.iterator((Iterator *)&nextCompute);
         while (c)
         {
#if USE_TOPOMAP
           int flag = tmgr.areNeighbors(p->Id, patches[c->patch1].processor, 
                                     patches[c->patch2].processor, 8);
           if(flag)
#endif
             {       
               refine_togrid(grid, thresholdLoad, p, c);
             }
           nextCompute.id++;
           c = (computeInfo *) donor->computeSet.
             next((Iterator *)&nextCompute);
         }
         p = (processorInfo *) 
           lightProcessors->next((Iterator *) &nextProcessor);
      }

      //we have narrowed the choice to 6 candidates.
      // prefer proxies to home patches
      {
        processorInfo* bestP = 0;
        REASSIGN(grid[0][2][0])
        else REASSIGN(grid[1][1][0])
        else REASSIGN(grid[2][0][0])
        else REASSIGN(grid[0][1][0])
        else REASSIGN(grid[1][0][0])
        else REASSIGN(grid[0][0][0])
        // No qualifying move anywhere for this donor: refinement failed.
        else { finish = 0; break; }
        if (bestP->load > averageLoad) lightProcessors->remove(bestP);
        if (donor->load > thresholdLoad)
                heavyProcessors->insert((InfoRecord *) donor);
        else lightProcessors->insert((InfoRecord *) donor);
      }

   }  

#if LDB_DEBUG
   iout << "After Refinement Summary" << "\n";
   printSummary();

   if (!finish) {
     iout << iINFO << "Refine: No solution found for overLoad = " 
          << overLoad << "\n" << endi;
   }
#endif

   delete heavyProcessors;
   delete lightProcessors;

   return finish;
}
00763 
00764 // this binary search refinement procedure assume you already assigned computes
00765 // to their processors before calling this!!
00766 void Rebalancer::multirefine(double overload_start)
00767 {
00768   // The New refinement procedure.  This is identical to the code in
00769   // RefineOnly.C, and probably should be merged with that code to form
00770   // a binary-search function
00771 
00772   double avg = computeAverage();
00773   double max = computeMax();
00774 
00775 #if LDB_DEBUG
00776   iout << "******** Processors with background load > average load ********" << "\n";
00777 #endif
00778 
00779   int numOverloaded = 0;
00780   for (int ip=0; ip<P; ip++) {
00781     if ( processors[ip].backgroundLoad > averageLoad ) {
00782       ++numOverloaded;
00783 #if LDB_DEBUG
00784       iout << iINFO << "Info about proc " << ip << ": Load: " << processors[ip].load << " Bg Load: " << processors[ip].backgroundLoad << " Compute Load: " << processors[ip].computeLoad << " No of computes: " << processors[ip].computeSet.numElements() << "\n";
00785 #endif
00786     }
00787   }
00788   if ( numOverloaded ) {
00789     iout << iWARN << numOverloaded
00790       << " processors are overloaded due to high background load.\n" << endi;
00791   }
00792 #if LDB_DEBUG
00793   iout << "******** Processor List Ends ********" << "\n\n";
00794 #endif
00795 
00796   const double overloadStep = 0.01;
00797   const double overloadStart = overload_start;       //1.05;
00798   double dCurOverload = max / avg;
00799   
00800   int minOverload = 0;   //Min overload should be 1.05 ?
00801   int maxOverload = (int)((dCurOverload - overloadStart)/overloadStep + 1);
00802   double dMinOverload = minOverload * overloadStep + overloadStart;
00803   double dMaxOverload = maxOverload * overloadStep + overloadStart;
00804 
00805 #if LDB_DEBUG 
00806   iout << iINFO
00807        << "Balancing from " << minOverload << " = " << dMinOverload 
00808        << " to " << maxOverload << "=" << dMaxOverload 
00809        << " dCurOverload=" << dCurOverload << " max=" << max << " avg=" << avg
00810        << "\n" << endi;
00811 #endif
00812 
00813   int curOverload;
00814   int refineDone = 0;
00815 
00816   overLoad = dMinOverload;
00817   if (refine())
00818     refineDone = 1;
00819   else {
00820     overLoad = dMaxOverload;
00821     if (!refine()) {
00822       iout << iINFO << "ERROR: Could not refine at max overload\n" << endi;
00823       refineDone = 1;
00824     }
00825   }
00826 
00827   // Scan up, until we find a refine that works
00828   while (!refineDone) {
00829     if (maxOverload - minOverload <= 1)
00830       refineDone = 1;
00831     else {
00832       curOverload = (maxOverload + minOverload ) / 2;
00833 
00834       overLoad = curOverload * overloadStep + overloadStart;
00835 #if LDB_DEBUG 
00836       iout << iINFO << "Testing curOverload " << curOverload 
00837            << "=" << overLoad << " [min,max]=" 
00838            << minOverload << ", " << maxOverload
00839            << "\n" << endi;
00840 #endif     
00841       if (refine())
00842         maxOverload = curOverload;
00843       else
00844         minOverload = curOverload;
00845     }
00846   }
00847 
00848 }
00849 
00850 void Rebalancer::printResults()
00851 {
00852   iout << iINFO << "ready to print result \n" << "\n";
00853 }
00854 
00855 
00856 void Rebalancer::printLoads()
00857 {
00858 
00859    int i, total = 0, numBytes = 0;
00860    double max;
00861    int maxproxies = 0;
00862    int maxpatchproxies = 0;
00863    double avgBgLoad =0.0;
00864 
00865    for (i=0; i<P; i++) {
00866       int nproxies = processors[i].proxies.numElements() - 
00867                         processors[i].patchSet.numElements();
00868       total += nproxies;
00869       if ( nproxies > maxproxies ) maxproxies = nproxies;
00870       avgBgLoad += processors[i].backgroundLoad;
00871       Iterator p;
00872       int count = 0;
00873     
00874       patchInfo *patch = (patchInfo *) processors[i].patchSet.iterator(&p);
00875       while (patch)
00876       {
00877          int myProxies;
00878          myProxies = patch->proxiesOn.numElements()-1;
00879          if ( myProxies > maxpatchproxies ) maxpatchproxies = myProxies;
00880          numBytes += myProxies *patch->numAtoms*bytesPerAtom;
00881          count += myProxies;
00882          patch = (patchInfo *)processors[i].patchSet.next(&p);
00883       }
00884    }
00885 
00886    avgBgLoad /= P;
00887    computeAverage();
00888    max = computeMax();
00889 
00890    iout << "LDB:";
00891    if ( P != CkNumPes() ) {
00892      int w = 1;   
00893      int maxinw = 10;
00894      while ( maxinw < CkNumPes() ) {
00895        ++w;
00896        maxinw = 10*maxinw;
00897      }
00898      iout << " PES " <<
00899              std::setw(w) << std::right << processors[0].Id << "-" <<
00900              std::setw(w) << std::left  << processors[P-1].Id <<
00901              std::right;
00902    }
00903    iout << " TIME " << CmiWallTimer() << " LOAD: AVG " << averageLoad 
00904      << " MAX " << max << "  PROXIES: TOTAL " << total << " MAXPE " << 
00905      maxproxies << " MAXPATCH " << maxpatchproxies << " " << strategyName 
00906      << " MEM: " << memusage_MB() << " MB\n" << endi;
00907    fflush(stdout);
00908 
00909 }
00910 
00911 void Rebalancer::printSummary()
00912 {
00913    int i;
00914    // After refining, compute min, max and avg processor load
00915    double total = processors[0].load;
00916    double min = processors[0].load;
00917    int min_proc = 0;
00918    double max = processors[0].load;
00919    int max_proc = 0;
00920    for (i=1; i<P; i++) {
00921      total += processors[i].load;
00922      if (processors[i].load < min) {
00923        min = processors[i].load;
00924        min_proc = i;
00925      }
00926      if (processors[i].load > max) {
00927        max = processors[i].load;
00928        max_proc = i;
00929      }
00930    }
00931    iout << iINFO << "  min = " << min << " processor " << min_proc << "\n";
00932    iout << iINFO << "  max = " << max << " processor " << max_proc << "\n";
00933    iout << iINFO << "  total = " << total << " average = " << total/P << "\n";
00934    iout << iINFO << "Info about most overloaded processor " << max_proc << ": Load: " << processors[max_proc].load << " Bg Load: " << processors[max_proc].backgroundLoad << " Compute Load: " << processors[max_proc].computeLoad << " No of computes: " << processors[max_proc].computeSet.numElements() << " No. of proxies: " << processors[max_proc].proxies.numElements() << "\n" << endi; 
00935 }
00936 
00937 double Rebalancer::computeAverage()
00938 {
00939    int i;
00940    double total = 0.;
00941    for (i=0; i<numComputes; i++)
00942       total += computes[i].load;
00943 
00944    for (i=0; i<P; i++) {
00945       if (processors[i].available) {
00946         total += processors[i].backgroundLoad;
00947       }
00948    }
00949   
00950    if (numPesAvailable == 0) {
00951      CmiPrintf("Warning: no processors available for load balancing!\n");
00952      averageLoad = 0.0;
00953    }
00954    else 
00955      averageLoad = total/numPesAvailable;
00956    return averageLoad;
00957 }
00958 
00959 void Rebalancer::adjustBackgroundLoadAndComputeAverage()
00960 {
00961   // useful for AlgSeven when some loads start out as zero
00962 
00963    if (numPesAvailable == 0) {
00964      computeAverage();  // because otherwise someone will forget
00965      return;
00966    }
00967   
00968    int i;
00969    double bgtotal = 0.;
00970    for (i=0; i<P; i++) {
00971       if (processors[i].available) {
00972         bgtotal += processors[i].backgroundLoad;
00973       }
00974    }
00975    double bgavg = bgtotal / numPesAvailable;
00976 
00977    int nadjusted = 0;
00978    for (i=0; i<P; i++) {
00979       if (processors[i].available) {
00980         double bgload = processors[i].backgroundLoad;
00981         if ( bgload < bgavg ) {
00982           processors[i].backgroundLoad = bgavg;
00983           ++nadjusted;
00984         }
00985       }
00986    }
00987    // iout << iINFO << "Adjusted background load on " << nadjusted
00988    //     << " nodes.\n" << endi;
00989 
00990    computeAverage();  // because otherwise someone will forget
00991 }
00992 
00993 double Rebalancer::computeMax()
00994 {
00995    int i;
00996    double max = processors[0].load;
00997    for (i=1; i<P; i++)
00998    {
00999       if (processors[i].load > max) 
01000          max = processors[i].load;
01001    }
01002    return max;
01003 }
01004 
01005 int Rebalancer::isAvailableOn(patchInfo *patch, processorInfo *p)
01006 {
01007    return  patch->proxiesOn.find(p);
01008 }
01009 
01010 void Rebalancer::numAvailable(computeInfo *c, processorInfo *p,
01011            int *nPatches, int *nProxies, int *isBadForCommunication)
01012 {
01013    // return the number of proxy/home patches available on p for c (0, 1, 2)
01014    int realPe, index;
01015    int patch_count = 0;
01016    int proxy_count = 0;
01017 
01018   const int beginGroup = processors[0].Id;
01019   const int endGroup = beginGroup + P;
01020 
01021    patchInfo &pa1 = patches[c->patch1];
01022    patchInfo &pa2 = patches[c->patch2];
01023    int pa1_avail = 1;
01024    int pa2_avail = 1;
01025 
01026    if (pa1.processor == p->Id) {
01027      patch_count++;
01028    } else if ( pa1.proxiesOn.find(p) ) {
01029      proxy_count++;
01030    } else {
01031      pa1_avail = 0;
01032    }
01033 
01034    // self computes get one patch for free here
01035    if (c->patch1 == c->patch2 || pa2.processor == p->Id) {
01036      patch_count++;
01037    } else if ( pa2.proxiesOn.find(p) ) {
01038      proxy_count++;
01039    } else {
01040      pa2_avail = 0;
01041    }
01042 
01043    *nPatches = patch_count;
01044    *nProxies = proxy_count;
01045 
01046   if ( isBadForCommunication ) {  // skip work if pointer is null
01047    int bad = 0;
01048 
01049    if ( patch_count + proxy_count < 2 ) {
01050      double bgLoadLimit = 1.2 * averageLoad;
01051      if ( p->backgroundLoad > bgLoadLimit ) bad = 1;
01052      else {
01053        int proxiesPerPeLimit = numProxies / numPesAvailable + 3;
01054        if ( proxiesPerPeLimit < 6 ) proxiesPerPeLimit = 6;
01055 
01056        if ( p->proxies.numElements() > proxiesPerPeLimit ) bad = 1;
01057 
01058        int proxiesPerPatchLimit = numProxies / numPatches + 3;
01059        if ( proxiesPerPatchLimit < 6 ) proxiesPerPatchLimit = 6;
01060 
01061        if ( ! bad && ! pa1_avail ) {
01062          // HYBRID check for range in local group
01063          realPe = pa1.processor;
01064          if INGROUP(realPe) {
01065            index = realPe - beginGroup;
01066            //BACKUP if ( processors[pa1.processor].backgroundLoad > bgLoadLimit) bad = 1;
01067            if (processors[index].backgroundLoad > bgLoadLimit) bad = 1;
01068            else if ( pa1.proxiesOn.numElements() > proxiesPerPatchLimit ) bad = 1;
01069          } else bad = 1;  // patch has proxies we don't know about
01070        }
01071 
01072        if ( ! bad && ! pa2_avail ) {
01073          // HYBRID check for range in local group
01074          realPe = pa2.processor;
01075          if INGROUP(realPe) {
01076            index = realPe - beginGroup;
01077            // BACKUP if ( processors[pa2.processor].backgroundLoad > bgLoadLimit) bad = 1;
01078            if ( processors[index].backgroundLoad > bgLoadLimit) bad = 1;
01079            else if ( pa2.proxiesOn.numElements() > proxiesPerPatchLimit ) bad = 1;
01080          } else bad = 1;  // patch has proxies we don't know about
01081        }
01082 
01083      }
01084    }
01085 
01086    *isBadForCommunication = bad;
01087   }
01088 }
01089 
01090 void Rebalancer::createSpanningTree() {
01091   ProxyTree &pt = ProxyMgr::Object()->getPtree();
01092   Iterator nextP;
01093   processorInfo *p;
01094 #ifndef NODEAWARE_PROXY_SPANNINGTREE
01095   if(pt.sizes==NULL)
01096     pt.sizes = new int[numPatches];
01097 #endif
01098  
01099   if (pt.proxylist == NULL)
01100     pt.proxylist = new NodeIDList[numPatches];
01101   for(int i=0; i<numPatches; i++)
01102   {
01103     pt.proxylist[i].resize(patches[i].proxiesOn.numElements());
01104     nextP.id = 0;
01105     p = (processorInfo *)(patches[i].proxiesOn.iterator((Iterator *)&nextP));
01106     int j = 0;
01107     while(p) {
01108       //if (p->Id < 0)
01109       //  printf ("Inserting proxy on -ve processor %d for patch %d\n", p->Id, i);
01110 
01111       if (p->Id == (PatchMap::Object()->node(i))) {
01112         p = (processorInfo *)(patches[i].proxiesOn.next((Iterator *)&nextP));
01113         continue;
01114       }
01115 
01116       pt.proxylist[i][j] = p->Id;
01117       nextP.id++;
01118       p = (processorInfo *)(patches[i].proxiesOn.next((Iterator *)&nextP));
01119       j++;
01120     }
01121     pt.proxylist[i].resize(j);
01122   }
01123   CkPrintf("Done intialising\n");
01124 #ifdef NODEAWARE_PROXY_SPANNINGTREE
01125   ProxyMgr::Object()->buildNodeAwareSpanningTree0();
01126 #else
01127   ProxyMgr::Object()->buildSpanningTree0();
01128 #endif
01129 }
01130 
01131 void Rebalancer::decrSTLoad() {
01132   int pe;
01133   ProxyTree &pt = ProxyMgr::Object()->getPtree();
01134   for(int i=0; i<numPatches; i++)
01135     for(int j=1; j<pt.proxylist[i].size() && j<proxySpanDim; j++) {
01136       pe = pt.proxylist[i][j];
01137       processors[pe].load -= ST_NODE_LOAD;
01138       processors[pe].backgroundLoad -= ST_NODE_LOAD;
01139       if(processors[pe].load < 0.0)
01140         processors[pe].load = 0.0;
01141       if(processors[pe].backgroundLoad < 0.0)
01142         processors[pe].backgroundLoad = 0.0;
01143     }
01144 }
01145 
01146 void Rebalancer::incrSTLoad() {
01147   int pe;
01148   ProxyTree &pt = ProxyMgr::Object()->getPtree();
01149   for(int i=0; i<numPatches; i++)
01150     for(int j=1; j<pt.proxylist[i].size() && j<proxySpanDim; j++) {
01151       pe = pt.proxylist[i][j];
01152       processors[pe].load += ST_NODE_LOAD;
01153       processors[pe].backgroundLoad += ST_NODE_LOAD;
01154     }
01155 }
01156 

Generated on Wed Jun 19 04:08:19 2013 for NAMD by  doxygen 1.3.9.1