Main Page | Namespace List | Class Hierarchy | Alphabetical List | Class List | File List | Class Members | File Members

ProxyPatch.C

Go to the documentation of this file.
00001 
00007 #include "InfoStream.h"
00008 
00009 #ifdef USE_COMM_LIB
00010 #include "ComlibManager.h"
00011 #endif
00012 
00013 #include "Lattice.h"
00014 #include "main.decl.h"
00015 #include "main.h"
00016 #include "ProxyPatch.h"
00017 #include "ProxyMgr.decl.h"
00018 #include "ProxyMgr.h"
00019 #include "AtomMap.h"
00020 #include "PatchMap.h"
00021 #include "Priorities.h"
00022 
00023 #define MIN_DEBUG_LEVEL 2
00024 //#define  DEBUGM
00025 #include "Debug.h"
00026 
00027 
00028 ProxyPatch::ProxyPatch(PatchID pd) : 
00029   Patch(pd), proxyMsgBufferStatus(PROXYMSGNOTBUFFERED), 
00030   curProxyMsg(NULL), prevProxyMsg(NULL)
00031 {
00032   DebugM(4, "ProxyPatch(" << pd << ") at " << this << "\n");
00033   ProxyMgr::Object()->registerProxy(patchID);
00034   numAtoms = -1;
00035   parent = -1;
00036 
00037 #ifdef NODEAWARE_PROXY_SPANNINGTREE
00038   /*numChild = 0;
00039   children = NULL;*/
00040 #else
00041   nChild = 0;
00042   child = new int[proxySpanDim];
00043 #endif
00044 
00045 #if CMK_PERSISTENT_COMM
00046   localphs = 0;
00047   localphs = CmiCreatePersistent(PatchMap::Object()->node(patchID), 300000);
00048 #endif
00049 
00050   // DMK - Atom Separation (water vs. non-water)
00051   #if NAMD_SeparateWaters != 0
00052     numWaterAtoms = -1;
00053   #endif
00054   
00055   #if defined(NODEAWARE_PROXY_SPANNINGTREE) && defined(USE_NODEPATCHMGR)
00056     depositLock = CmiCreateLock();
00057   #endif
00058 }
00059 
00060 ProxyPatch::~ProxyPatch()
00061 {
00062   DebugM(4, "ProxyPatch(" << patchID << ") deleted at " << this << "\n");
00063   ProxyMgr::Object()->unregisterProxy(patchID);
00064 
00065   // ProxyPatch may be freed because of load balancing if the compute object
00066   // it corresponds to no longer exist on this specific processor.
00067   CmiAssert(prevProxyMsg!=NULL);
00068   if(prevProxyMsg!=NULL) {
00069 // #ifdef REMOVE_PROXYDATAMSG_EXTRACOPY
00070 //       AtomMap::Object()->unregisterIDs(patchID,positionPtrBegin, positionPtrEnd);
00071 // #else
00072       AtomMap::Object()->unregisterIDs(patchID,pExt.begin(),pExt.end());
00073 // #endif      
00074       delete prevProxyMsg;
00075       prevProxyMsg = NULL;
00076   }
00077 
00078 
00079 #ifdef NODEAWARE_PROXY_SPANNINGTREE
00080   delete [] children;
00081   #ifdef USE_NODEPATCHMGR
00082   delete [] nodeChildren;  
00083   #endif
00084 #else
00085   delete [] child;
00086 #endif
00087 
00088   p.resize(0);
00089   pExt.resize(0);
00090 
00091 #if CMK_PERSISTENT_COMM
00092   CmiDestoryPersistent(localphs);
00093   localphs = 0;
00094 #endif
00095 }
00096 
00097 void ProxyPatch::boxClosed(int box)
00098 {
00099   if ( box == 1 ) {     
00100     // Note: delay the deletion of proxyDataMsg (of the 
00101     // current step) until the next step. This is done 
00102     // for the sake of atom migration (ProxyDataMsg) 
00103     // as the ProxyPatch has to  unregister the atoms 
00104     // of the previous step in the AtomMap data structure. 
00105     sendResults();
00106   }
00107   if ( ! --boxesOpen ) {
00108     DebugM(2,patchID << ": " << "Checking message buffer.\n");    
00109     
00110     if(proxyMsgBufferStatus == PROXYALLMSGBUFFERED) {
00111           CmiAssert(curProxyMsg != NULL);
00112           DebugM(3,"Patch " << patchID << " processing buffered proxy ALL data.\n");
00113           receiveAll(curProxyMsg);          
00114     }else if(proxyMsgBufferStatus == PROXYDATAMSGBUFFERED) {
00115           CmiAssert(curProxyMsg != NULL);
00116           DebugM(3,"Patch " << patchID << " processing buffered proxy data.\n");
00117           receiveData(curProxyMsg);
00118     }
00119   } else {
00120        DebugM(3,"ProxyPatch " << patchID << ": " << boxesOpen << " boxes left to close.\n");
00121   }
00122 }
00123 
00124 void ProxyPatch::receiveAtoms(ProxyAtomsMsg *msg)
00125 {
00126   DebugM(3, "receiveAtoms(" << patchID << ")\n");
00127   numAtoms = msg->atomIDList.size();
00128   delete msg;
00129 }
00130 
00131 void ProxyPatch::receiveData(ProxyDataMsg *msg)
00132 {
00133   DebugM(3, "receiveData(" << patchID << ")\n");
00134 
00135   //delete the ProxyDataMsg of the previous step
00136   delete prevProxyMsg;
00137   prevProxyMsg = NULL;
00138 
00139   if ( boxesOpen )
00140   {
00141       proxyMsgBufferStatus = PROXYDATAMSGBUFFERED;
00142     // store message in queue (only need one element, though)
00143     curProxyMsg = msg;
00144     return;
00145   }
00146 
00147   //Reuse position arrays inside proxyDataMsg --Chao Mei
00148   curProxyMsg = msg;
00149   prevProxyMsg = curProxyMsg;
00150   flags = msg->flags;
00151 
00152 #ifdef REMOVE_PROXYDATAMSG_EXTRACOPY
00153   //We could set them to 0 for the sake of easy debugging
00154   //if there are something wrong in the "reuse position arrays" code
00155   //--Chao Mei
00156   //p.resize(0);
00157   //p_avg.resize(0);  
00158   positionPtrBegin = msg->positionList;
00159   positionPtrEnd = msg->positionList + msg->plLen;
00160 #else
00161   p.resize(msg->plLen);
00162   memcpy(p.begin(), msg->positionList, sizeof(CompAtom)*(msg->plLen));
00163 #endif
00164   
00165   avgPositionPtrBegin = msg->avgPositionList;
00166   avgPositionPtrEnd = msg->avgPositionList + msg->avgPlLen;
00167 
00168   
00169   if ( numAtoms == -1 ) { // for new proxies since receiveAtoms is not called
00170       //numAtoms = p.size();
00171       numAtoms = msg->plLen;
00172 
00173       //Retrieve the CompAtomExt list
00174       CmiAssert(msg->plExtLen!=0);
00175       pExt.resize(msg->plExtLen);
00176       memcpy(pExt.begin(), msg->positionExtList, sizeof(CompAtomExt)*(msg->plExtLen));
00177 
00178 
00179     // DMK - Atom Separation (water vs. non-water)
00180     #if NAMD_SeparateWaters != 0
00181       numWaterAtoms = msg->numWaterAtoms;
00182     #endif
00183 
00184     positionsReady(1);
00185   } else {
00186     positionsReady(0);
00187   }
00188 }
00189 
00190 void ProxyPatch::receiveAll(ProxyDataMsg *msg)
00191 {
00192   DebugM(3, "receiveAll(" << patchID << ")\n");
00193 
00194   if ( boxesOpen )
00195   {
00196     proxyMsgBufferStatus = PROXYALLMSGBUFFERED;    
00197     curProxyMsg = msg;
00198     return;
00199   }  
00200 
00201   //The prevProxyMsg has to be deleted after this if-statement because
00202   // positionPtrBegin points to the space inside the prevProxyMsg
00203   if(prevProxyMsg!=NULL) {
00204 // #ifdef REMOVE_PROXYDATAMSG_EXTRACOPY
00205 //       AtomMap::Object()->unregisterIDs(patchID,positionPtrBegin,positionPtrEnd);
00206 // #else
00207       AtomMap::Object()->unregisterIDs(patchID, pExt.begin(), pExt.end());
00208 // #endif
00209   }
00210   //Now delete the ProxyDataMsg of the previous step
00211   delete prevProxyMsg;
00212   curProxyMsg = msg;
00213   prevProxyMsg = curProxyMsg;
00214 
00215   flags = msg->flags;
00216 
00217 #ifdef REMOVE_PROXYDATAMSG_EXTRACOPY
00218   //We could set them to 0 for the sake of easy debugging
00219   //if there are something wrong in the "reuse position arrays" code
00220   //--Chao Mei
00221   //p.resize(0);
00222   //p_avg.resize(0);  
00223   positionPtrBegin = msg->positionList;
00224   positionPtrEnd = msg->positionList + msg->plLen;
00225 #else
00226   p.resize(msg->plLen);
00227   memcpy(p.begin(), msg->positionList, sizeof(CompAtom)*(msg->plLen));
00228 #endif
00229 
00230   numAtoms = msg->plLen;
00231   //numAtoms = p.size();
00232   
00233   avgPositionPtrBegin = msg->avgPositionList;
00234   avgPositionPtrEnd = msg->avgPositionList + msg->avgPlLen;
00235 
00236   //We cannot reuse the CompAtomExt list inside the msg because
00237   //the information is needed at every step. In the current implementation
00238   //scheme, the ProxyDataMsg msg will be deleted for every step.
00239   //In order to keep this information, we have to do the extra copy. But
00240   //this overhead is amortized among the steps that atoms don't migrate
00241   // --Chao Mei
00242   pExt.resize(msg->plExtLen);
00243   memcpy(pExt.begin(), msg->positionExtList, sizeof(CompAtomExt)*(msg->plExtLen));
00244 
00245   // DMK - Atom Separation (water vs. non-water)
00246   #if NAMD_SeparateWaters != 0
00247     numWaterAtoms = msg->numWaterAtoms;
00248   #endif
00249 
00250   positionsReady(1);
00251 }
00252 
00253 void ProxyPatch::sendResults(void)
00254 {
00255   DebugM(3, "sendResults(" << patchID << ")\n");
00256   register int i = 0;
00257   register ForceList::iterator f_i, f_e, f2_i;
00258   for ( i = Results::normal + 1 ; i <= flags.maxForceMerged; ++i ) {
00259     f_i = f[Results::normal].begin(); f_e = f[Results::normal].end();
00260     f2_i = f[i].begin();
00261     for ( ; f_i != f_e; ++f_i, ++f2_i ) *f_i += *f2_i;
00262     f[i].resize(0);
00263   }
00264   for ( i = flags.maxForceUsed + 1; i < Results::maxNumForces; ++i )
00265     f[i].resize(0);
00266 
00267 #if CMK_PERSISTENT_COMM
00268 //  CmiUsePersistentHandle(&localphs, 1);
00269 #endif
00270   if (proxyRecvSpanning == 0) {
00271 #ifdef REMOVE_PROXYRESULTMSG_EXTRACOPY
00272     ProxyResultVarsizeMsg *msg = ProxyResultVarsizeMsg::getANewMsg(CkMyPe(), patchID, PRIORITY_SIZE, f); 
00273 #else
00274     ProxyResultMsg *msg = new (PRIORITY_SIZE) ProxyResultMsg;    
00275     msg->node = CkMyPe();
00276     msg->patch = patchID;
00277     for ( i = 0; i < Results::maxNumForces; ++i ) 
00278       msg->forceList[i] = f[i];
00279 #endif
00280     SET_PRIORITY(msg,flags.sequence,PROXY_RESULTS_PRIORITY + PATCH_PRIORITY(patchID));
00281     ProxyMgr::Object()->sendResults(msg);
00282   }
00283   else {
00284     ProxyCombinedResultMsg *msg = new (PRIORITY_SIZE) ProxyCombinedResultMsg;
00285     SET_PRIORITY(msg,flags.sequence,
00286                 PROXY_RESULTS_PRIORITY + PATCH_PRIORITY(patchID));
00287     msg->nodes.add(CkMyPe());
00288     msg->patch = patchID;
00289     for ( i = 0; i < Results::maxNumForces; ++i ) 
00290       msg->forceList[i] = f[i];
00291     ProxyMgr::Object()->sendResults(msg);
00292   }
00293 #if CMK_PERSISTENT_COMM
00294   CmiUsePersistentHandle(NULL, 0);
00295 #endif
00296 }
00297 
00298 #ifdef NODEAWARE_PROXY_SPANNINGTREE
00299 void ProxyPatch::setSpanningTree(int p, int *c, int n) { 
00300   parent=p; numChild = n; nWait = 0;
00301   delete [] children;
00302   if(n==0) {
00303       children = NULL;
00304       return;
00305   }
00306   children = new int[n];
00307   for (int i=0; i<n; i++) children[i] = c[i];
00308 
00309   #if defined(PROCTRACE_DEBUG) && defined(NAST_DEBUG)
00310     DebugFileTrace *dft = DebugFileTrace::Object();
00311     dft->openTrace();
00312     dft->writeTrace("ProxyPatch[%d] has %d children: ", patchID, numChild);
00313     for(int i=0; i<numChild; i++)
00314         dft->writeTrace("%d ", children[i]);
00315     dft->writeTrace("\n");
00316     dft->closeTrace();
00317   #endif
00318 //CkPrintf("setSpanningTree: [%d:%d] %d %d:%d %d\n", CkMyPe(), patchID, parent, nChild, child[0], child[1]);
00319 }
00320 
00321 int ProxyPatch::getSpanningTreeChild(int *c) { 
00322   for (int i=0; i<numChild; i++) c[i] = children[i];
00323   return numChild;
00324 }
00325 
00326 #ifdef USE_NODEPATCHMGR
00327 void ProxyPatch::setSTNodeChildren(int numNids, int *nids){
00328     numNodeChild = numNids;
00329     delete [] nodeChildren;
00330     if(numNids==0) {
00331         nodeChildren = NULL;
00332         return;
00333     }
00334     nodeChildren = new int[numNids];
00335     for(int i=0; i<numNids; i++) nodeChildren[i] = nids[i]; 
00336 }
00337 #endif
00338 
00339 #else //branch for not defined NODEAWARE_PROXY_SPANNINGTREE
00340 void ProxyPatch::setSpanningTree(int p, int *c, int n) { 
00341   parent=p; nChild = n; nWait = 0;
00342   for (int i=0; i<n; i++) child[i] = c[i];
00343 //CkPrintf("setSpanningTree: [%d:%d] %d %d:%d %d\n", CkMyPe(), patchID, parent, nChild, child[0], child[1]);
00344 }
00345 
00346 int ProxyPatch::getSpanningTreeChild(int *c) { 
00347   for (int i=0; i<nChild; i++) c[i] = child[i];
00348   return nChild;
00349 }
00350 #endif
00351 
00352 ProxyCombinedResultMsg *ProxyPatch::depositCombinedResultMsg(ProxyCombinedResultMsg *msg) {
00353 #if defined(NODEAWARE_PROXY_SPANNINGTREE) && defined(USE_NODEPATCHMGR)
00354   CmiLock(depositLock);
00355 #endif
00356   nWait++;
00357   if (nWait == 1) msgCBuffer = msg;
00358   else {
00359     NodeIDList::iterator n_i, n_e;
00360     n_i = msg->nodes.begin();
00361     n_e = msg->nodes.end();
00362     for (; n_i!=n_e; ++n_i) msgCBuffer->nodes.add(*n_i);
00363     for ( int k = 0; k < Results::maxNumForces; ++k )
00364     {
00365     register ForceList::iterator r_i;
00366     r_i = msgCBuffer->forceList[k].begin();
00367     register ForceList::iterator f_i, f_e;
00368     f_i = msg->forceList[k].begin();
00369     f_e = msg->forceList[k].end();
00370     //    for ( ; f_i != f_e; ++f_i, ++r_i ) *r_i += *f_i;
00371 
00372     int nf = f_e - f_i;
00373 #ifdef ARCH_POWERPC
00374 #pragma disjoint (*f_i, *r_i)
00375 #pragma unroll(4)
00376 #endif
00377     for (int count = 0; count < nf; count++) {
00378       r_i[count].x += f_i[count].x;      
00379       r_i[count].y += f_i[count].y;      
00380       r_i[count].z += f_i[count].z;
00381     }
00382 
00383     }
00384     delete msg;
00385   }
00386 //CkPrintf("[%d:%d] wait: %d of %d (%d %d %d)\n", CkMyPe(), patchID, nWait, nChild+1, parent, child[0],child[1]);
00387 #ifdef NODEAWARE_PROXY_SPANNINGTREE
00388   if(nWait == numChild+1) {
00389 #else
00390   if (nWait == nChild + 1) {
00391 #endif
00392     nWait = 0;
00393 #if defined(NODEAWARE_PROXY_SPANNINGTREE) && defined(USE_NODEPATCHMGR)
00394     CmiUnlock(depositLock);
00395 #endif
00396     
00397     return msgCBuffer;
00398   }
00399 
00400 #if defined(NODEAWARE_PROXY_SPANNINGTREE) && defined(USE_NODEPATCHMGR)
00401   CmiUnlock(depositLock);
00402 #endif
00403 
00404   return NULL;
00405 }
00406 

Generated on Tue Nov 24 04:07:45 2009 for NAMD by  doxygen 1.3.9.1