00001
00007 #include "InfoStream.h"
00008
00009 #ifdef USE_COMM_LIB
00010 #include "ComlibManager.h"
00011 #endif
00012
00013 #include "main.decl.h"
00014 #include "main.h"
00015 #include "ProxyPatch.h"
00016 #include "ProxyMgr.decl.h"
00017 #include "ProxyMgr.h"
00018 #include "AtomMap.h"
00019 #include "PatchMap.h"
00020 #include "Priorities.h"
00021
00022 #define MIN_DEBUG_LEVEL 2
00023
00024 #include "Debug.h"
00025
00026
00027 ProxyPatch::ProxyPatch(PatchID pd) :
00028 Patch(pd), proxyMsgBufferStatus(PROXYMSGNOTBUFFERED),
00029 curProxyMsg(NULL), prevProxyMsg(NULL)
00030 {
00031 DebugM(4, "ProxyPatch(" << pd << ") at " << this << "\n");
00032 ProxyMgr::Object()->registerProxy(patchID);
00033 numAtoms = -1;
00034 parent = -1;
00035
00036 #ifdef NODEAWARE_PROXY_SPANNINGTREE
00037
00038
00039 #else
00040 nChild = 0;
00041 child = new int[proxySpanDim];
00042 #endif
00043
00044 #if CMK_PERSISTENT_COMM
00045 localphs = 0;
00046 localphs = CmiCreatePersistent(PatchMap::Object()->node(patchID), 300000);
00047 #endif
00048
00049
00050 #if NAMD_SeparateWaters != 0
00051 numWaterAtoms = -1;
00052 #endif
00053
00054 #if defined(NODEAWARE_PROXY_SPANNINGTREE) && defined(USE_NODEPATCHMGR)
00055 depositLock = CmiCreateLock();
00056 #endif
00057 }
00058
00059 ProxyPatch::~ProxyPatch()
00060 {
00061 DebugM(4, "ProxyPatch(" << patchID << ") deleted at " << this << "\n");
00062 ProxyMgr::Object()->unregisterProxy(patchID);
00063
00064
00065
00066 CmiAssert(prevProxyMsg!=NULL);
00067 if(prevProxyMsg!=NULL) {
00068 #ifdef REMOVE_PROXYDATAMSG_EXTRACOPY
00069 AtomMap::Object()->unregisterIDs(patchID,positionPtrBegin, positionPtrEnd);
00070 #else
00071 AtomMap::Object()->unregisterIDs(patchID,p.begin(),p.end());
00072 #endif
00073 delete prevProxyMsg;
00074 prevProxyMsg = NULL;
00075 }
00076
00077
00078 #ifdef NODEAWARE_PROXY_SPANNINGTREE
00079 delete [] children;
00080 #ifdef USE_NODEPATCHMGR
00081 delete [] nodeChildren;
00082 #endif
00083 #else
00084 delete [] child;
00085 #endif
00086
00087 p.resize(0);
00088
00089 #ifdef MEM_OPT_VERSION
00090 pExt.resize(0);
00091 #endif
00092
00093 #if CMK_PERSISTENT_COMM
00094 CmiDestoryPersistent(localphs);
00095 localphs = 0;
00096 #endif
00097 }
00098
00099 void ProxyPatch::boxClosed(int box)
00100 {
00101 if ( box == 1 ) {
00102
00103
00104
00105
00106
00107 sendResults();
00108 }
00109 if ( ! --boxesOpen ) {
00110 DebugM(2,patchID << ": " << "Checking message buffer.\n");
00111
00112 if(proxyMsgBufferStatus == PROXYALLMSGBUFFERED) {
00113 CmiAssert(curProxyMsg != NULL);
00114 DebugM(3,"Patch " << patchID << " processing buffered proxy ALL data.\n");
00115 receiveAll(curProxyMsg);
00116 }else if(proxyMsgBufferStatus == PROXYDATAMSGBUFFERED) {
00117 CmiAssert(curProxyMsg != NULL);
00118 DebugM(3,"Patch " << patchID << " processing buffered proxy data.\n");
00119 receiveData(curProxyMsg);
00120 }
00121 } else {
00122 DebugM(3,"ProxyPatch " << patchID << ": " << boxesOpen << " boxes left to close.\n");
00123 }
00124 }
00125
00126 void ProxyPatch::receiveAtoms(ProxyAtomsMsg *msg)
00127 {
00128 DebugM(3, "receiveAtoms(" << patchID << ")\n");
00129 numAtoms = msg->atomIDList.size();
00130 delete msg;
00131 }
00132
00133 void ProxyPatch::receiveData(ProxyDataMsg *msg)
00134 {
00135 DebugM(3, "receiveData(" << patchID << ")\n");
00136
00137
00138 delete prevProxyMsg;
00139 prevProxyMsg = NULL;
00140
00141 if ( boxesOpen )
00142 {
00143 proxyMsgBufferStatus = PROXYDATAMSGBUFFERED;
00144
00145 curProxyMsg = msg;
00146 return;
00147 }
00148
00149
00150 curProxyMsg = msg;
00151 prevProxyMsg = curProxyMsg;
00152 flags = msg->flags;
00153
00154 #ifdef REMOVE_PROXYDATAMSG_EXTRACOPY
00155
00156
00157
00158
00159
00160 positionPtrBegin = msg->positionList;
00161 positionPtrEnd = msg->positionList + msg->plLen;
00162 #else
00163 p.resize(msg->plLen);
00164 memcpy(p.begin(), msg->positionList, sizeof(CompAtom)*(msg->plLen));
00165 #endif
00166
00167 avgPositionPtrBegin = msg->avgPositionList;
00168 avgPositionPtrEnd = msg->avgPositionList + msg->avgPlLen;
00169
00170
00171 if ( numAtoms == -1 ) {
00172
00173 numAtoms = msg->plLen;
00174
00175 #ifdef MEM_OPT_VERSION
00176
00177 CmiAssert(msg->plExtLen!=0);
00178 pExt.resize(msg->plExtLen);
00179 memcpy(pExt.begin(), msg->positionExtList, sizeof(CompAtomExt)*(msg->plExtLen));
00180 #endif
00181
00182
00183
00184 #if NAMD_SeparateWaters != 0
00185 numWaterAtoms = msg->numWaterAtoms;
00186 #endif
00187
00188 positionsReady(1);
00189 } else {
00190 positionsReady(0);
00191 }
00192 }
00193
00194 void ProxyPatch::receiveAll(ProxyDataMsg *msg)
00195 {
00196 DebugM(3, "receiveAll(" << patchID << ")\n");
00197
00198 if ( boxesOpen )
00199 {
00200 proxyMsgBufferStatus = PROXYALLMSGBUFFERED;
00201 curProxyMsg = msg;
00202 return;
00203 }
00204
00205
00206
00207 if(prevProxyMsg!=NULL) {
00208 #ifdef REMOVE_PROXYDATAMSG_EXTRACOPY
00209 AtomMap::Object()->unregisterIDs(patchID,positionPtrBegin,positionPtrEnd);
00210 #else
00211 AtomMap::Object()->unregisterIDs(patchID, p.begin(), p.end());
00212 #endif
00213 }
00214
00215 delete prevProxyMsg;
00216 curProxyMsg = msg;
00217 prevProxyMsg = curProxyMsg;
00218
00219 flags = msg->flags;
00220
00221 #ifdef REMOVE_PROXYDATAMSG_EXTRACOPY
00222
00223
00224
00225
00226
00227 positionPtrBegin = msg->positionList;
00228 positionPtrEnd = msg->positionList + msg->plLen;
00229 #else
00230 p.resize(msg->plLen);
00231 memcpy(p.begin(), msg->positionList, sizeof(CompAtom)*(msg->plLen));
00232 #endif
00233
00234 numAtoms = msg->plLen;
00235
00236
00237 avgPositionPtrBegin = msg->avgPositionList;
00238 avgPositionPtrEnd = msg->avgPositionList + msg->avgPlLen;
00239
00240 #ifdef MEM_OPT_VERSION
00241
00242
00243
00244
00245
00246
00247 pExt.resize(msg->plExtLen);
00248 memcpy(pExt.begin(), msg->positionExtList, sizeof(CompAtomExt)*(msg->plExtLen));
00249 #endif
00250
00251
00252 #if NAMD_SeparateWaters != 0
00253 numWaterAtoms = msg->numWaterAtoms;
00254 #endif
00255
00256 positionsReady(1);
00257 }
00258
00259 void ProxyPatch::sendResults(void)
00260 {
00261 DebugM(3, "sendResults(" << patchID << ")\n");
00262 register int i = 0;
00263 register ForceList::iterator f_i, f_e, f2_i;
00264 for ( i = Results::normal + 1 ; i <= flags.maxForceMerged; ++i ) {
00265 f_i = f[Results::normal].begin(); f_e = f[Results::normal].end();
00266 f2_i = f[i].begin();
00267 for ( ; f_i != f_e; ++f_i, ++f2_i ) *f_i += *f2_i;
00268 f[i].resize(0);
00269 }
00270 for ( i = flags.maxForceUsed + 1; i < Results::maxNumForces; ++i )
00271 f[i].resize(0);
00272
00273 #if CMK_PERSISTENT_COMM
00274
00275 #endif
00276 if (proxyRecvSpanning == 0) {
00277 #ifdef REMOVE_PROXYRESULTMSG_EXTRACOPY
00278 ProxyResultVarsizeMsg *msg = ProxyResultVarsizeMsg::getANewMsg(CkMyPe(), patchID, PRIORITY_SIZE, f);
00279 #else
00280 ProxyResultMsg *msg = new (PRIORITY_SIZE) ProxyResultMsg;
00281 msg->node = CkMyPe();
00282 msg->patch = patchID;
00283 for ( i = 0; i < Results::maxNumForces; ++i )
00284 msg->forceList[i] = f[i];
00285 #endif
00286 SET_PRIORITY(msg,flags.sequence,PROXY_RESULTS_PRIORITY + PATCH_PRIORITY(patchID));
00287 ProxyMgr::Object()->sendResults(msg);
00288 }
00289 else {
00290 ProxyCombinedResultMsg *msg = new (PRIORITY_SIZE) ProxyCombinedResultMsg;
00291 SET_PRIORITY(msg,flags.sequence,
00292 PROXY_RESULTS_PRIORITY + PATCH_PRIORITY(patchID));
00293 msg->nodes.add(CkMyPe());
00294 msg->patch = patchID;
00295 for ( i = 0; i < Results::maxNumForces; ++i )
00296 msg->forceList[i] = f[i];
00297 ProxyMgr::Object()->sendResults(msg);
00298 }
00299 #if CMK_PERSISTENT_COMM
00300 CmiUsePersistentHandle(NULL, 0);
00301 #endif
00302 }
00303
00304 #ifdef NODEAWARE_PROXY_SPANNINGTREE
00305 void ProxyPatch::setSpanningTree(int p, int *c, int n) {
00306 parent=p; numChild = n; nWait = 0;
00307 delete [] children;
00308 if(n==0) {
00309 children = NULL;
00310 return;
00311 }
00312 children = new int[n];
00313 for (int i=0; i<n; i++) children[i] = c[i];
00314
00315 #if defined(PROCTRACE_DEBUG) && defined(NAST_DEBUG)
00316 DebugFileTrace *dft = DebugFileTrace::Object();
00317 dft->openTrace();
00318 dft->writeTrace("ProxyPatch[%d] has %d children: ", patchID, numChild);
00319 for(int i=0; i<numChild; i++)
00320 dft->writeTrace("%d ", children[i]);
00321 dft->writeTrace("\n");
00322 dft->closeTrace();
00323 #endif
00324
00325 }
00326
00327 int ProxyPatch::getSpanningTreeChild(int *c) {
00328 for (int i=0; i<numChild; i++) c[i] = children[i];
00329 return numChild;
00330 }
00331
00332 #ifdef USE_NODEPATCHMGR
00333 void ProxyPatch::setSTNodeChildren(int numNids, int *nids){
00334 numNodeChild = numNids;
00335 delete [] nodeChildren;
00336 if(numNids==0) {
00337 nodeChildren = NULL;
00338 return;
00339 }
00340 nodeChildren = new int[numNids];
00341 for(int i=0; i<numNids; i++) nodeChildren[i] = nids[i];
00342 }
00343 #endif
00344
00345 #else //branch for not defined NODEAWARE_PROXY_SPANNINGTREE
00346 void ProxyPatch::setSpanningTree(int p, int *c, int n) {
00347 parent=p; nChild = n; nWait = 0;
00348 for (int i=0; i<n; i++) child[i] = c[i];
00349
00350 }
00351
00352 int ProxyPatch::getSpanningTreeChild(int *c) {
00353 for (int i=0; i<nChild; i++) c[i] = child[i];
00354 return nChild;
00355 }
00356 #endif
00357
00358 ProxyCombinedResultMsg *ProxyPatch::depositCombinedResultMsg(ProxyCombinedResultMsg *msg) {
00359 #if defined(NODEAWARE_PROXY_SPANNINGTREE) && defined(USE_NODEPATCHMGR)
00360 CmiLock(depositLock);
00361 #endif
00362 nWait++;
00363 if (nWait == 1) msgCBuffer = msg;
00364 else {
00365 NodeIDList::iterator n_i, n_e;
00366 n_i = msg->nodes.begin();
00367 n_e = msg->nodes.end();
00368 for (; n_i!=n_e; ++n_i) msgCBuffer->nodes.add(*n_i);
00369 for ( int k = 0; k < Results::maxNumForces; ++k )
00370 {
00371 register ForceList::iterator r_i;
00372 r_i = msgCBuffer->forceList[k].begin();
00373 register ForceList::iterator f_i, f_e;
00374 f_i = msg->forceList[k].begin();
00375 f_e = msg->forceList[k].end();
00376
00377
00378 int nf = f_e - f_i;
00379 #ifdef ARCH_POWERPC
00380 #pragma disjoint (*f_i, *r_i)
00381 #pragma unroll(4)
00382 #endif
00383 for (int count = 0; count < nf; count++) {
00384 r_i[count].x += f_i[count].x;
00385 r_i[count].y += f_i[count].y;
00386 r_i[count].z += f_i[count].z;
00387 }
00388
00389 }
00390 delete msg;
00391 }
00392
00393 #ifdef NODEAWARE_PROXY_SPANNINGTREE
00394 if(nWait == numChild+1) {
00395 #else
00396 if (nWait == nChild + 1) {
00397 #endif
00398 nWait = 0;
00399 #if defined(NODEAWARE_PROXY_SPANNINGTREE) && defined(USE_NODEPATCHMGR)
00400 CmiUnlock(depositLock);
00401 #endif
00402
00403 return msgCBuffer;
00404 }
00405
00406 #if defined(NODEAWARE_PROXY_SPANNINGTREE) && defined(USE_NODEPATCHMGR)
00407 CmiUnlock(depositLock);
00408 #endif
00409
00410 return NULL;
00411 }
00412