00001
00008
00009
00010
00011
00012 #if !defined(WIN32) || defined(__CYGWIN__)
00013 #include <unistd.h>
00014 #endif
00015 #include <stdio.h>
00016
00017 #include "InfoStream.h"
00018 #include "Patch.h"
00019 #include "PatchMap.h"
00020 #include "ProxyMgr.h"
00021 #include "Compute.h"
00022 #include "ComputeMap.h"
00023
00024 #include "Sync.h"
00025
00026 #define MIN_DEBUG_LEVEL 3
00027
00028 #include "Debug.h"
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 static int eventHoldComputes;
00040 static int eventReleaseComputes;
00041
00042 Sync::Sync(): INCREASE(600), step(0), counter(0), homeReady(0)
00043 {
00044 if (CkpvAccess(Sync_instance) == NULL) {
00045 CkpvAccess(Sync_instance) = this;
00046 } else {
00047 iout << iFILE << iERROR << iPE
00048 << "Sync instanced twice on same processor!" << endi;
00049 CkExit();
00050 }
00051 capacity = INCREASE;
00052 clist = new _clist[capacity];
00053 cnum = 0;
00054 nPatcheReady = 0;
00055 numPatches = -1;
00056 eventHoldComputes = traceRegisterUserEvent("Sync::holdComputes", 133);
00057 eventReleaseComputes = traceRegisterUserEvent("Sync::releaseComputes", 134);
00058 }
00059
00060 Sync::~Sync()
00061 {
00062 delete [] clist;
00063 }
00064
00065 void Sync::openSync(void)
00066 {
00067 int reportPe = 1;
00068 while ( 2 * reportPe < CkNumPes() ) reportPe *= 2;
00069 step = -1;
00070 useSync = 1;
00071 useProxySync = 0;
00072 if (useSync) {
00073
00074 if (!useProxySync && (proxySendSpanning || proxyRecvSpanning)
00075 && PatchMap::Object()->numPatches() < 4 * CkNumPes() ) {
00076
00077 #if !CMK_BLUEGENEP
00078
00079 useProxySync = 1;
00080 #endif
00081 }
00082 #if defined(NODEAWARE_PROXY_SPANNINGTREE) && defined(USE_NODEPATCHMGR)
00083
00084 if (CkMyNodeSize() > 2) useProxySync = 0;
00085 #endif
00086
00087 if (useProxySync && ProxyMgr::Object()->numProxies() == 0) {
00088
00089 useProxySync = 0;
00090 }
00091
00092 if (!useProxySync && PatchMap::Object()->numHomePatches() == 0) useSync = 0;
00093 }
00094 if(CkMyPe() == reportPe)
00095 iout << iINFO << "useSync: " << useSync << " useProxySync: " << useProxySync << "\n" << endi;
00096 }
00097
00098
00099 int Sync::holdComputes(PatchID pid, ComputeIDListIter cid, int doneMigration, int seq)
00100 {
00101 if (!useSync) return 0;
00102 if (step < 0) step = seq;
00103 if (!useProxySync) {
00104
00105 PatchMap *patchMap = PatchMap::Object();
00106 if (homeReady && seq == step) {
00107 nPatcheReady++;
00108 triggerCompute();
00109 return 0;
00110 }
00111 }
00112 traceUserEvent(eventHoldComputes);
00113
00114 int slot = 0;
00115 for (; slot < cnum; slot++)
00116 if (clist[slot].pid == -1) break;
00117 if (slot == cnum) {
00118 cnum++;
00119
00120 if (cnum == capacity) {
00121 capacity += INCREASE;
00122 struct _clist *tmp = new _clist[capacity];
00123 memcpy(tmp, clist, cnum*sizeof(_clist));
00124 delete [] clist;
00125 clist = tmp;
00126
00127 }
00128 }
00129
00130 clist[slot].cid = cid;
00131 clist[slot].pid = pid;
00132 clist[slot].doneMigration = doneMigration;
00133 clist[slot].step = seq;
00134
00135
00136
00137 if (clist[slot].step == step) {
00138 nPatcheReady++;
00139 triggerCompute();
00140 }
00141 return 1;
00142 }
00143
00144
00145 void Sync::PatchReady(void)
00146 {
00147 if ( useSync ) {
00148 counter ++;
00149 triggerCompute();
00150 }
00151 }
00152
00153 void Sync::releaseComputes()
00154 {
00155 PatchMap *patchMap = PatchMap::Object();
00156 ComputeMap *computeMap = ComputeMap::Object();
00157
00158 traceUserEvent(eventReleaseComputes);
00159
00160 for (int i= 0; i<cnum; i++) {
00161 int &pid = clist[i].pid;
00162 if (pid == -1) continue;
00163 if (clist[i].step != step) {
00164 continue;
00165 }
00166
00167
00168
00169
00170 ComputeIDListIter cid = clist[i].cid;
00171
00172 int compute_count = 0;
00173 for(cid = cid.begin(); cid != cid.end(); cid++) {
00174 compute_count++;
00175 computeMap->compute(*cid)->patchReady(pid,clist[i].doneMigration,step);
00176 }
00177 if (compute_count == 0 && patchMap->node(pid) != CkMyPe()) {
00178 iout << iINFO << "PATCH_COUNT-Sync step " << step
00179 << "]: Patch " << pid << " on PE "
00180 << CkMyPe() <<" home patch "
00181 << patchMap->node(pid) << " does not have any computes\n"
00182 << endi;
00183 }
00184 pid = -1;
00185 }
00186
00187 }
00188
00189 void Sync::triggerCompute()
00190 {
00191 PatchMap *patchMap = PatchMap::Object();
00192 const int numHomePatches = patchMap->numHomePatches();
00193
00194 if (numPatches == -1)
00195 numPatches = ProxyMgr::Object()->numProxies() + numHomePatches;
00196
00197
00198
00199
00200 if (homeReady == 0 && counter >= numHomePatches) {
00201 homeReady = 1;
00202
00203 if (!useProxySync) releaseComputes();
00204 }
00205
00206 if (homeReady && nPatcheReady == numPatches)
00207 {
00208
00209
00210 if (useProxySync) releaseComputes();
00211
00212
00213 numPatches = -1;
00214 step++;
00215 nPatcheReady = 0;
00216 for (int i= 0; i<cnum; i++) {
00217 if (clist[i].pid != -1 && clist[i].step == step) ++nPatcheReady;
00218 }
00219 homeReady = 0;
00220 if ( numHomePatches ) {
00221 counter -= numHomePatches;
00222 if (counter >= numHomePatches) triggerCompute();
00223 }
00224 }
00225 }
00226
00227
00228 #if 0
00229
00230
00231 void *frightenCompilerIntoInstantiatingHack(void) {
00232 return new ArrayElementT<int>;
00233 }
00234 #endif
00235
00236
00237 #include "Sync.def.h"