Main Page | Namespace List | Class Hierarchy | Alphabetical List | Class List | File List | Class Members | File Members

Sync.C

Go to the documentation of this file.
00001 
00008 /*
00009     Sync ensures that all home patches have finished updating, and that all proxies have finished updating themselves, before the Computes start.
00010 */
00011 
00012 #if !defined(WIN32) || defined(__CYGWIN__)
00013 #include <unistd.h>
00014 #endif
00015 #include <stdio.h>
00016 
00017 #include "InfoStream.h"
00018 #include "Patch.h"
00019 #include "PatchMap.h"
00020 #include "ProxyMgr.h"
00021 #include "Compute.h"
00022 #include "ComputeMap.h"
00023 
00024 #include "Sync.h"
00025 
00026 #define MIN_DEBUG_LEVEL 3
00027 //#define DEBUGM
00028 #include "Debug.h"
00029 
00030 // Home-patch sync makes sure all HomePatches get their position data and
00031 // send it to their proxies (sendProxyData) before computes get positionsReady.
00032 
00033 // useProxySync makes sure all proxies are updated before the computes'
00034 // positionsReady is triggered, i.e. before the real computation starts.
00035 // When the two are combined, every home patch has received all of its force
00036 // and position data, and every proxy has received its updated data, before
00037 // any compute starts.
00038 
// Trace user-event handles: assigned in the Sync constructor from
// traceRegisterUserEvent() and passed to traceUserEvent() in
// holdComputes() and releaseComputes() respectively.
00039 static int eventHoldComputes;
00040 static int eventReleaseComputes;
00041 
00042 Sync::Sync(): INCREASE(600), step(0), counter(0), homeReady(0)
00043 {
00044     if (CkpvAccess(Sync_instance) == NULL) {
00045         CkpvAccess(Sync_instance) = this;
00046     } else {
00047         iout << iFILE << iERROR << iPE
00048           << "Sync instanced twice on same processor!" << endi;
00049         CkExit();
00050     }
00051     capacity = INCREASE;
00052     clist = new _clist[capacity];
00053     cnum = 0;
00054     nPatcheReady = 0;
00055     numPatches = -1;
00056     eventHoldComputes = traceRegisterUserEvent("Sync::holdComputes", 133);
00057     eventReleaseComputes = traceRegisterUserEvent("Sync::releaseComputes", 134);
00058 }
00059 
00060 Sync::~Sync()
00061 {
00062   delete [] clist;
00063 }
00064 
00065 void Sync::openSync(void)
00066 {
00067   int reportPe = 1;
00068   while ( 2 * reportPe < CkNumPes() ) reportPe *= 2;
00069   step = -1;
00070   useSync = 1;
00071   useProxySync = 0;
00072   if (useSync) {
00073     // if use proxy spanning tree, proxy sync is forced
00074     if (!useProxySync && (proxySendSpanning || proxyRecvSpanning)
00075         && PatchMap::Object()->numPatches() < 4 * CkNumPes() ) {
00076       // If on BG/P, useProxySync should not be turned on for better performance
00077       #if !CMK_BLUEGENEP
00078       // CmiPrintf("[%d] useProxySync is turned on. \n", CkMyPe());
00079       useProxySync = 1;
00080       #endif
00081     }
00082 #if defined(NODEAWARE_PROXY_SPANNINGTREE) && defined(USE_NODEPATCHMGR)
00083     // immediate messages can be processed by any PE
00084     if (CkMyNodeSize() > 2) useProxySync = 0;
00085 #endif
00086     // no proxies on this node, no need to use proxy sync.
00087     if (useProxySync && ProxyMgr::Object()->numProxies() == 0) {
00088       // CmiPrintf("[%d] useProxySync is turned off because no proxy. \n", CkMyPe());
00089       useProxySync = 0;
00090     }
00091     // if no proxy sync and no home patch, then disable home patch sync as well
00092     if (!useProxySync && PatchMap::Object()->numHomePatches() == 0) useSync = 0;
00093   }
00094   if(CkMyPe() == reportPe)
00095     iout << iINFO << "useSync: " << useSync << " useProxySync: " << useProxySync << "\n" << endi;
00096 }    
00097 
00098 // called from Patch::positionsReady()
00099 int Sync::holdComputes(PatchID pid, ComputeIDListIter cid, int doneMigration, int seq)
00100 {
00101   if (!useSync) return 0;
00102   if (step < 0) step = seq;
00103   if (!useProxySync) {
00104     // only hold when homepatches are not ready
00105     PatchMap *patchMap = PatchMap::Object();
00106     if (homeReady && seq == step) {
00107       nPatcheReady++;
00108       triggerCompute();
00109       return 0;
00110     }
00111   }
00112   traceUserEvent(eventHoldComputes);
00113 
00114   int slot = 0;
00115   for (; slot < cnum; slot++)
00116      if (clist[slot].pid == -1) break;
00117   if (slot == cnum) {
00118     cnum++;
00119     // table is full, expand the list
00120     if (cnum == capacity) {
00121       capacity += INCREASE;
00122       struct _clist *tmp = new _clist[capacity];
00123       memcpy(tmp, clist, cnum*sizeof(_clist));
00124       delete [] clist;
00125       clist = tmp;
00126       //CmiPrintf("[%d] Info:: Sync buffer overflow and expanded!\n", CkMyPe());
00127     }
00128   }
00129 
00130   clist[slot].cid = cid;
00131   clist[slot].pid = pid;
00132   clist[slot].doneMigration  = doneMigration;
00133   clist[slot].step = seq;
00134 
00135 //  CkPrintf("REG[%d]: patch:%d step:%d-%d slot:%d\n", CkMyPe(), pid, patchMap->patch(pid)->flags.sequence, step, slot);
00136 
00137   if (clist[slot].step == step) {
00138       nPatcheReady++;
00139       triggerCompute();
00140   }
00141   return 1;
00142 }
00143 
00144 // called from HomePatch::positionsReady()
00145 void Sync::PatchReady(void)
00146 {
00147  if ( useSync ) {
00148   counter ++;
00149   triggerCompute();
00150  }
00151 }
00152 
00153 void Sync::releaseComputes()
00154 {
00155   PatchMap *patchMap = PatchMap::Object();
00156   ComputeMap *computeMap = ComputeMap::Object();
00157 
00158   traceUserEvent(eventReleaseComputes);
00159 
00160   for (int i= 0; i<cnum; i++) {
00161     int &pid = clist[i].pid;
00162     if (pid == -1) continue;
00163     if (clist[i].step != step) {
00164       continue;
00165     }
00166     //         CkPrintf(" %d-%d-%d ",
00167     //   clist[i].pid, clist[i].step,
00168     //      patchMap->patch(pid)->flags.sequence);
00169 
00170     ComputeIDListIter cid = clist[i].cid;
00171 
00172     int compute_count = 0;
00173     for(cid = cid.begin(); cid != cid.end(); cid++) {
00174       compute_count++;
00175       computeMap->compute(*cid)->patchReady(pid,clist[i].doneMigration,step);
00176     }
00177     if (compute_count == 0 && patchMap->node(pid) != CkMyPe()) {
00178            iout << iINFO << "PATCH_COUNT-Sync step " << step
00179                 << "]: Patch " << pid << " on PE " 
00180                 << CkMyPe() <<" home patch " 
00181                 << patchMap->node(pid) << " does not have any computes\n" 
00182                 << endi;
00183     }
00184     pid = -1;
00185   }
00186 //  CkPrintf("\n");
00187 }
00188 
00189 void Sync::triggerCompute()
00190 {
00191   PatchMap *patchMap = PatchMap::Object();
00192   const int numHomePatches = patchMap->numHomePatches();
00193 
00194   if (numPatches == -1) 
00195     numPatches = ProxyMgr::Object()->numProxies() + numHomePatches;
00196 
00197 // if (CkMyPe()<=8) CkPrintf("SYNC[%d]: PATCHREADY:%d %d patches:%d %d\n", CkMyPe(), counter, numHomePatches, nPatcheReady, numPatches);
00198 //  CkPrintf("SYNC[%d]: PATCHREADY:%d %d patches:%d %d\n", CkMyPe(), counter, PatchMap::Object()->numHomePatches(), nPatcheReady, numPatches);
00199 
00200   if (homeReady == 0 && counter >= numHomePatches) {
00201     homeReady = 1;
00202  // if (CkMyPe()<=8) CkPrintf("HOMEREADY[%d]\n", CkMyPe());
00203     if (!useProxySync)  releaseComputes();
00204   }
00205 
00206   if (homeReady && nPatcheReady == numPatches)
00207   {
00208 // if (CkMyPe()<=8) CkPrintf("TRIGGERED[%d]\n", CkMyPe());
00209 //     CkPrintf("TRIGGERED[%d]\n", CkMyPe());
00210     if (useProxySync) releaseComputes();
00211 
00212     // reset counter
00213     numPatches = -1;
00214     step++;
00215     nPatcheReady = 0;
00216     for (int i= 0; i<cnum; i++) {
00217       if (clist[i].pid != -1 && clist[i].step == step) ++nPatcheReady;
00218     }
00219     homeReady = 0;
00220     if ( numHomePatches ) {
00221       counter -= numHomePatches;
00222       if (counter >= numHomePatches) triggerCompute();
00223     }
00224   }
00225 }
00226 
00227 
#if 0
// Disabled workaround for SUN MPCC: forces the compiler to instantiate
// ArrayElementT<int> by allocating one.
void *frightenCompilerIntoInstantiatingHack(void)
{
  ArrayElementT<int> *forced = new ArrayElementT<int>;
  return forced;
}
#endif
00235 
00236 
00237 #include "Sync.def.h"

Generated on Sun Feb 12 04:07:56 2012 for NAMD by  doxygen 1.3.9.1