Sync.C

Go to the documentation of this file.
00001 
00008 /*
00009     Sync ensures that all home patches have finished updating before computes start, and that all proxies have finished updating themselves.
00010 */
00011 
00012 #if !defined(WIN32) || defined(__CYGWIN__)
00013 #include <unistd.h>
00014 #endif
00015 #include <stdio.h>
00016 
00017 #include "InfoStream.h"
00018 #include "Patch.h"
00019 #include "PatchMap.h"
00020 #include "ProxyMgr.h"
00021 #include "Compute.h"
00022 #include "ComputeMap.h"
00023 
00024 #include "Sync.h"
00025 
00026 #define MIN_DEBUG_LEVEL 3
00027 //#define DEBUGM
00028 #include "Debug.h"
00029 
00030 // Make sure all HomePatches get their position data and sendProxyData to
00031 // their proxies before computes get positionsReady.
00032 
00033 // useProxySync makes sure all proxies are updated before the computes'
00034 // positionsReady is triggered, so that real computation can start.
00035 // When the two are combined, they make sure that each home patch has all of
00036 // its force and position data, and that proxies have received their updated
00037 // data, before any compute starts.
00038 
// User trace event ids. Both are assigned in the Sync constructor from
// traceRegisterUserEvent() and passed to traceUserEvent() by
// Sync::holdComputes() and Sync::releaseComputes() respectively.
static int eventHoldComputes = 0;
static int eventReleaseComputes = 0;
00041 
00042 Sync::Sync(): INCREASE(600), step(0), counter(0), homeReady(0)
00043 {
00044     if (CkpvAccess(Sync_instance) == NULL) {
00045         CkpvAccess(Sync_instance) = this;
00046     } else {
00047         NAMD_bug("Sync instanced twice on same processor!");
00048     }
00049     capacity = INCREASE;
00050     clist = new _clist[capacity];
00051     cnum = 0;
00052     nPatcheReady = 0;
00053     numPatches = -1;
00054     eventHoldComputes = traceRegisterUserEvent("Sync::holdComputes", 133);
00055     eventReleaseComputes = traceRegisterUserEvent("Sync::releaseComputes", 134);
00056 }
00057 
00058 Sync::~Sync()
00059 {
00060   delete [] clist;
00061 }
00062 
00063 void Sync::openSync(void)
00064 {
00065   int reportPe = 1;
00066   while ( 2 * reportPe < CkNumPes() ) reportPe *= 2;
00067   step = -1;
00068   useSync = 1;
00069   if ( PatchMap::Object()->numPatches() >= 4 * CkNumPes() ) useSync = 0;
00070   if ( CmiNumNodes() < 2 ) useSync = 0;
00071   if ( CmiNumPhysicalNodes() < 2 ) useSync = 0;
00072 #if defined(NAMD_CUDA) || defined(NAMD_MIC)
00073   useSync = 0;
00074 #endif
00075   useProxySync = 0;
00076   if (useSync) {
00077     // if use proxy spanning tree, proxy sync is forced
00078     if (!useProxySync && (proxySendSpanning || proxyRecvSpanning)
00079         && PatchMap::Object()->numPatches() < 4 * CkNumPes() ) {
00080       // If on BG/P, useProxySync should not be turned on for better performance
00081       #if ! (CMK_BLUEGENEQ || CMK_BLUEGENEP)
00082       // CmiPrintf("[%d] useProxySync is turned on. \n", CkMyPe());
00083       useProxySync = 1;
00084       #endif
00085     }
00086 #if defined(NODEAWARE_PROXY_SPANNINGTREE) && defined(USE_NODEPATCHMGR)
00087     // immediate messages can be processed by any PE
00088     if (CkMyNodeSize() > 2) useProxySync = 0;
00089 #endif
00090     // no proxies on this node, no need to use proxy sync.
00091     if (useProxySync && ProxyMgr::Object()->numProxies() == 0) {
00092       // CmiPrintf("[%d] useProxySync is turned off because no proxy. \n", CkMyPe());
00093       useProxySync = 0;
00094     }
00095     // if no proxy sync and no home patch, then disable home patch sync as well
00096     if (!useProxySync && PatchMap::Object()->numHomePatches() == 0) useSync = 0;
00097   }
00098   if(CkMyPe() == reportPe)
00099     iout << iINFO << "useSync: " << useSync << " useProxySync: " << useProxySync << "\n" << endi;
00100 }    
00101 
00102 // called from Patch::positionsReady()
00103 int Sync::holdComputes(PatchID pid, Compute **cbegin, Compute **cend, int doneMigration, int seq)
00104 {
00105   if (!useSync) return 0;
00106   if (step < 0) step = seq;
00107   if (!useProxySync) {
00108     // only hold when homepatches are not ready
00109     PatchMap *patchMap = PatchMap::Object();
00110     if (homeReady && seq == step) {
00111       nPatcheReady++;
00112       triggerCompute();
00113       return 0;
00114     }
00115   }
00116   traceUserEvent(eventHoldComputes);
00117 
00118   int slot = 0;
00119   for (; slot < cnum; slot++)
00120      if (clist[slot].pid == -1) break;
00121   if (slot == cnum) {
00122     cnum++;
00123     // table is full, expand the list
00124     if (cnum == capacity) {
00125       capacity += INCREASE;
00126       struct _clist *tmp = new _clist[capacity];
00127       memcpy(tmp, clist, cnum*sizeof(_clist));
00128       delete [] clist;
00129       clist = tmp;
00130       //CmiPrintf("[%d] Info:: Sync buffer overflow and expanded!\n", CkMyPe());
00131     }
00132   }
00133 
00134   clist[slot].cbegin = cbegin;
00135   clist[slot].cend = cend;
00136   clist[slot].pid = pid;
00137   clist[slot].doneMigration  = doneMigration;
00138   clist[slot].step = seq;
00139 
00140 //  CkPrintf("REG[%d]: patch:%d step:%d-%d slot:%d\n", CkMyPe(), pid, patchMap->patch(pid)->flags.sequence, step, slot);
00141 
00142   if (clist[slot].step == step) {
00143       nPatcheReady++;
00144       triggerCompute();
00145   }
00146   return 1;
00147 }
00148 
00149 // called from HomePatch::positionsReady()
00150 void Sync::PatchReady(void)
00151 {
00152  if ( useSync ) {
00153   counter ++;
00154   triggerCompute();
00155  }
00156 }
00157 
00158 void Sync::releaseComputes()
00159 {
00160   PatchMap *patchMap = PatchMap::Object();
00161 
00162   traceUserEvent(eventReleaseComputes);
00163 
00164   for (int i= 0; i<cnum; i++) {
00165     int &pid = clist[i].pid;
00166     if (pid == -1) continue;
00167     if (clist[i].step != step) {
00168       continue;
00169     }
00170     //         CkPrintf(" %d-%d-%d ",
00171     //   clist[i].pid, clist[i].step,
00172     //      patchMap->patch(pid)->flags.sequence);
00173 
00174     Compute **cend = clist[i].cend;
00175     for(Compute **cid = clist[i].cbegin; cid != cend; cid++) {
00176       (*cid)->patchReady(pid,clist[i].doneMigration,step);
00177     }
00178     pid = -1;
00179   }
00180 //  CkPrintf("\n");
00181 }
00182 
00183 void Sync::triggerCompute()
00184 {
00185   PatchMap *patchMap = PatchMap::Object();
00186   const int numHomePatches = patchMap->numHomePatches();
00187 
00188   if (numPatches == -1) 
00189     numPatches = ProxyMgr::Object()->numProxies() + numHomePatches;
00190 
00191 // if (CkMyPe()<=8) CkPrintf("SYNC[%d]: PATCHREADY:%d %d patches:%d %d\n", CkMyPe(), counter, numHomePatches, nPatcheReady, numPatches);
00192 //  CkPrintf("SYNC[%d]: PATCHREADY:%d %d patches:%d %d\n", CkMyPe(), counter, PatchMap::Object()->numHomePatches(), nPatcheReady, numPatches);
00193 
00194   if (homeReady == 0 && counter >= numHomePatches) {
00195     homeReady = 1;
00196  // if (CkMyPe()<=8) CkPrintf("HOMEREADY[%d]\n", CkMyPe());
00197     if (!useProxySync)  releaseComputes();
00198   }
00199 
00200   if (homeReady && nPatcheReady == numPatches)
00201   {
00202 // if (CkMyPe()<=8) CkPrintf("TRIGGERED[%d]\n", CkMyPe());
00203 //     CkPrintf("TRIGGERED[%d]\n", CkMyPe());
00204     if (useProxySync) releaseComputes();
00205 
00206     // reset counter
00207     numPatches = -1;
00208     step++;
00209     nPatcheReady = 0;
00210     for (int i= 0; i<cnum; i++) {
00211       if (clist[i].pid != -1 && clist[i].step == step) ++nPatcheReady;
00212     }
00213     homeReady = 0;
00214     if ( numHomePatches ) {
00215       counter -= numHomePatches;
00216       if (counter >= numHomePatches) triggerCompute();
00217     }
00218   }
00219 }
00220 
00221 
#if 0
// Disabled workaround for SUN MPCC: allocating an ArrayElementT<int> forces
// the compiler to instantiate the ArrayElementT template.
void *frightenCompilerIntoInstantiatingHack(void) {
  ArrayElementT<int> *instance = new ArrayElementT<int>;
  return instance;
}
#endif
00229 
00230 
00231 #include "Sync.def.h"

Generated on Tue Sep 19 01:17:14 2017 for NAMD by  doxygen 1.4.7