Main Page | Namespace List | Class Hierarchy | Alphabetical List | Class List | File List | Class Members | File Members

Sync.C

Go to the documentation of this file.
00001 
00008 /*
00009     Sync ensures that all home patches have finished updating before computes start, and that all proxies have finished updating themselves.
00010 */
00011 
00012 #if !defined(WIN32) || defined(__CYGWIN__)
00013 #include <unistd.h>
00014 #endif
00015 #include <stdio.h>
00016 
00017 #include "InfoStream.h"
00018 #include "Patch.h"
00019 #include "PatchMap.h"
00020 #include "ProxyMgr.h"
00021 #include "Compute.h"
00022 #include "ComputeMap.h"
00023 
00024 #include "Sync.h"
00025 
00026 #define MIN_DEBUG_LEVEL 3
00027 //#define DEBUGM
00028 #include "Debug.h"
00029 
// Home-patch synchronization switch (tested as a boolean by Sync).
// When enabled, computes are held back until every HomePatch has received
// its position data and has sent it on to its proxies (sendProxyData),
// i.e. before the computes get positionsReady.
int useSync = 1;

// Proxy synchronization switch (tested as a boolean by Sync).
// When enabled, all proxies must have been updated before the computes'
// positionsReady is triggered and the real computation starts.
// With both switches enabled, every home patch has all of its force and
// position data, and every proxy has received its updated data, before
// any compute starts.
int useProxySync = 0;
00040 
00041 Sync::Sync(): INCREASE(600), step(0), counter(0), homeReady(0)
00042 {
00043     if (CkpvAccess(Sync_instance) == NULL) {
00044         CkpvAccess(Sync_instance) = this;
00045     } else {
00046         iout << iFILE << iERROR << iPE
00047           << "Sync instanced twice on same processor!" << endi;
00048         CkExit();
00049     }
00050     capacity = INCREASE;
00051     clist = new _clist[capacity];
00052     cnum = 0;
00053     nPatcheReady = 0;
00054     numPatches = -1;
00055 }
00056 
00057 Sync::~Sync()
00058 {
00059   delete [] clist;
00060 }
00061 
00062 void Sync::openSync(void)
00063 {
00064   if (useSync) {
00065     // if use proxy spanning tree, proxy sync is forced
00066     if (!useProxySync && (proxySendSpanning || proxyRecvSpanning)) {
00067 #if !CMK_IMMEDIATE_MSG
00068       //Dont need proxy sync when immediate messges are turned on
00069       if (CkMyPe() == 0)
00070       CmiPrintf("[%d] useProxySync is turned on. \n", CkMyPe());
00071       // Dont need proxy sync when immediate messges are turned on
00072       // If on BG/P, useProxySync should not be turned on for better performance
00073       #if !CMK_BLUEGENEP
00074       // CmiPrintf("[%d] useProxySync is turned on. \n", CkMyPe());
00075       useProxySync = 1;
00076       #endif
00077 #endif
00078     }
00079     // no proxies on this node, no need to use proxy sync.
00080     if (useProxySync && ProxyMgr::Object()->numProxies() == 0) {
00081       // CmiPrintf("[%d] useProxySync is turned off because no proxy. \n", CkMyPe());
00082       useProxySync = 0;
00083     }
00084     // if no proxy sync and no home patch, then disable home patch sync as well
00085     if (!useProxySync && PatchMap::Object()->numHomePatches() == 0) useSync = 0;
00086   }
00087   if(CkMyPe() == 0)
00088     iout << iINFO << "useSync: " << useSync << " useProxySync: " << useProxySync << "\n" << endi;
00089 }    
00090 
00091 // called from Patch::positionsReady()
00092 int Sync::holdComputes(PatchID pid, ComputeIDListIter cid, int doneMigration)
00093 {
00094   if (!useProxySync) {
00095     // only hold when homepatches are not ready
00096     PatchMap *patchMap = PatchMap::Object();
00097     if (homeReady) {
00098       triggerCompute();
00099       return 0;
00100     }
00101   }
00102 
00103   int slot = 0;
00104   for (; slot < cnum; slot++)
00105      if (clist[slot].pid == -1) break;
00106   if (slot == cnum) {
00107     cnum++;
00108     // table is full, expand the list
00109     if (cnum == capacity) {
00110       capacity += INCREASE;
00111       struct _clist *tmp = new _clist[capacity];
00112       memcpy(tmp, clist, cnum*sizeof(_clist));
00113       delete [] clist;
00114       clist = tmp;
00115       //CmiPrintf("[%d] Info:: Sync buffer overflow and expanded!\n", CkMyPe());
00116     }
00117   }
00118 
00119   clist[slot].cid = cid;
00120   clist[slot].pid = pid;
00121   clist[slot].doneMigration  = doneMigration;
00122   clist[slot].step = PatchMap::Object()->patch(pid)->flags.sequence;
00123 
00124 //  CkPrintf("REG[%d]: patch:%d step:%d-%d slot:%d\n", CkMyPe(), pid, patchMap->patch(pid)->flags.sequence, step, slot);
00125 
00126   if (clist[slot].step == step) {
00127       nPatcheReady++;
00128       triggerCompute();
00129   }
00130   return 1;
00131 }
00132 
00133 // called from HomePatch::positionsReady()
00134 void Sync::PatchReady(void)
00135 {
00136   counter ++;
00137   triggerCompute();
00138 }
00139 
00140 void Sync::releaseComputes()
00141 {
00142   PatchMap *patchMap = PatchMap::Object();
00143   ComputeMap *computeMap = ComputeMap::Object();
00144 
00145   nPatcheReady = 0;
00146   for (int i= 0; i<cnum; i++) {
00147     int &pid = clist[i].pid;
00148     if (pid == -1) continue;
00149     if (clist[i].step != step) {
00150       // count for next step
00151       if (clist[i].step == step + 1) nPatcheReady++;
00152       continue;
00153     }
00154     //         CkPrintf(" %d-%d-%d ",
00155     //   clist[i].pid, clist[i].step,
00156     //      patchMap->patch(pid)->flags.sequence);
00157 
00158     ComputeIDListIter cid = clist[i].cid;
00159 
00160     int compute_count = 0;
00161     for(cid = cid.begin(); cid != cid.end(); cid++) {
00162       compute_count++;
00163       computeMap->compute(*cid)->patchReady(pid,clist[i].doneMigration,step);
00164     }
00165     if (compute_count == 0 && patchMap->node(pid) != CkMyPe()) {
00166            iout << iINFO << "PATCH_COUNT-Sync step " << step
00167                 << "]: Patch " << pid << " on PE " 
00168                 << CkMyPe() <<" home patch " 
00169                 << patchMap->node(pid) << " does not have any computes\n" 
00170                 << endi;
00171     }
00172     pid = -1;
00173   }
00174 //  CkPrintf("\n");
00175 }
00176 
00177 void Sync::triggerCompute()
00178 {
00179   PatchMap *patchMap = PatchMap::Object();
00180 
00181   if (numPatches == -1) 
00182     numPatches = ProxyMgr::Object()->numProxies() + patchMap->numHomePatches();
00183 
00184 //  CkPrintf("SYNC[%d]: PATCHREADY:%d %d patches:%d %d\n", CkMyPe(), counter, PatchMap::Object()->numHomePatches(), nPatcheReady, numPatches);
00185 
00186   if (homeReady == 0 && counter == patchMap->numHomePatches()) {
00187     homeReady = 1;
00188     if (!useProxySync)  releaseComputes();
00189   }
00190 
00191   if (homeReady && nPatcheReady == numPatches)
00192   {
00193 //     CkPrintf("TRIGGERED[%d]\n", CkMyPe());
00194     if (useProxySync) releaseComputes();
00195 
00196     // reset counter
00197     counter = 0;
00198     numPatches = -1;
00199     step++;
00200     homeReady = 0;
00201   }
00202 }
00203 
00204 
#if 0
// Disabled workaround for SUN MPCC: referencing ArrayElementT<int> here
// forces the compiler to instantiate ArrayElementT.
void *frightenCompilerIntoInstantiatingHack(void) {
  ArrayElementT<int> *forced = new ArrayElementT<int>;
  return forced;
}
#endif
00212 
00213 
00214 #include "Sync.def.h"

Generated on Sat Nov 7 04:07:50 2009 for NAMD by  doxygen 1.3.9.1