Main Page | Namespace List | Class Hierarchy | Alphabetical List | Class List | File List | Class Members | File Members

ProxyPatch.C

Go to the documentation of this file.
00001 
00007 #include "InfoStream.h"
00008 
00009 #ifdef USE_COMM_LIB
00010 #include "ComlibManager.h"
00011 #endif
00012 
00013 #include "main.decl.h"
00014 #include "main.h"
00015 #include "ProxyPatch.h"
00016 #include "ProxyMgr.decl.h"
00017 #include "ProxyMgr.h"
00018 #include "AtomMap.h"
00019 #include "PatchMap.h"
00020 #include "Priorities.h"
00021 
00022 #define MIN_DEBUG_LEVEL 2
00023 //#define  DEBUGM
00024 #include "Debug.h"
00025 
00026 
00027 ProxyPatch::ProxyPatch(PatchID pd) : 
00028   Patch(pd), proxyMsgBufferStatus(PROXYMSGNOTBUFFERED), 
00029   curProxyMsg(NULL), prevProxyMsg(NULL)
00030 {
00031   DebugM(4, "ProxyPatch(" << pd << ") at " << this << "\n");
00032   ProxyMgr::Object()->registerProxy(patchID);
00033   numAtoms = -1;
00034   parent = -1;
00035 
00036 #ifdef NODEAWARE_PROXY_SPANNINGTREE
00037   /*numChild = 0;
00038   children = NULL;*/
00039 #else
00040   nChild = 0;
00041   child = new int[proxySpanDim];
00042 #endif
00043 
00044 #if CMK_PERSISTENT_COMM
00045   localphs = 0;
00046   localphs = CmiCreatePersistent(PatchMap::Object()->node(patchID), 300000);
00047 #endif
00048 
00049   // DMK - Atom Separation (water vs. non-water)
00050   #if NAMD_SeparateWaters != 0
00051     numWaterAtoms = -1;
00052   #endif
00053   
00054   #if defined(NODEAWARE_PROXY_SPANNINGTREE) && defined(USE_NODEPATCHMGR)
00055     depositLock = CmiCreateLock();
00056   #endif
00057 }
00058 
00059 ProxyPatch::~ProxyPatch()
00060 {
00061   DebugM(4, "ProxyPatch(" << patchID << ") deleted at " << this << "\n");
00062   ProxyMgr::Object()->unregisterProxy(patchID);
00063 
00064   // ProxyPatch may be freed because of load balancing if the compute object
00065   // it corresponds to no longer exist on this specific processor.
00066   CmiAssert(prevProxyMsg!=NULL);
00067   if(prevProxyMsg!=NULL) {
00068 #ifdef REMOVE_PROXYDATAMSG_EXTRACOPY
00069       AtomMap::Object()->unregisterIDs(patchID,positionPtrBegin, positionPtrEnd);
00070 #else
00071       AtomMap::Object()->unregisterIDs(patchID,p.begin(),p.end());
00072 #endif      
00073       delete prevProxyMsg;
00074       prevProxyMsg = NULL;
00075   }
00076 
00077 
00078 #ifdef NODEAWARE_PROXY_SPANNINGTREE
00079   delete [] children;
00080   #ifdef USE_NODEPATCHMGR
00081   delete [] nodeChildren;  
00082   #endif
00083 #else
00084   delete [] child;
00085 #endif
00086 
00087   p.resize(0);
00088 
00089 #ifdef MEM_OPT_VERSION
00090   pExt.resize(0);
00091 #endif
00092 
00093 #if CMK_PERSISTENT_COMM
00094   CmiDestoryPersistent(localphs);
00095   localphs = 0;
00096 #endif
00097 }
00098 
00099 void ProxyPatch::boxClosed(int box)
00100 {
00101   if ( box == 1 ) {     
00102     // Note: delay the deletion of proxyDataMsg (of the 
00103     // current step) until the next step. This is done 
00104     // for the sake of atom migration (ProxyDataMsg) 
00105     // as the ProxyPatch has to  unregister the atoms 
00106     // of the previous step in the AtomMap data structure. 
00107     sendResults();
00108   }
00109   if ( ! --boxesOpen ) {
00110     DebugM(2,patchID << ": " << "Checking message buffer.\n");    
00111     
00112     if(proxyMsgBufferStatus == PROXYALLMSGBUFFERED) {
00113           CmiAssert(curProxyMsg != NULL);
00114           DebugM(3,"Patch " << patchID << " processing buffered proxy ALL data.\n");
00115           receiveAll(curProxyMsg);          
00116     }else if(proxyMsgBufferStatus == PROXYDATAMSGBUFFERED) {
00117           CmiAssert(curProxyMsg != NULL);
00118           DebugM(3,"Patch " << patchID << " processing buffered proxy data.\n");
00119           receiveData(curProxyMsg);
00120     }
00121   } else {
00122        DebugM(3,"ProxyPatch " << patchID << ": " << boxesOpen << " boxes left to close.\n");
00123   }
00124 }
00125 
00126 void ProxyPatch::receiveAtoms(ProxyAtomsMsg *msg)
00127 {
00128   DebugM(3, "receiveAtoms(" << patchID << ")\n");
00129   numAtoms = msg->atomIDList.size();
00130   delete msg;
00131 }
00132 
00133 void ProxyPatch::receiveData(ProxyDataMsg *msg)
00134 {
00135   DebugM(3, "receiveData(" << patchID << ")\n");
00136 
00137   //delete the ProxyDataMsg of the previous step
00138   delete prevProxyMsg;
00139   prevProxyMsg = NULL;
00140 
00141   if ( boxesOpen )
00142   {
00143       proxyMsgBufferStatus = PROXYDATAMSGBUFFERED;
00144     // store message in queue (only need one element, though)
00145     curProxyMsg = msg;
00146     return;
00147   }
00148 
00149   //Reuse position arrays inside proxyDataMsg --Chao Mei
00150   curProxyMsg = msg;
00151   prevProxyMsg = curProxyMsg;
00152   flags = msg->flags;
00153 
00154 #ifdef REMOVE_PROXYDATAMSG_EXTRACOPY
00155   //We could set them to 0 for the sake of easy debugging
00156   //if there are something wrong in the "reuse position arrays" code
00157   //--Chao Mei
00158   //p.resize(0);
00159   //p_avg.resize(0);  
00160   positionPtrBegin = msg->positionList;
00161   positionPtrEnd = msg->positionList + msg->plLen;
00162 #else
00163   p.resize(msg->plLen);
00164   memcpy(p.begin(), msg->positionList, sizeof(CompAtom)*(msg->plLen));
00165 #endif
00166   
00167   avgPositionPtrBegin = msg->avgPositionList;
00168   avgPositionPtrEnd = msg->avgPositionList + msg->avgPlLen;
00169 
00170   
00171   if ( numAtoms == -1 ) { // for new proxies since receiveAtoms is not called
00172       //numAtoms = p.size();
00173       numAtoms = msg->plLen;
00174 
00175 #ifdef MEM_OPT_VERSION
00176       //Retrieve the CompAtomExt list
00177       CmiAssert(msg->plExtLen!=0);
00178       pExt.resize(msg->plExtLen);
00179       memcpy(pExt.begin(), msg->positionExtList, sizeof(CompAtomExt)*(msg->plExtLen));
00180 #endif
00181 
00182 
00183     // DMK - Atom Separation (water vs. non-water)
00184     #if NAMD_SeparateWaters != 0
00185       numWaterAtoms = msg->numWaterAtoms;
00186     #endif
00187 
00188     positionsReady(1);
00189   } else {
00190     positionsReady(0);
00191   }
00192 }
00193 
00194 void ProxyPatch::receiveAll(ProxyDataMsg *msg)
00195 {
00196   DebugM(3, "receiveAll(" << patchID << ")\n");
00197 
00198   if ( boxesOpen )
00199   {
00200     proxyMsgBufferStatus = PROXYALLMSGBUFFERED;    
00201     curProxyMsg = msg;
00202     return;
00203   }  
00204 
00205   //The prevProxyMsg has to be deleted after this if-statement because
00206   // positionPtrBegin points to the space inside the prevProxyMsg
00207   if(prevProxyMsg!=NULL) {
00208 #ifdef REMOVE_PROXYDATAMSG_EXTRACOPY
00209       AtomMap::Object()->unregisterIDs(patchID,positionPtrBegin,positionPtrEnd);
00210 #else
00211       AtomMap::Object()->unregisterIDs(patchID, p.begin(), p.end());
00212 #endif
00213   }
00214   //Now delete the ProxyDataMsg of the previous step
00215   delete prevProxyMsg;
00216   curProxyMsg = msg;
00217   prevProxyMsg = curProxyMsg;
00218 
00219   flags = msg->flags;
00220 
00221 #ifdef REMOVE_PROXYDATAMSG_EXTRACOPY
00222   //We could set them to 0 for the sake of easy debugging
00223   //if there are something wrong in the "reuse position arrays" code
00224   //--Chao Mei
00225   //p.resize(0);
00226   //p_avg.resize(0);  
00227   positionPtrBegin = msg->positionList;
00228   positionPtrEnd = msg->positionList + msg->plLen;
00229 #else
00230   p.resize(msg->plLen);
00231   memcpy(p.begin(), msg->positionList, sizeof(CompAtom)*(msg->plLen));
00232 #endif
00233 
00234   numAtoms = msg->plLen;
00235   //numAtoms = p.size();
00236   
00237   avgPositionPtrBegin = msg->avgPositionList;
00238   avgPositionPtrEnd = msg->avgPositionList + msg->avgPlLen;
00239 
00240 #ifdef MEM_OPT_VERSION
00241   //We cannot reuse the CompAtomExt list inside the msg because
00242   //the information is needed at every step. In the current implementation
00243   //scheme, the ProxyDataMsg msg will be deleted for every step.
00244   //In order to keep this information, we have to do the extra copy. But
00245   //this overhead is amortized among the steps that atoms don't migrate
00246   // --Chao Mei
00247   pExt.resize(msg->plExtLen);
00248   memcpy(pExt.begin(), msg->positionExtList, sizeof(CompAtomExt)*(msg->plExtLen));
00249 #endif
00250 
00251   // DMK - Atom Separation (water vs. non-water)
00252   #if NAMD_SeparateWaters != 0
00253     numWaterAtoms = msg->numWaterAtoms;
00254   #endif
00255 
00256   positionsReady(1);
00257 }
00258 
00259 void ProxyPatch::sendResults(void)
00260 {
00261   DebugM(3, "sendResults(" << patchID << ")\n");
00262   register int i = 0;
00263   register ForceList::iterator f_i, f_e, f2_i;
00264   for ( i = Results::normal + 1 ; i <= flags.maxForceMerged; ++i ) {
00265     f_i = f[Results::normal].begin(); f_e = f[Results::normal].end();
00266     f2_i = f[i].begin();
00267     for ( ; f_i != f_e; ++f_i, ++f2_i ) *f_i += *f2_i;
00268     f[i].resize(0);
00269   }
00270   for ( i = flags.maxForceUsed + 1; i < Results::maxNumForces; ++i )
00271     f[i].resize(0);
00272 
00273 #if CMK_PERSISTENT_COMM
00274 //  CmiUsePersistentHandle(&localphs, 1);
00275 #endif
00276   if (proxyRecvSpanning == 0) {
00277 #ifdef REMOVE_PROXYRESULTMSG_EXTRACOPY
00278     ProxyResultVarsizeMsg *msg = ProxyResultVarsizeMsg::getANewMsg(CkMyPe(), patchID, PRIORITY_SIZE, f); 
00279 #else
00280     ProxyResultMsg *msg = new (PRIORITY_SIZE) ProxyResultMsg;    
00281     msg->node = CkMyPe();
00282     msg->patch = patchID;
00283     for ( i = 0; i < Results::maxNumForces; ++i ) 
00284       msg->forceList[i] = f[i];
00285 #endif
00286     SET_PRIORITY(msg,flags.sequence,PROXY_RESULTS_PRIORITY + PATCH_PRIORITY(patchID));
00287     ProxyMgr::Object()->sendResults(msg);
00288   }
00289   else {
00290     ProxyCombinedResultMsg *msg = new (PRIORITY_SIZE) ProxyCombinedResultMsg;
00291     SET_PRIORITY(msg,flags.sequence,
00292                 PROXY_RESULTS_PRIORITY + PATCH_PRIORITY(patchID));
00293     msg->nodes.add(CkMyPe());
00294     msg->patch = patchID;
00295     for ( i = 0; i < Results::maxNumForces; ++i ) 
00296       msg->forceList[i] = f[i];
00297     ProxyMgr::Object()->sendResults(msg);
00298   }
00299 #if CMK_PERSISTENT_COMM
00300   CmiUsePersistentHandle(NULL, 0);
00301 #endif
00302 }
00303 
00304 #ifdef NODEAWARE_PROXY_SPANNINGTREE
00305 void ProxyPatch::setSpanningTree(int p, int *c, int n) { 
00306   parent=p; numChild = n; nWait = 0;
00307   delete [] children;
00308   if(n==0) {
00309       children = NULL;
00310       return;
00311   }
00312   children = new int[n];
00313   for (int i=0; i<n; i++) children[i] = c[i];
00314 
00315   #if defined(PROCTRACE_DEBUG) && defined(NAST_DEBUG)
00316     DebugFileTrace *dft = DebugFileTrace::Object();
00317     dft->openTrace();
00318     dft->writeTrace("ProxyPatch[%d] has %d children: ", patchID, numChild);
00319     for(int i=0; i<numChild; i++)
00320         dft->writeTrace("%d ", children[i]);
00321     dft->writeTrace("\n");
00322     dft->closeTrace();
00323   #endif
00324 //CkPrintf("setSpanningTree: [%d:%d] %d %d:%d %d\n", CkMyPe(), patchID, parent, nChild, child[0], child[1]);
00325 }
00326 
00327 int ProxyPatch::getSpanningTreeChild(int *c) { 
00328   for (int i=0; i<numChild; i++) c[i] = children[i];
00329   return numChild;
00330 }
00331 
00332 #ifdef USE_NODEPATCHMGR
00333 void ProxyPatch::setSTNodeChildren(int numNids, int *nids){
00334     numNodeChild = numNids;
00335     delete [] nodeChildren;
00336     if(numNids==0) {
00337         nodeChildren = NULL;
00338         return;
00339     }
00340     nodeChildren = new int[numNids];
00341     for(int i=0; i<numNids; i++) nodeChildren[i] = nids[i]; 
00342 }
00343 #endif
00344 
00345 #else //branch for not defined NODEAWARE_PROXY_SPANNINGTREE
00346 void ProxyPatch::setSpanningTree(int p, int *c, int n) { 
00347   parent=p; nChild = n; nWait = 0;
00348   for (int i=0; i<n; i++) child[i] = c[i];
00349 //CkPrintf("setSpanningTree: [%d:%d] %d %d:%d %d\n", CkMyPe(), patchID, parent, nChild, child[0], child[1]);
00350 }
00351 
00352 int ProxyPatch::getSpanningTreeChild(int *c) { 
00353   for (int i=0; i<nChild; i++) c[i] = child[i];
00354   return nChild;
00355 }
00356 #endif
00357 
00358 ProxyCombinedResultMsg *ProxyPatch::depositCombinedResultMsg(ProxyCombinedResultMsg *msg) {
00359 #if defined(NODEAWARE_PROXY_SPANNINGTREE) && defined(USE_NODEPATCHMGR)
00360   CmiLock(depositLock);
00361 #endif
00362   nWait++;
00363   if (nWait == 1) msgCBuffer = msg;
00364   else {
00365     NodeIDList::iterator n_i, n_e;
00366     n_i = msg->nodes.begin();
00367     n_e = msg->nodes.end();
00368     for (; n_i!=n_e; ++n_i) msgCBuffer->nodes.add(*n_i);
00369     for ( int k = 0; k < Results::maxNumForces; ++k )
00370     {
00371     register ForceList::iterator r_i;
00372     r_i = msgCBuffer->forceList[k].begin();
00373     register ForceList::iterator f_i, f_e;
00374     f_i = msg->forceList[k].begin();
00375     f_e = msg->forceList[k].end();
00376     //    for ( ; f_i != f_e; ++f_i, ++r_i ) *r_i += *f_i;
00377 
00378     int nf = f_e - f_i;
00379 #ifdef ARCH_POWERPC
00380 #pragma disjoint (*f_i, *r_i)
00381 #pragma unroll(4)
00382 #endif
00383     for (int count = 0; count < nf; count++) {
00384       r_i[count].x += f_i[count].x;      
00385       r_i[count].y += f_i[count].y;      
00386       r_i[count].z += f_i[count].z;
00387     }
00388 
00389     }
00390     delete msg;
00391   }
00392 //CkPrintf("[%d:%d] wait: %d of %d (%d %d %d)\n", CkMyPe(), patchID, nWait, nChild+1, parent, child[0],child[1]);
00393 #ifdef NODEAWARE_PROXY_SPANNINGTREE
00394   if(nWait == numChild+1) {
00395 #else
00396   if (nWait == nChild + 1) {
00397 #endif
00398     nWait = 0;
00399 #if defined(NODEAWARE_PROXY_SPANNINGTREE) && defined(USE_NODEPATCHMGR)
00400     CmiUnlock(depositLock);
00401 #endif
00402     
00403     return msgCBuffer;
00404   }
00405 
00406 #if defined(NODEAWARE_PROXY_SPANNINGTREE) && defined(USE_NODEPATCHMGR)
00407   CmiUnlock(depositLock);
00408 #endif
00409 
00410   return NULL;
00411 }
00412 

Generated on Mon Sep 8 04:07:42 2008 for NAMD by  doxygen 1.3.9.1