Main Page | Namespace List | Class Hierarchy | Alphabetical List | Class List | File List | Class Members | File Members

ProxyMgr.C

Go to the documentation of this file.
00001 
00007 #include "InfoStream.h"
00008 #include "main.h"
00009 #include "BOCgroup.h"
00010 #include "ProxyMgr.decl.h"
00011 #include "ProxyMgr.h"
00012 #include "PatchMap.inl"
00013 #include "ProxyPatch.h"
00014 #include "ComputeMap.h"
00015 #include "HomePatch.h"
00016 #include <string.h>
00017 #include "ProcessorPrivate.h"
00018 #include "packmsg.h"
00019 #include "Priorities.h"
00020 #ifndef _NO_ALLOCA_H
00021 #include <alloca.h>
00022 #endif
00023 #ifndef _NO_MALLOC_H
00024 #include <malloc.h>
00025 #endif
00026 
00027 #include <map>
00028 #include <vector>
00029 #include <algorithm>
00030 
00031 #if defined(NODEAWARE_PROXY_SPANNINGTREE) && defined(USE_NODEPATCHMGR) && (CMK_SMP) && defined(NAMDSRC_IMMQD_HACK)
00032 #include "qd.h"
00033 #endif
00034 
00035 //#define DEBUGM
00036 #define MIN_DEBUG_LEVEL 2
00037 #include "Debug.h"
00038 
// ALLOCA(TYPE, NAME, SIZE): declare `TYPE *NAME` pointing at SIZE elements of
// stack storage obtained from alloca(). The storage vanishes when the calling
// function returns, so it must never be freed, returned, or kept past that.
#define ALLOCA(TYPE,NAME,SIZE) TYPE *NAME = (TYPE *) alloca(sizeof(TYPE) * (SIZE))
00040 
// Proxy spanning-tree settings shared by the whole file. The ProxyMgr setters
// below only write them from rank 0 (CkMyRank() == 0).
int proxySendSpanning = 0;   // set to 1 by ProxyMgr::setSendSpanning()
int proxyRecvSpanning = 0;   // set to 1 by ProxyMgr::setRecvSpanning()
// Branch factor of the proxy spanning tree; corresponds to the
// "proxyTreeBranchFactor" configuration parameter and is changed through
// ProxyMgr::setProxyTreeBranchFactor().
int proxySpanDim = 4;
// Branch factor used inside a node (presumably for the node-aware tree --
// confirm against its users outside this chunk).
int inNodeProxySpanDim = 16;
00046 
// Serialization for ProxySpanningTreeMsg, written with the PACK_MSG helpers
// from packmsg.h: `patch` and `node` are copied with PACK, and `tree` uses
// PACK_RESIZE (presumably a variable-length list that is resized on unpack --
// confirm in packmsg.h).
PACK_MSG(ProxySpanningTreeMsg,
  PACK(patch);
  PACK(node);
  PACK_RESIZE(tree);
)
00052 
00053 void* ProxyResultMsg::pack(ProxyResultMsg *msg) {
00054 
00055   int msg_size = 0;
00056   msg_size += sizeof(msg->node);
00057   msg_size += sizeof(msg->patch);
00058 
00059   int j;
00060   for ( j = 0; j < Results::maxNumForces; ++j ) {
00061     int array_size = msg->forceList[j].size();
00062     msg_size += sizeof(array_size);
00063     msg_size += array_size * sizeof(char);    
00064     msg_size = ALIGN_8 (msg_size);
00065     Force* f = msg->forceList[j].begin();
00066     int nonzero_count = 0;
00067     for ( int i = 0; i < array_size; ++i ) {
00068       if ( f[i].x != 0. || f[i].y != 0. || f[i].z != 0. ) { ++nonzero_count; }
00069     }
00070     msg_size += nonzero_count * sizeof(Vector);
00071   }
00072 
00073   void *msg_buf = CkAllocBuffer(msg,msg_size);
00074   char *msg_cur = (char *)msg_buf;
00075 
00076   CmiMemcpy((void*)msg_cur,(void*)(&(msg->node)),sizeof(msg->node));
00077   msg_cur += sizeof(msg->node);
00078   CmiMemcpy((void*)msg_cur,(void*)(&(msg->patch)),sizeof(msg->patch));
00079   msg_cur += sizeof(msg->patch);
00080   for ( j = 0; j < Results::maxNumForces; ++j ) {
00081     int array_size = msg->forceList[j].size();
00082     *(int *) msg_cur = array_size;
00083     msg_cur += sizeof(int);
00084     char *nonzero = msg_cur;
00085     msg_cur += array_size * sizeof(char);
00086     msg_cur = (char *)ALIGN_8 (msg_cur);
00087     Vector *farr = (Vector *)msg_cur;
00088     Force* f = msg->forceList[j].begin();
00089 
00090     for ( int i = 0; i < array_size; ++i ) {
00091       if ( f[i].x != 0. || f[i].y != 0. || f[i].z != 0. ) {
00092         nonzero[i] = 1;
00093         farr->x = f[i].x;
00094         farr->y = f[i].y;
00095         farr->z = f[i].z;
00096         farr ++;
00097       } else {
00098         nonzero[i] = 0;
00099       }
00100     }
00101     msg_cur = (char *) farr;      
00102   }
00103 
00104   delete msg;
00105   return msg_buf;
00106 }
00107 
00108 ProxyResultMsg* ProxyResultMsg::unpack(void *ptr) {
00109 
00110   void *vmsg = CkAllocBuffer(ptr,sizeof(ProxyResultMsg));
00111   ProxyResultMsg *msg = new (vmsg) ProxyResultMsg;
00112   char *msg_cur = (char*)ptr;
00113 
00114   CmiMemcpy((void*)(&(msg->node)),(void*)msg_cur,sizeof(msg->node));
00115   msg_cur += sizeof(msg->node);
00116   CmiMemcpy((void*)(&(msg->patch)),(void*)msg_cur,sizeof(msg->patch));
00117   msg_cur += sizeof(msg->patch);
00118   int j;
00119   for ( j = 0; j < Results::maxNumForces; ++j ) {
00120     int array_size = *(int *) msg_cur;
00121     msg_cur += sizeof(array_size);
00122     msg->forceList[j].resize(array_size);
00123     char *nonzero = msg_cur;
00124     msg_cur += array_size * sizeof(char);    
00125     msg_cur = (char *)ALIGN_8 (msg_cur);
00126     Vector* farr = (Vector *) msg_cur;
00127     Force* f = msg->forceList[j].begin();
00128     for ( int i = 0; i < array_size; ++i ) {
00129       if ( nonzero[i] ) {
00130         f[i].x = farr->x;
00131         f[i].y = farr->y;
00132         f[i].z = farr->z;
00133         farr++;
00134       } else {
00135         f[i].x = 0.;  f[i].y = 0.;  f[i].z = 0.;
00136       }
00137     }    
00138     msg_cur = (char *) farr;
00139   }
00140 
00141   CkFreeMsg(ptr);
00142   return msg;
00143 }
00144 
00145 ProxyResultVarsizeMsg *ProxyResultVarsizeMsg::getANewMsg(NodeID nid, PatchID pid, int prioSize, ForceList *fls){
00146 
00147     //1. decide the length of forceArr and iszero field.
00148     int tmpLen[Results::maxNumForces];
00149     int iszeroLen = 0;
00150     for (int i=0; i<Results::maxNumForces; i++){
00151         tmpLen[i] = fls[i].size();
00152         iszeroLen += tmpLen[i];
00153     }
00154     char *tmpIszero = new char[iszeroLen];
00155     char *iszeroPtr = tmpIszero;
00156     int fArrLen = 0;
00157     for(int i=0; i<Results::maxNumForces; i++) {        
00158         Force *fiPtr = fls[i].begin();
00159         for(int j=0; j<tmpLen[i]; j++, fiPtr++, iszeroPtr++) {         
00160             if(fiPtr->x!=0.0 || fiPtr->y!=0.0 || fiPtr->z!=0) {
00161                 *iszeroPtr=0;
00162                 fArrLen++;
00163             }else{
00164                 *iszeroPtr=1;
00165             }            
00166         }
00167     }
00168 
00169     //2. Ready to create the msg, and set all fields
00170     ProxyResultVarsizeMsg *retmsg = new(fArrLen, iszeroLen, prioSize)ProxyResultVarsizeMsg;
00171     retmsg->node = nid;
00172     retmsg->patch = pid;
00173     memcpy(retmsg->flLen, tmpLen, sizeof(int)*Results::maxNumForces);
00174     iszeroPtr = tmpIszero;
00175     Force *forcePtr = retmsg->forceArr;
00176     for(int i=0; i<Results::maxNumForces; i++) {        
00177         Force *fiPtr = fls[i].begin();
00178         for(int j=0; j<tmpLen[i]; j++, fiPtr++, iszeroPtr++) {
00179             if((*iszeroPtr)!=1) {
00180                 forcePtr->x = fiPtr->x;
00181                 forcePtr->y = fiPtr->y;
00182                 forcePtr->z = fiPtr->z;
00183                 forcePtr++;
00184             }            
00185         }
00186     }
00187     memcpy(retmsg->isZero, tmpIszero, sizeof(char)*iszeroLen);
00188     delete [] tmpIszero;
00189     return retmsg;
00190 }
00191 
00192 ProxyNodeAwareSpanningTreeMsg *ProxyNodeAwareSpanningTreeMsg::getANewMsg(PatchID pid, NodeID nid, proxyTreeNode *tree, int size){
00193     int numAllPes = 0;
00194     for(int i=0; i<size; i++) {
00195         numAllPes += tree[i].numPes;
00196     }
00197     ProxyNodeAwareSpanningTreeMsg *retmsg = new(size, numAllPes, 0) ProxyNodeAwareSpanningTreeMsg;
00198     retmsg->patch = pid;
00199     retmsg->procID = nid;
00200     retmsg->numNodesWithProxies = size;
00201     int *pAllPes = retmsg->allPes;
00202     for(int i=0; i<size; i++) {
00203         retmsg->numPesOfNode[i] = tree[i].numPes;
00204         for(int j=0; j<tree[i].numPes; j++) {
00205             *pAllPes = tree[i].peIDs[j];
00206             pAllPes++;
00207         }
00208     }
00209     return retmsg;
00210 }
00211 
00212 //Only available when macro PROCTRACE_DEBUG is defined
00213 void ProxyNodeAwareSpanningTreeMsg::printOut(char *tag){
00214 #ifdef PROCTRACE_DEBUG
00215     DebugFileTrace *dft = DebugFileTrace::Object();
00216     dft->openTrace();
00217     const char *patchname = "ProxyPatch";
00218     if(procID == CkMyPe()) patchname = "HomePatch";
00219     dft->writeTrace("%s: %s[%d] on proc %d node %d has ST (src %d) with %d nodes\n", 
00220                     tag, patchname, patch, CkMyPe(), CkMyNode(), procID, numNodesWithProxies);
00221     if(numNodesWithProxies==0) {
00222         dft->closeTrace();
00223         return;
00224     }
00225     dft->writeTrace("%s: ===%d===pes/node: ", tag, patch);
00226     for(int i=0; i<numNodesWithProxies; i++) {
00227         dft->writeTrace("%d ", numPesOfNode[i]);
00228     }
00229     dft->writeTrace("\n%s: ===%d===pe list: ", tag, patch);
00230     int *p = allPes;
00231     for(int i=0; i<numNodesWithProxies; i++) {
00232         for(int j=0; j<numPesOfNode[i]; j++) {
00233             dft->writeTrace("%d ", *p);
00234             p++;
00235         }
00236     }
00237     dft->writeTrace("\n");    
00238     dft->closeTrace();
00239 #endif
00240 }
00241 
00242 // for spanning tree
00243 ProxyCombinedResultRawMsg* ProxyCombinedResultMsg::toRaw(ProxyCombinedResultMsg *msg) {
00244   int totalFLLen=0;
00245   int nonzero_count = 0;
00246   int nodeSize = msg->nodes.size();
00247   for (int j = 0; j < Results::maxNumForces; ++j ) {
00248         int array_size = msg->forceList[j].size();
00249     totalFLLen +=  array_size;
00250     Force* f = msg->forceList[j].begin();
00251     for ( int i = 0; i < array_size; ++i ) {
00252       if ( f[i].x != 0. || f[i].y != 0. || f[i].z != 0. ) { ++nonzero_count; }
00253     }
00254   }
00255 
00256   ProxyCombinedResultRawMsg *msg_buf = new(nodeSize, totalFLLen, nonzero_count, PRIORITY_SIZE)ProxyCombinedResultRawMsg;
00257   //Copy envelope stuff
00258   {
00259          envelope *oenv = UsrToEnv(msg);
00260          envelope *nenv = UsrToEnv(msg_buf);
00261          CmiMemcpy(nenv->getPrioPtr(), oenv->getPrioPtr(), nenv->getPrioBytes());
00262   }
00263 
00264   msg_buf->nodeSize = nodeSize;
00265   for (int i=0; i<nodeSize; i++) {
00266     msg_buf->nodes[i] = msg->nodes[i];
00267   }
00268   #if defined(NODEAWARE_PROXY_SPANNINGTREE) && defined(USE_NODEPATCHMGR)
00269   msg_buf->destPe = msg->destPe;
00270   #if CMK_SMP && defined(NAMDSRC_IMMQD_HACK)
00271   msg_buf->isFromImmMsgCall = msg->isFromImmMsgCall;
00272   #endif
00273   #endif
00274   msg_buf->patch = msg->patch;
00275 
00276   Force *farr = msg_buf->forceArr;
00277   char *isNonZeroPtr = msg_buf->isForceNonZero;
00278   for ( int j = 0; j < Results::maxNumForces; ++j ) {
00279         int array_size = msg->forceList[j].size();
00280     msg_buf->flLen[j] = array_size;
00281     Force* f = msg->forceList[j].begin();
00282     for ( int i = 0; i < array_size; ++i , isNonZeroPtr++) {
00283       if ( f[i].x != 0. || f[i].y != 0. || f[i].z != 0. ) {
00284         *isNonZeroPtr = 1;
00285                 farr->x  =  f[i].x;
00286         farr->y  =  f[i].y;
00287         farr->z  =  f[i].z;
00288         farr ++;
00289       } else {
00290         *isNonZeroPtr = 0;
00291       }
00292     }
00293   }
00294   return msg_buf;
00295 }
00296 
00297 ProxyCombinedResultMsg* ProxyCombinedResultMsg::fromRaw(ProxyCombinedResultRawMsg *ptr) {
00298 
00299   //CkPrintf("[%d]: unpacking: plainData=%p\n", CkMyPe(), ptr->plainData);      
00300 
00301   void *vmsg = CkAllocBuffer(ptr,sizeof(ProxyCombinedResultMsg));
00302   ProxyCombinedResultMsg *msg = new (vmsg) ProxyCombinedResultMsg;
00303 
00304   for (int i=0; i<ptr->nodeSize; i++) {
00305     msg->nodes.add(ptr->nodes[i]);
00306   }
00307   #if defined(NODEAWARE_PROXY_SPANNINGTREE) && defined(USE_NODEPATCHMGR)
00308   msg->destPe = ptr->destPe;
00309   #if CMK_SMP && defined(NAMDSRC_IMMQD_HACK)
00310   msg->isFromImmMsgCall = ptr->isFromImmMsgCall;
00311   #endif
00312   #endif
00313   msg->patch = ptr->patch;
00314 
00315   char *nonzero = ptr->isForceNonZero;
00316   Force* farr = ptr->forceArr;
00317 
00318   for ( int j = 0; j < Results::maxNumForces; ++j ) {
00319     int array_size = ptr->flLen[j];
00320     msg->forceList[j].resize(array_size);
00321     Force* f = msg->forceList[j].begin();
00322 
00323     for ( int i = 0; i < array_size; ++i, nonzero++ ) {
00324       if ( *nonzero ) {
00325                 f[i].x = farr->x;
00326                 f[i].y = farr->y;
00327                 f[i].z = farr->z;
00328                 farr++;
00329       } else {
00330         f[i].x = 0.;  f[i].y = 0.;  f[i].z = 0.;
00331       }
00332     }
00333   }
00334 
00335   delete ptr;
00336   return msg;
00337 }
00338 
00339 // class static
00340 int ProxyMgr::nodecount = 0;
00341 
00342 ProxyMgr::ProxyMgr() { 
00343   if (CkpvAccess(ProxyMgr_instance)) {
00344     NAMD_bug("Tried to create ProxyMgr twice.");
00345   }
00346   CkpvAccess(ProxyMgr_instance) = this;
00347 }
00348 
00349 ProxyMgr::~ProxyMgr() { 
00350   removeProxies();
00351   CkpvAccess(ProxyMgr_instance) = NULL;
00352 }
00353 
00354 
00355 void ProxyMgr::setSendSpanning() {
00356   if(CkMyRank()!=0) return; 
00357   proxySendSpanning = 1;
00358 }
00359 
00360 int ProxyMgr::getSendSpanning() {
00361   return proxySendSpanning;
00362 }
00363 
00364 void ProxyMgr::setRecvSpanning() {
00365   if(CkMyRank()!=0) return;
00366   proxyRecvSpanning = 1;
00367 }
00368 
00369 int ProxyMgr::getRecvSpanning() {
00370   return proxyRecvSpanning;
00371 }
00372 
00373 void ProxyMgr::setProxyTreeBranchFactor(int dim){
00374     if(CkMyRank()!=0) return;
00375     proxySpanDim = dim;
00376 }
00377 
00378 ProxyTree &ProxyMgr::getPtree() {
00379   return ptree;
00380 }
00381 
00382 void ProxyMgr::removeProxies(void)
00383 {
00384   ProxySetIter pi(proxySet);
00385   for ( pi = pi.begin(); pi != pi.end(); pi++)
00386   {
00387     delete pi->proxyPatch;
00388   }
00389   proxySet.clear();
00390 }
00391 
00392 void ProxyMgr::removeUnusedProxies(void)
00393 {
00394   ResizeArray<PatchID> toDelete;
00395   ProxySetIter pi(proxySet);
00396   for ( pi = pi.begin(); pi != pi.end(); pi++)
00397   {
00398     if ( pi->proxyPatch->getNumComputes() == 0 ) {
00399       toDelete.add(pi->patchID);
00400       //fprintf(stderr, "Proxy Deleted Patch %d Proc %d", pi->patchID, CkMyPe());
00401     }
00402   }
00403   PatchID *pidi = toDelete.begin();
00404   for ( ; pidi != toDelete.end(); ++pidi ) {
00405     removeProxy(*pidi);
00406   }
00407 }
00408 
00409 // Figure out which proxies we need and create them
00410 void ProxyMgr::createProxies(void)
00411 {
00412   // Delete the old proxies.
00413   removeProxies();
00414 
00415   PatchMap *patchMap = PatchMap::Object();
00416   int numPatches = patchMap->numPatches();
00417   int myNode = CkMyPe();
00418   enum PatchFlag { Unknown, Home, NeedProxy };
00419   int *patchFlag = new int[numPatches]; 
00420   int i, j;
00421 
00422   // Note all home patches.
00423   for ( i = 0; i < numPatches; ++i )
00424   {
00425     patchFlag[i] = ( patchMap->node(i) == myNode ) ? Home : Unknown;
00426   }
00427 
00428   // Add all upstream neighbors.
00429   PatchID neighbors[PatchMap::MaxOneAway];
00430   PatchIDList basepids;
00431   patchMap->basePatchIDList(myNode,basepids);
00432   for ( i = 0; i < basepids.size(); ++i )
00433   {
00434     if ( patchMap->node(basepids[i]) != myNode ) {
00435         patchFlag[basepids[i]] = NeedProxy;
00436     }
00437     int numNeighbors = patchMap->upstreamNeighbors(basepids[i],neighbors);
00438     for ( j = 0; j < numNeighbors; ++j )
00439     {
00440       if ( ! patchFlag[neighbors[j]] ) {
00441         patchFlag[neighbors[j]] = NeedProxy;
00442       }
00443     }
00444   }
00445 
00446   // Check all patch-based compute objects.
00447   ComputeMap *computeMap = ComputeMap::Object();
00448   int nc = computeMap->numComputes();
00449   for ( i = 0; i < nc; ++i )
00450   {
00451 #ifdef NAMD_CUDA
00452     ComputeType t = computeMap->type(i);
00453     if ( t == computeNonbondedSelfType || t == computeNonbondedPairType )
00454       continue;
00455 #endif
00456     if ( computeMap->node(i) != myNode ) 
00457       continue;
00458     int numPid = computeMap->numPids(i);
00459     for ( j = 0; j < numPid; ++j )
00460     {
00461       int pid = computeMap->pid(i,j);
00462       if ( ! patchFlag[pid] ) {
00463         patchFlag[pid] = NeedProxy;
00464       }
00465     }
00466   }
00467   // Create proxy list
00468   for ( i = 0; i < numPatches; ++i ) {
00469     if ( patchFlag[i] == NeedProxy )
00470     { // create proxy patch
00471       ProxyPatch *proxy = new ProxyPatch(i);
00472       proxySet.add(ProxyElem(i, proxy));
00473       patchMap->registerPatch(i, proxy);
00474     }
00475   }
00476   delete[] patchFlag;
00477 }
00478 
00479 void
00480 ProxyMgr::createProxy(PatchID pid) {
00481   Patch *p = PatchMap::Object()->patch(pid);
00482   if (!p) {
00483      DebugM(4,"createProxy("<<pid<<")\n");
00484      ProxyPatch *proxy = new ProxyPatch(pid);
00485      proxySet.add(ProxyElem(pid,proxy));
00486      PatchMap::Object()->registerPatch(pid,proxy);
00487   }
00488   else {
00489      DebugM(4,"createProxy("<<pid<<") found " << p->getPatchID() << "\n");
00490   }
00491     
00492 }
00493 
00494 void
00495 ProxyMgr::removeProxy(PatchID pid) {
00496   ProxyElem *p = proxySet.find(ProxyElem(pid));
00497   if (p) { 
00498     PatchMap::Object()->unregisterPatch(pid,p->proxyPatch);
00499     delete p->proxyPatch;
00500     proxySet.del(ProxyElem(pid));
00501     // iout << iINFO << "Removing unused proxy " << pid << " on " << iPE << ".\n" << endi;
00502   }
00503 }
00504   
00505 void
00506 ProxyMgr::registerProxy(PatchID pid) {
00507   // determine which node gets message
00508   NodeID node = PatchMap::Object()->node(pid);
00509 
00510   RegisterProxyMsg *msg = new RegisterProxyMsg;
00511 
00512   msg->node=CkMyPe();
00513   msg->patch = pid;
00514 
00515   CProxy_ProxyMgr cp(CkpvAccess(BOCclass_group).proxyMgr);
00516   cp[node].recvRegisterProxy(msg);
00517 }
00518 
00519 void
00520 ProxyMgr::recvRegisterProxy(RegisterProxyMsg *msg) {
00521   HomePatch *homePatch = PatchMap::Object()->homePatch(msg->patch);
00522   homePatch->registerProxy(msg); // message deleted in registerProxy()
00523 }
00524 
00525 void
00526 ProxyMgr::unregisterProxy(PatchID pid) {
00527   // determine which node gets message
00528   NodeID node = PatchMap::Object()->node(pid);
00529 
00530   UnregisterProxyMsg *msg = new UnregisterProxyMsg;
00531 
00532   msg->node=CkMyPe();
00533   msg->patch = pid;
00534 
00535   CProxy_ProxyMgr cp(CkpvAccess(BOCclass_group).proxyMgr);
00536   cp[node].recvUnregisterProxy(msg);
00537 }
00538 
00539 void
00540 ProxyMgr::recvUnregisterProxy(UnregisterProxyMsg *msg) {
00541   HomePatch *homePatch = PatchMap::Object()->homePatch(msg->patch);
00542   homePatch->unregisterProxy(msg); // message deleted in registerProxy()
00543 }
00544 
00545 void 
00546 ProxyMgr::buildProxySpanningTree()
00547 {
00548   PatchIDList pids;
00549   if (!CkMyPe()) iout << iINFO << "Building spanning tree ... send: " << proxySendSpanning << " recv: " << proxyRecvSpanning 
00550       << " with branch factor " << proxySpanDim <<"\n" << endi;
00551   PatchMap::Object()->homePatchIDList(pids);
00552   for (int i=0; i<pids.size(); i++) {
00553     HomePatch *home = PatchMap::Object()->homePatch(pids[i]);
00554     if (home == NULL) CkPrintf("ERROR: homepatch NULL\n");
00555 #ifdef NODEAWARE_PROXY_SPANNINGTREE
00556     home->buildNodeAwareSpanningTree();
00557 #else
00558     home->buildSpanningTree();
00559 #endif
00560   }
00561 }
00562 
00563 void 
00564 ProxyMgr::buildProxySpanningTree2()
00565 {
00566 #if 0
00567   //The homePatchIDList is an expensive as it goes through
00568   //every patch ids.
00569   PatchIDList pids;
00570   PatchMap::Object()->homePatchIDList(pids);
00571   for (int i=0; i<pids.size(); i++) {
00572     HomePatch *home = PatchMap::Object()->homePatch(pids[i]);
00573     if (home == NULL) CkPrintf("ERROR: homepatch NULL\n");
00574     home->sendProxies();
00575   }
00576 #else
00577   HomePatchList *hpl = PatchMap::Object()->homePatchList();
00578   HomePatchListIter iter(*hpl);
00579   for(iter=iter.begin(); iter!=iter.end(); iter++) {
00580           HomePatch *home = iter->patch;
00581           home->sendProxies();
00582   }
00583 #endif
00584 }
00585 
00586 void 
00587 ProxyMgr::sendProxies(int pid, int *list, int n)
00588 {
00589   CProxy_ProxyMgr cp(CkpvAccess(BOCclass_group).proxyMgr);
00590   cp[0].recvProxies(pid, list, n);
00591 }
00592 
// Maximum number of intermediate proxies (proxies that relay proxy messages
// to their children in a proxy spanning tree) that the tree optimizations in
// buildNodeAwareSpanningTree0() try to place on one physical node. In the
// second, per-core pass, the same limit applies to the root PE of each
// intermediate node.
#define MAX_INTERNODE 1
00597 
00598 //Only for debug
00599 static void outputProxyTree(ProxyTree &ptree, int np){
00600         FILE *ofp = fopen("patch_proxylist.txt", "w");
00601         std::vector<int> plist;
00602         for(int i=0; i<np; i++) {
00603                 fprintf(ofp, "%d: ", i);
00604                 int listlen = ptree.proxylist[i].size();
00605                 fprintf(ofp, "#%d ", listlen);
00606                 plist.clear();
00607                 for(int j=0; j<listlen; j++) {
00608                         plist.push_back(ptree.proxylist[i][j]);
00609                 }
00610                 std::sort(plist.begin(), plist.end());
00611                 for(int j=0; j<listlen; j++) {
00612                         fprintf(ofp, "%d ", plist[j]);
00613                 }
00614                 fprintf(ofp, "\n");
00615         }
00616         fclose(ofp);
00617 }
00618 
00619 // only on PE 0
00620 void 
00621 ProxyMgr::recvProxies(int pid, int *list, int n)
00622 {
00623   int nPatches = PatchMap::Object()->numPatches();
00624   if (ptree.proxylist == NULL)
00625     ptree.proxylist = new NodeIDList[nPatches];
00626   ptree.proxylist[pid].resize(n);
00627   for (int i=0; i<n; i++)
00628     ptree.proxylist[pid][i] = list[i];
00629   ptree.proxyMsgCount ++;
00630   if (ptree.proxyMsgCount == nPatches) {
00631         //debug
00632         //outputProxyTree(ptree, nPatches);
00633 
00634     ptree.proxyMsgCount = 0;
00635     // building and sending of trees is done in two steps now
00636     // so that the building step can be shifted to the load balancer
00637 #ifdef NODEAWARE_PROXY_SPANNINGTREE
00638     buildNodeAwareSpanningTree0();
00639 #else
00640     buildSpanningTree0();    
00641 #endif
00642     sendSpanningTrees();
00643   }
00644 }
00645 
00646 void ProxyMgr::recvPatchProxyInfo(PatchProxyListMsg *msg){
00647         int nPatches = PatchMap::Object()->numPatches();
00648         if(ptree.proxylist == NULL) ptree.proxylist = new NodeIDList[nPatches];
00649         CmiAssert(msg->numPatches == nPatches);
00650         int peIdx = 0;
00651         for(int i=0; i<nPatches; i++) {
00652                 int pid = msg->patchIDs[i];
00653                 int plen = msg->proxyListLen[i];
00654                 ptree.proxylist[pid].resize(plen);
00655                 for(int j=0; j<plen; j++) {
00656                         ptree.proxylist[pid][j] = msg->proxyPEs[peIdx++];
00657                 }               
00658         }
00659         delete msg;
00660         
00661         //debug
00662         //outputProxyTree(ptree, nPatches);
00663 
00664         ptree.proxyMsgCount = 0;
00665     // building and sending of trees is done in two steps now
00666     // so that the building step can be shifted to the load balancer
00667 #ifdef NODEAWARE_PROXY_SPANNINGTREE
00668     buildNodeAwareSpanningTree0();
00669 #else
00670     buildSpanningTree0();
00671 #endif
00672     sendSpanningTrees();
00673 }
00674 
//
// XXX static and global variables are unsafe for shared memory builds.
// The global and static vars should be eliminated.  
// Unfortunately, the routines that use these below are actually 
// in use in NAMD.
//
// cpuloads:    load array defined elsewhere, indexed by PE in the helpers
//              below (assumed to hold CkNumPes() entries -- confirm at the
//              definition).
// procidx:     allocated on first use by processCpuLoad() with CkNumPes()
//              entries; holds PE indices sorted by decreasing cpuloads
//              (see compLoad).
// averageLoad: intended to hold the mean of cpuloads; it is read only by
//              commented-out code in noInterNode().
extern double *cpuloads;
static int *procidx = NULL;
static double averageLoad = 0.0;
00684 
00685 static int compLoad(const void *a, const void *b)
00686 {
00687   int i1 = *(int *)a;
00688   int i2 = *(int *)b;
00689   double d1 = cpuloads[i1];
00690   double d2 = cpuloads[i2];
00691   if (d1 < d2) 
00692     return 1;
00693   else if (d1 == d2) 
00694     return 0;
00695   else 
00696     return -1;
00697   // sort from high to low
00698 }
00699 
00700 static void processCpuLoad()
00701 {
00702   int i;
00703   if (!procidx) {
00704     procidx = new int[CkNumPes()];
00705   }
00706   for (i=0; i<CkNumPes(); i++) procidx[i] = i;
00707   qsort(procidx, CkNumPes(), sizeof(int), compLoad);
00708 
00709   double averageLoad = 0.0;
00710   for (i=0; i<CkNumPes(); i++) averageLoad += cpuloads[i];
00711   averageLoad /= CkNumPes();
00712 //  iout << "buildSpanningTree1: no intermediate node on " << procidx[0] << " " << procidx[1] << endi;
00713 
00714 }
00715 
00716 static int noInterNode(int p)
00717 {
00718   int exclude = 0;
00719   if(CkNumPes()<1025)
00720     exclude = 5;
00721   else if(CkNumPes()<4097)
00722     exclude = 10;
00723   else if(CkNumPes()<8193)
00724     exclude = 40;
00725   else if(CkNumPes()<16385)
00726     exclude = 40;
00727   else
00728     exclude = 80;
00729   for (int i=0; i<exclude; i++) if (procidx[i] == p) return 1;
00730 //  if (cpuloads[p] > averageLoad) return 1;
00731   return 0;
00732 }
00733 
00734 #ifdef NODEAWARE_PROXY_SPANNINGTREE
00735 //only on PE 0
00736 void ProxyMgr::buildNodeAwareSpanningTree0(){
00737         CkPrintf("Info: build node-aware spanning tree with send: %d, recv: %d with branch factor %d\n", 
00738                          proxySendSpanning, proxyRecvSpanning, proxySpanDim);
00739     int numPatches = PatchMap::Object()->numPatches();
00740     if (ptree.naTrees == NULL) ptree.naTrees = new proxyTreeNodeList[numPatches];
00741     for (int pid=0; pid<numPatches; pid++)     
00742         buildSinglePatchNodeAwareSpanningTree(pid, ptree.proxylist[pid], ptree.naTrees[pid]);
00743        
00744 
00745     //Debug
00746     //printf("#######################Naive ST#######################\n");
00747     //printProxySpanningTree();
00748 
00749     //Now the naive spanning tree has been constructed and stored in oneNATree;
00750     //Afterwards, some optimizations on this naive spanning tree could be done.
00751     //except the first element as the tree root always contains the processor
00752     //that has home patch
00753 
00754     //1st Optimization: reduce intermediate nodes as much as possible. In details,
00755     //the optimal case is that on a single physical smp node, there should be no
00756     //two proxies who act as the intermediate nodes to pass information to childrens
00757     //in the spanning tree. E.g, for patch A's proxy spanning tree, it has a node X as
00758     //its intermediate node. However, for patch B's, it also has a node X as its intermediate
00759     //node. We should avoid this situation as node X becomes the bottleneck as it has twice
00760     //amount of work to process now.
00761     //Step1: foward to the first patch that has proxies
00762     //Now proxyNodeMap records the info that how many intermediate nodes on a node
00763         //each element indiates the number of proxies residing on this node    
00764     int pid=0;
00765     for(;pid<numPatches; pid++) {
00766         if(ptree.proxylist[pid].size()>0) break;
00767     }
00768     if(pid==numPatches) {
00769         return;
00770     }
00771     int *proxyNodeMap = new int[CkNumNodes()];
00772     memset(proxyNodeMap, 0, sizeof(int)*CkNumNodes());
00773     proxyTreeNodeList onePatchT = ptree.naTrees[pid];
00774     //If a node is an intermediate node, then its idx should satisfy
00775     //idx*proxySpanDim + 1 < onePatchT.size()
00776     int lastInterNodeIdx = (onePatchT.size()-2)/proxySpanDim;
00777     for(int i=1; i<lastInterNodeIdx; i++) { //excluding the root node
00778         int nid = onePatchT.item(i).nodeID;
00779         proxyNodeMap[nid]++;
00780     }
00781     //Step2: iterate over each patch's proxy spanning tree to adjust
00782     //the tree node positions. The bad thing here is that it may involve
00783     //many memory allocations and deallocation for small-size (~100bytes)
00784     //chunks.
00785     pid++; //advance to the next patch
00786     for(; pid<numPatches; pid++) {
00787         if(ptree.proxylist[pid].size()==0) continue;
00788         onePatchT = ptree.naTrees[pid];
00789         lastInterNodeIdx = (onePatchT.size()-2)/proxySpanDim;
00790         for(int i=1; i<=lastInterNodeIdx; i++) {
00791             int nid = onePatchT.item(i).nodeID;
00792             if(proxyNodeMap[nid]<MAX_INTERNODE) {
00793                 proxyNodeMap[nid]++;
00794                 continue;
00795             }
00796             //the position is occupied, so search the children
00797             //nodes to see whether there's one to swap this node
00798             //if not found, find the first position that has smallest
00799             //amount of nodes.
00800             int leastIdx = -1;
00801             int leastAmount = ~(1<<31);
00802             //iterate children nodes
00803             int swapPos;
00804             for(swapPos=lastInterNodeIdx+1; swapPos<onePatchT.size(); swapPos++) {
00805                 int chiNId = onePatchT.item(swapPos).nodeID;
00806                 if(proxyNodeMap[chiNId]<MAX_INTERNODE) {
00807                     break;
00808                 }
00809                 if(proxyNodeMap[chiNId]<leastAmount) {
00810                     leastAmount = proxyNodeMap[chiNId];
00811                     leastIdx = swapPos;
00812                 }
00813             }
00814             if(swapPos==onePatchT.size()) {
00815                 CmiAssert(leastIdx!=-1); //because the above loop at least executes once
00816                 //indicate we cannot find a physical node which
00817                 //still allows the intermediate proxy.
00818                 swapPos = leastIdx;
00819             }
00820             //swap the current proxy tree node "i" with node "swapPos"
00821             proxyTreeNode *curNode = &onePatchT.item(i);
00822             proxyTreeNode *swapNode = &onePatchT.item(swapPos);
00823             proxyNodeMap[swapNode->nodeID]++; //update the proxyNodeMap record
00824             int tmp = curNode->nodeID;
00825             curNode->nodeID = swapNode->nodeID;
00826             swapNode->nodeID = tmp;
00827             tmp = curNode->numPes;
00828             ALLOCA(int,tmpPes,tmp);
00829             memcpy(tmpPes, curNode->peIDs, sizeof(int)*tmp);
00830             delete [] curNode->peIDs;
00831             curNode->numPes = swapNode->numPes;
00832             curNode->peIDs = new int[swapNode->numPes];
00833             memcpy(curNode->peIDs, swapNode->peIDs, sizeof(int)*swapNode->numPes);
00834             swapNode->numPes = tmp;
00835             delete [] swapNode->peIDs;
00836             swapNode->peIDs = new int[tmp];
00837             memcpy(swapNode->peIDs, tmpPes, sizeof(int)*tmp);                      
00838         }
00839     }
00840     delete [] proxyNodeMap;    
00841 
00842     //Debug
00843     //printf("#######################After 1st optimization#######################\n");
00844     //printProxySpanningTree();
00845 
00846     //2nd optimization: similar to the 1st optimization but now thinking in
00847     //the core level. If we cannot avoid place two intermediate proxy
00848     //on the same node, we'd better to place them in different cores inside
00849     //the node
00850     if(CmiMyNodeSize()==1) {
00851         //No need to perform the second optimization as every node has only 1 core
00852         return;
00853     }
00854     //Step1: forward to the first patch that has proxies
00855     pid=0;
00856     for(;pid<numPatches; pid++) {
00857         if(ptree.proxylist[pid].size()>0) break;
00858     }
00859     if(pid==numPatches) {
00860         return;
00861     }
00862     int *proxyCoreMap = new int[CkNumPes()];
00863     memset(proxyCoreMap, 0, sizeof(int)*CkNumPes());
00864     onePatchT = ptree.naTrees[pid];
00865     //If a node is an intermediate node, then its idx should satisfy
00866     //idx*proxySpanDim + 1 < onePatchT.size()
00867     lastInterNodeIdx = (onePatchT.size()-2)/proxySpanDim;
00868     for(int i=1; i<lastInterNodeIdx; i++) { //excluding the root node
00869         int rootProcID = onePatchT.item(i).peIDs[0];
00870         proxyCoreMap[rootProcID]++;
00871     }
00872     //Step2: iterate over each patch's proxy spanning tree to adjust
00873     //the root's position of intermediate proxies.
00874     pid++; //advance to the next patch
00875     for(; pid<numPatches; pid++) {
00876         if(ptree.proxylist[pid].size()==0) continue;
00877         onePatchT = ptree.naTrees[pid];
00878         lastInterNodeIdx = (onePatchT.size()-2)/proxySpanDim;
00879         for(int i=1; i<=lastInterNodeIdx; i++) {
00880             proxyTreeNode *curNode = &onePatchT.item(i);
00881             int rootProcID = curNode->peIDs[0];
00882             if(curNode->numPes==1 || proxyCoreMap[rootProcID]<MAX_INTERNODE){
00883                 //if this node contains only 1 core, then we have to leave it as it is
00884                 //because there are no other cores in the same node that could be used to
00885                 //adjust
00886                 proxyCoreMap[rootProcID]++;
00887                 continue;
00888             }
00889             
00890             //foound more than MAX_INTERNODE intermediate proxies on the same core,
00891             //adjust the root id of the core of this proxy tree node
00892             int leastIdx = -1;
00893             int leastAmount = ~(1<<31);
00894             //iterate children nodes
00895             int swapPos;
00896             
00897             for(swapPos=1; swapPos<curNode->numPes; swapPos++) {
00898                 int otherCoreID = curNode->peIDs[swapPos];
00899                 if(proxyCoreMap[otherCoreID]<MAX_INTERNODE) {
00900                     break;
00901                 }
00902                 if(proxyCoreMap[otherCoreID]<leastAmount) {
00903                     leastAmount = proxyCoreMap[otherCoreID];
00904                     leastIdx = swapPos;
00905                 }
00906             }
00907             if(swapPos==curNode->numPes) {
00908                 CmiAssert(leastIdx!=-1); //because the above loop body must execute at least once
00909                 //indicate we cannot find a physical node which
00910                 //still allows the intermediate proxy.
00911                 swapPos = leastIdx;
00912             }
00913             int tmp = curNode->peIDs[swapPos];
00914             curNode->peIDs[swapPos] = curNode->peIDs[0];
00915             curNode->peIDs[0] = tmp;
00916             proxyCoreMap[tmp]++;
00917         }      
00918     }
00919 
00920     delete proxyCoreMap;
00921 
00922     //Debug
00923     //printf("#######################After 2nd optimization#######################\n");
00924     //printProxySpanningTree();
00925 }
00926 
00927 void ProxyMgr::buildSinglePatchNodeAwareSpanningTree(PatchID pid, NodeIDList &proxyList, 
00928                                                      proxyTreeNodeList &ptnTree){       
00929     int numProxies = proxyList.size();
00930     if (numProxies == 0) {
00931         //CkPrintf ("This is sheer evil in building node-aware spanning tree!\n\n");            
00932         return;
00933     }        
00934  
00935     //usually the #proxies is at most 62 (considering 2-away in all dimensions)
00936     //so the access in proxyNodeMap and proxyTreeIdx is at most log2(62)=8 if
00937     //all proxies are in different nodes
00938     //could be better than a CkNumNodes() array in that cache perf. is better
00939     //because of the reduced memory footprint -Chao Mei
00940     std::map<int, int> proxyNodeMap; //<node id, numProxies>    
00941     std::vector<int> proxyNodeIDs;
00942     std::map<int, int> proxyTreeIdx; //<node id, idx in proxyNodeIDs>
00943     
00944     //the processor id of home patch
00945     int hpProcID = PatchMap::Object()->node(pid);
00946     int hpNodeID = CkNodeOf(hpProcID);
00947     proxyNodeMap[hpNodeID]=1;
00948     proxyTreeIdx[hpNodeID]=0;
00949     proxyNodeIDs.push_back(hpNodeID);
00950     //proxyNodeList[0] = hpNodeID;
00951     //int numNodesWithProxies = 1;
00952     
00953     for(int i=0; i<numProxies; i++) {
00954         int procId = proxyList[i];
00955         int nodeId = CkNodeOf(procId);
00956         std::map<int, int>::iterator it=proxyNodeMap.find(nodeId);
00957         if(it==proxyNodeMap.end()) {
00958             proxyNodeMap[nodeId] = 1;
00959             proxyTreeIdx[nodeId] = proxyNodeIDs.size();
00960             proxyNodeIDs.push_back(nodeId);
00961         }else{
00962             proxyNodeMap[nodeId]++;
00963         }        
00964     }
00965     proxyTreeNodeList &oneNATree = ptnTree;   // spanning tree
00966     int numNodesWithProxies = proxyNodeIDs.size();
00967     oneNATree.resize(numNodesWithProxies);
00968     //initialize oneNATree
00969     for(int i=0; i<numNodesWithProxies; i++) {
00970         proxyTreeNode *oneNode = &oneNATree.item(i);
00971         delete [] oneNode->peIDs;
00972         oneNode->nodeID = proxyNodeIDs[i];
00973         oneNode->peIDs = new int[proxyNodeMap[oneNode->nodeID]];                        
00974         oneNode->numPes = 0; //initially set to zero as used for incrementing later
00975     }
00976     
00977     //set up the tree root which contains the home patch processor
00978     proxyTreeNode *rootnode = &oneNATree.item(0);
00979     rootnode->peIDs[0] = hpProcID;
00980     rootnode->numPes++;
00981     
00982     for(int i=0; i<numProxies; i++) {
00983         int procId = proxyList[i];
00984         int nodeId = CkNodeOf(procId);
00985         int idxInTree = proxyTreeIdx[nodeId];
00986         CmiAssert(idxInTree>=0 && idxInTree<numNodesWithProxies);
00987         proxyTreeNode *oneNode = &oneNATree.item(idxInTree);
00988         oneNode->peIDs[oneNode->numPes] = procId;
00989         oneNode->numPes++;
00990     }
00991 }
00992 #else //branch of NODEAWARE_PROXY_SPANNINGTREE
00993 // only on PE 0
00994 void 
00995 ProxyMgr::buildSpanningTree0()
00996 {
00997         CkPrintf("Info: build spanning tree with send: %d, recv: %d with branch factor %d\n", 
00998                          proxySendSpanning, proxyRecvSpanning, proxySpanDim);
00999 
01000   int i;
01001 
01002   processCpuLoad();
01003 
01004   int *numPatchesOnNode = new int[CkNumPes()];
01005   int numNodesWithPatches = 0;
01006   for (i=0; i<CkNumPes(); i++) numPatchesOnNode[i] = 0;
01007   int numPatches = PatchMap::Object()->numPatches();
01008   for (i=0; i<numPatches; i++) {
01009     int node = PatchMap::Object()->node(i);
01010     numPatchesOnNode[node]++;
01011     if (numPatchesOnNode[node] == 1)
01012       numNodesWithPatches ++;
01013   }
01014   int patchNodesLast =
01015     ( numNodesWithPatches < ( 0.7 * CkNumPes() ) );
01016   int *ntrees = new int[CkNumPes()];
01017   for (i=0; i<CkNumPes(); i++) ntrees[i] = 0;
01018   if (ptree.trees == NULL) ptree.trees = new NodeIDList[numPatches];
01019   for (int pid=0; pid<numPatches; pid++) 
01020   {
01021     int numProxies = ptree.proxylist[pid].size();
01022     if (numProxies == 0) {
01023       //CkPrintf ("This is sheer evil!\n\n");
01024       //ProxyMgr::Object()->sendSpanningTreeToHomePatch(pid, NULL, 0);
01025       delete [] ntrees;
01026       delete [] numPatchesOnNode;
01027       return;
01028     }
01029     NodeIDList &tree = ptree.trees[pid];   // spanning tree
01030     NodeIDList oldtree = tree;
01031     tree.resize(numProxies+1);
01032     tree.setall(-1);
01033     tree[0] = PatchMap::Object()->node(pid);
01034     int s=1, e=numProxies;
01035     int nNonPatch = 0;
01036     int treesize = 1;
01037     int pp;
01038 
01039     // keep tree persistent for non-intermediate nodes
01040     for (pp=0; pp<numProxies; pp++) {
01041       int p = ptree.proxylist[pid][pp];
01042       int oldindex = oldtree.find(p);
01043       if (oldindex != -1 && oldindex <= numProxies) {
01044         int isIntermediate = (oldindex*proxySpanDim+1 <= numProxies);
01045         if (!isIntermediate) {
01046           tree[oldindex] = p;
01047         }
01048         else if (ntrees[p] < MAX_INTERNODE) {
01049           tree[oldindex] = p;
01050           ntrees[p] ++;
01051         }
01052       }
01053     }
01054 
01055     for (pp=0; pp<numProxies; pp++) {
01056       int p = ptree.proxylist[pid][pp];              // processor number
01057       if (tree.find(p) != -1) continue;        // already used
01058       treesize++;
01059       if (patchNodesLast && numPatchesOnNode[p] ) {
01060         while (tree[e] != -1) { e--; if (e==-1) e = numProxies; }
01061         tree[e] = p;
01062         int isIntermediate = (e*proxySpanDim+1 <= numProxies);
01063         if (isIntermediate) ntrees[p]++;
01064       }
01065       else {
01066         while (tree[s] != -1) { s++; if (s==numProxies+1) s = 1; }
01067         int isIntermediate = (s*proxySpanDim+1 <= numProxies);
01068         if (isIntermediate && (ntrees[p] >= MAX_INTERNODE || noInterNode(p))) {   // TOO MANY INTERMEDIATE TREES
01069         //if (isIntermediate && ntrees[p] >= MAX_INTERNODE)    // TOO MANY INTERMEDIATE TREES
01070           while (tree[e] != -1) { e--; if (e==-1) e = numProxies; }
01071           tree[e] = p;
01072           isIntermediate = (e*proxySpanDim+1 <= numProxies);
01073           if (isIntermediate) ntrees[p]++;
01074         }
01075         else {
01076           tree[s] = p;
01077           nNonPatch++;
01078           if (isIntermediate) ntrees[p]++;
01079         }
01080       }
01081     }
01082     // send homepatch's proxy tree
01083     if(ptree.sizes)
01084       ptree.sizes[pid] = treesize;
01085     //ProxyMgr::Object()->sendSpanningTreeToHomePatch(pid, &tree[0], treesize);
01086   }
01087   /*for (i=0; i<CkNumPes(); i++) {
01088     if (ntrees[i] > MAX_INTERNODE) iout << "Processor " << i << "has (guess) " << ntrees[i] << " intermediate nodes." << endi;
01089   }*/
01090   delete [] ntrees;
01091   delete [] numPatchesOnNode;
01092 }
01093 #endif
01094 
01095 void ProxyMgr::sendSpanningTrees()
01096 {
01097   int numPatches = PatchMap::Object()->numPatches();
01098   for (int pid=0; pid<numPatches; pid++) {
01099     int numProxies = ptree.proxylist[pid].size();
01100 #ifdef NODEAWARE_PROXY_SPANNINGTREE
01101     if (numProxies == 0)
01102       ProxyMgr::Object()->sendNodeAwareSpanningTreeToHomePatch(pid, NULL, 0);
01103     else {
01104       ProxyMgr::Object()->sendNodeAwareSpanningTreeToHomePatch(pid, ptree.naTrees[pid].begin(), ptree.naTrees[pid].size());
01105     }
01106 #else
01107     if (numProxies == 0)
01108       ProxyMgr::Object()->sendSpanningTreeToHomePatch(pid, NULL, 0);
01109     else {
01110       ProxyMgr::Object()->sendSpanningTreeToHomePatch(pid, ptree.trees[pid].begin(), ptree.trees[pid].size());
01111     }
01112 #endif
01113   }
01114 }
01115 
01116 void ProxyMgr::sendSpanningTreeToHomePatch(int pid, int *tree, int n)
01117 {
01118   CProxy_ProxyMgr cp(thisgroup);
01119   cp[PatchMap::Object()->node(pid)].recvSpanningTreeOnHomePatch(pid, tree, n);
01120 }
01121 
01122 void ProxyMgr::recvSpanningTreeOnHomePatch(int pid, int *tree, int n)
01123 {
01124   HomePatch *p = PatchMap::Object()->homePatch(pid);
01125   p->recvSpanningTree(tree, n);
01126 }
01127 
01128 void ProxyMgr::sendNodeAwareSpanningTreeToHomePatch(int pid, proxyTreeNode *tree, int n)
01129 {
01130   CProxy_ProxyMgr cp(thisgroup);
01131   ProxyNodeAwareSpanningTreeMsg *msg = ProxyNodeAwareSpanningTreeMsg::getANewMsg(pid, -1, tree, n);
01132   cp[PatchMap::Object()->node(pid)].recvNodeAwareSpanningTreeOnHomePatch(msg);
01133 }
01134 
01135 void ProxyMgr::recvNodeAwareSpanningTreeOnHomePatch(ProxyNodeAwareSpanningTreeMsg *msg)
01136 {
01137   HomePatch *p = PatchMap::Object()->homePatch(msg->patch);
01138   p->recvNodeAwareSpanningTree(msg);
01139   delete msg;
01140 }
01141 
01142 void 
01143 ProxyMgr::sendSpanningTree(ProxySpanningTreeMsg *msg) {
01144   CProxy_ProxyMgr cp(CkpvAccess(BOCclass_group).proxyMgr);
01145   cp[msg->tree[0]].recvSpanningTree(msg);
01146 }
01147 
01148 void ProxyMgr::sendNodeAwareSpanningTree(ProxyNodeAwareSpanningTreeMsg *msg){
01149   CProxy_ProxyMgr cp(CkpvAccess(BOCclass_group).proxyMgr);
01150   int pe = msg->allPes[0]; //the root procID
01151 
01152 #if defined(PROCTRACE_DEBUG) && defined(NAST_DEBUG)
01153   DebugFileTrace *dft = DebugFileTrace::Object();
01154   dft->openTrace();
01155   dft->writeTrace("PMgr::sndST: from proc %d for patch[%d]\n", pe, msg->patch);
01156   dft->closeTrace();
01157 #endif
01158 
01159   cp[pe].recvNodeAwareSpanningTree(msg);
01160 }
01161 
01162 void 
01163 ProxyMgr::recvSpanningTree(ProxySpanningTreeMsg *msg) {
01164   int size = msg->tree.size();
01165   int *child = new int[proxySpanDim];
01166   int nChild = 0;
01167   int i;
01168   ProxyPatch *proxy = (ProxyPatch *) PatchMap::Object()->patch(msg->patch);
01169   for (i=0; i<proxySpanDim; i++) {
01170     if (size > i+1) { child[i] = msg->tree[i+1]; nChild++; }
01171   }
01172   if (!PatchMap::Object()->homePatch(msg->patch)) {
01173     proxy->setSpanningTree(msg->node, child, nChild);
01174   }
01175 
01176   // build subtree and pass down
01177  if (nChild > 0) {
01178 
01179   nodecount ++;
01180   //if (nodecount > MAX_INTERNODE) 
01181   //  iout << "Processor " << CkMyPe() << "has (actual) " << nodecount << " intermediate nodes." << endi;
01182 
01183 //CkPrintf("[%d] %d:(%d) %d %d %d %d %d\n", CkMyPe(), msg->patch, size, msg->tree[0], msg->tree[1], msg->tree[2], msg->tree[3], msg->tree[4]);
01184   NodeIDList *tree = new NodeIDList[proxySpanDim];
01185   int level = 1, index=1;
01186   int done = 0;
01187   while (!done) {
01188     for (int n=0; n<nChild; n++) {
01189       if (done) break;
01190       for (int j=0; j<level; j++) {
01191        if (index >= size) { done = 1; break; }
01192        tree[n].add(msg->tree[index]);
01193        index++;
01194       }
01195     }
01196     level *=proxySpanDim;
01197   }
01198 
01199   ProxyMgr *proxyMgr = ProxyMgr::Object();
01200   for (i=0; i<proxySpanDim; i++) {
01201     if (tree[i].size()) {
01202       ProxySpanningTreeMsg *cmsg = new ProxySpanningTreeMsg;
01203       cmsg->patch = msg->patch;
01204       cmsg->node = CkMyPe();
01205       cmsg->tree = tree[i];
01206       proxyMgr->sendSpanningTree(cmsg);
01207     }
01208   }
01209 
01210   delete [] tree;
01211  }
01212 
01213   delete [] child;
01214   delete msg;
01215 }
01216 
//The "msg" represents the subtree rooted at this proc.
//
//Message layout, as used below: numPesOfNode[k] is the number of PEs of the
//k-th node of the subtree (k < numNodesWithProxies), and allPes holds the PE
//lists of those nodes back to back. Node 0 is this proc's node and
//allPes[0] is this proc (the subtree root). msg->procID is the parent.
//
//Steps:
// 1. pick this proc's immediate children and, on a proxy (non-home) PE,
//    record parent/children on the local ProxyPatch;
// 2. split the rest of the subtree among those children;
// 3. send each child its own subtree message.
//The incoming message is deleted before returning.
void ProxyMgr::recvNodeAwareSpanningTree(ProxyNodeAwareSpanningTreeMsg *msg){
#if defined(PROCTRACE_DEBUG) && defined(NAST_DEBUG)
    DebugFileTrace *dft = DebugFileTrace::Object();
    dft->openTrace();
    dft->writeTrace("PMgr::recvST0 for patch[%d] with #nodes=%d\n", msg->patch, msg->numNodesWithProxies);
    dft->closeTrace();
    msg->printOut("PMgr::recvST");
#endif

    //This function is divided into three parts. The tree root is msg->allPes[0]
    //1. set up its own immediate children
    int treesize = msg->numNodesWithProxies; //at least include one as its internal procs
    int iNChild = msg->numPesOfNode[0]-1; //number of internal children (other PEs on this node)
    int eNChild = treesize-1; //number of external children (other nodes)

    CmiAssert(treesize>0);
    //use the same way of computing children in HomePatch::setupChildrenFromProxySpanningTree
    //External children take up to proxySpanDim slots; internal children get
    //the leftover slots, but at least one if this node has other PEs.
    eNChild = (proxySpanDim>eNChild)?eNChild:proxySpanDim;
    int iSlots = proxySpanDim-eNChild;
    if(iNChild>0) {
        if(iSlots==0){
            //at least having one internal child
            iNChild = 1;
        }else{
            iNChild = (iSlots>iNChild)?iNChild:iSlots;
        }
    }
    int numChild = iNChild + eNChild;
    if(numChild==0){
        //Indicating this proxy is a leaf in the spanning tree: only the
        //parent (msg->procID) is recorded.
        ProxyPatch *proxy = (ProxyPatch *) PatchMap::Object()->patch(msg->patch);
        proxy->setSpanningTree(msg->procID, NULL, 0);
#if defined(NODEAWARE_PROXY_SPANNINGTREE) && defined(USE_NODEPATCHMGR)
                //When using NODEPATCHMGR, the proc-level is a flat list attached to the node
                //while the node-level spanning tree obeys the branch factor.
                //As a result, when passing down spanning trees, if this proc is on the same node
                //of its parent, then the NodeProxyMgr has already been set by its parent. There's
                //no need resetting here. However, the nodeChildren attached to this proxy has
                //to be set to NULL. -Chao Mei
                int onSameNode = (CkMyNode() == CkNodeOf(msg->procID));
        //set up proxyInfo inside NodeProxyMgr
        if(!onSameNode && !PatchMap::Object()->homePatch(msg->patch)){
            //only when this processor contains a proxy patch of "msg->patch"
            //is the patch registeration in NodeProxyMgr needed,
            //and itself needs to be registered
            CProxy_NodeProxyMgr pm(CkpvAccess(BOCclass_group).nodeProxyMgr);
            NodeProxyMgr *npm = pm[CkMyNode()].ckLocalBranch();
            npm->registerPatch(msg->patch, msg->numPesOfNode[0], msg->allPes);
        }
        //set children in terms of node ids
        proxy->setSTNodeChildren(0, NULL);
#endif
        delete msg;
        return;
    }

    //this proc is an intermediate node of some spanning tree
    nodecount++;
    //if (nodecount > MAX_INTERNODE) 
    //  iout << "Processor " << CkMyPe() << "has (actual) " << nodecount << " intermediate nodes." << endi;

    if(!PatchMap::Object()->homePatch(msg->patch)){
        //the home patch of this spanning tree has been already set
        //in HomePatch::setupChildrenFromProxySpanningTree, so we
        //only care about the children setup for proxy patches here
        ProxyPatch *proxy = (ProxyPatch *) PatchMap::Object()->patch(msg->patch);
        ALLOCA(int,children,numChild);
        //add external children: the first PE of each of the next eNChild nodes
        int *p = msg->allPes + msg->numPesOfNode[0];
        for(int i=0; i<eNChild; i++) {
            children[i] = *p;
            p += msg->numPesOfNode[i+1];
        }
        //add internal children: allPes[1..iNChild], i.e. PEs on this node
        for(int i=eNChild, j=1; i<numChild; i++, j++) {
            children[i] = msg->allPes[j]; 
        }
        proxy->setSpanningTree(msg->procID, children, numChild);

#if defined(NODEAWARE_PROXY_SPANNINGTREE) && defined(USE_NODEPATCHMGR)
                int onSameNode = (CkMyNode() == CkNodeOf(msg->procID));
                if(!onSameNode) {
                        //set up proxyInfo inside NodeProxyMgr
                        CProxy_NodeProxyMgr pm(CkpvAccess(BOCclass_group).nodeProxyMgr);
                        NodeProxyMgr *npm = pm[CkMyNode()].ckLocalBranch();
                        npm->registerPatch(msg->patch, msg->numPesOfNode[0], msg->allPes);

                        //set children in terms of node ids
                        ALLOCA(int,nodeChildren,eNChild+1);
                        p = msg->allPes + msg->numPesOfNode[0];
                        for(int i=0; i<eNChild; i++) {
                                nodeChildren[i] = CkNodeOf(*p);
                                p += msg->numPesOfNode[i+1];
                        }
                        //the last entry always stores the node id that contains this proxy
                        nodeChildren[eNChild] = CkNodeOf(msg->allPes[0]);
                        proxy->setSTNodeChildren(eNChild+1, nodeChildren);
                } else {
                        proxy->setSTNodeChildren(0, NULL);
                }
#endif
    }

    //2. send msgs for the tree to children proxies
    //For each child, collect (size, pointer into msg->allPes) pairs that
    //describe the nodes of that child's subtree; nothing is copied yet.
    ResizeArray<int> *exTreeChildSize = new ResizeArray<int>[numChild];
    ResizeArray<int *> *exTreeChildPtr = new ResizeArray<int *>[numChild];

    //2a. first processing children of external nodes.
    //Nodes 1..numNodesWithProxies-1 are dealt out level by level: on each
    //level every external child takes nodesToCnt consecutive nodes (the
    //first level is the child's own node), then nodesToCnt *= proxySpanDim.
    if(eNChild > 0) {
        int nodesToCnt = 1; //the number of children each root (current root's 
                            //immediate external nodes) has in each level
        int pos = 1; //track the iteration over msg->numPesOfNode and skip the current root
        int *pePtr = msg->allPes + msg->numPesOfNode[0];
        int done = 0;
        while(!done) {
            for(int childID=0; childID<eNChild; childID++) {
                //iterate nodes on each level
                for(int i=0; i<nodesToCnt; i++) {
                    int cursize = msg->numPesOfNode[pos];
                    exTreeChildSize[childID].add(cursize);
                    exTreeChildPtr[childID].add(pePtr);
                    pos++;
                    pePtr += cursize; 
                    if(pos==msg->numNodesWithProxies) {
                        done = 1;
                        break;
                    }
                }
                if(done) break;
            }
            nodesToCnt *= proxySpanDim;
        }
    }

    //2b. secondly processing children on the same node.
    //Same level-by-level scheme over this node's PEs allPes[1..numPesOfNode[0]),
    //each PE forming a one-PE "node" entry of an internal child's subtree.
    if(iNChild>0) {
        int nodesToCnt = 1; //the number of children each root (current root's 
                            //immediate internal child proxies) has in each level
        int pos = 1; //track the iteration over proxies on the same node and skip the current root
        int *pePtr = msg->allPes+1; //skip the root
        int done = 0;
        while(!done) {
            for(int childID=eNChild; childID<numChild; childID++) {
                //iterate nodes on each level
                for(int i=0; i<nodesToCnt; i++) {
                    exTreeChildSize[childID].add(1);
                    exTreeChildPtr[childID].add(pePtr);
                    pos++;
                    pePtr++; 
                    if(pos==msg->numPesOfNode[0]) {
                        done = 1;
                        break;
                    }
                }
                if(done) break;
            }
            nodesToCnt *= proxySpanDim;
        }
    }

    //3. build one message per child from the collected (size, ptr) pairs,
    //with this proc as the parent (procID), and send it to the child's root.
    for(int i=0; i<numChild; i++) {
        ResizeArray<int> *allSizes = &exTreeChildSize[i];
        ResizeArray<int *> *allPtrs = &exTreeChildPtr[i];
        int totalNodes = allSizes->size();
        int totalPes = 0;
        for(int j=0; j<totalNodes; j++) totalPes += allSizes->item(j);
        ProxyNodeAwareSpanningTreeMsg *cmsg = new(totalNodes, totalPes, 0) ProxyNodeAwareSpanningTreeMsg;
        cmsg->patch = msg->patch;
        cmsg->procID = CkMyPe();
        cmsg->numNodesWithProxies = totalNodes;
        int *pAllPes = cmsg->allPes;
        for(int j=0; j<totalNodes; j++) {
            int numPes = allSizes->item(j);
            cmsg->numPesOfNode[j] = numPes;
            memcpy(pAllPes, allPtrs->item(j), sizeof(int)*numPes);
            pAllPes += numPes;
        }
        #if defined(PROCTRACE_DEBUG) && defined(NAST_DEBUG)
        cmsg->printOut("sndChi:");
        #endif
        ProxyMgr::Object()->sendNodeAwareSpanningTree(cmsg);
    }
    //the collected pointers point into msg, so msg is freed only after all sends
    delete [] exTreeChildSize;
    delete [] exTreeChildPtr;
    delete msg;
}
01403 
01404 void ProxyMgr::recvNodeAwareSTParent(int patch, int parent){
01405 #if defined(PROCTRACE_DEBUG) && defined(NAST_DEBUG)
01406     DebugFileTrace *dft = DebugFileTrace::Object();
01407     dft->openTrace();
01408     dft->writeTrace("PMgr::recvSTParent: for ProxyPatch[%d], parent is %d\n", patch, parent);
01409     dft->closeTrace();
01410 #endif
01411     ProxyPatch *proxy = (ProxyPatch *) PatchMap::Object()->patch(patch);
01412     CmiAssert(proxy!=NULL);
01413     proxy->setSpanningTree(parent, NULL, 0);
01414 }
01415 
01416 void ProxyMgr::sendResults(ProxyResultVarsizeMsg *msg) {
01417     CProxy_ProxyMgr cp(CkpvAccess(BOCclass_group).proxyMgr);
01418     NodeID node = PatchMap::Object()->node(msg->patch);
01419     CmiEnableUrgentSend(1);
01420     cp[node].recvResults(msg);
01421     CmiEnableUrgentSend(0);
01422 }
01423 
01424 void ProxyMgr::recvResults(ProxyResultVarsizeMsg *msg) {
01425     HomePatch *home = PatchMap::Object()->homePatch(msg->patch);
01426     home->receiveResults(msg); // delete done in HomePatch::receiveResults()
01427 }
01428 
01429 void ProxyMgr::sendResults(ProxyResultMsg *msg) {
01430   CProxy_ProxyMgr cp(CkpvAccess(BOCclass_group).proxyMgr);
01431   NodeID node = PatchMap::Object()->node(msg->patch);
01432   CmiEnableUrgentSend(1);
01433   cp[node].recvResults(msg);
01434   CmiEnableUrgentSend(0);
01435 }
01436 
01437 void ProxyMgr::recvResults(ProxyResultMsg *msg) {
01438   HomePatch *home = PatchMap::Object()->homePatch(msg->patch);
01439   home->receiveResults(msg); // delete done in HomePatch::receiveResults()
01440 }
01441 
01442 //sendResults is a direct function call, not an entry method
01443 void ProxyMgr::sendResults(ProxyCombinedResultMsg *msg) {
01444   ProxyPatch *patch = (ProxyPatch *)PatchMap::Object()->patch(msg->patch);
01445   ProxyCombinedResultMsg *ocMsg = patch->depositCombinedResultMsg(msg);
01446   if (ocMsg) {
01447         ProxyCombinedResultRawMsg *cMsg = ProxyCombinedResultMsg::toRaw(ocMsg);        
01448     int destPe = patch->getSpanningTreeParent();
01449     CProxy_ProxyMgr cp(CkpvAccess(BOCclass_group).proxyMgr);
01450     CmiAssert(destPe!=CkMyPe());
01451     //if(destPe != CkMyPe()) {
01452 #if defined(NODEAWARE_PROXY_SPANNINGTREE) && defined(USE_NODEPATCHMGR)
01453       /*CkPrintf("ready to call node::recvImmRes on pe[%d] to dest[%d]\n", CkMyPe(), destPe);
01454       fflush(stdout);*/
01455 
01456       cMsg->destPe = destPe;
01457       CProxy_NodeProxyMgr cnp(CkpvAccess(BOCclass_group).nodeProxyMgr);
01458       cnp[CkNodeOf(destPe)].recvImmediateResults(cMsg);
01459 #else
01460       cp[destPe].recvImmediateResults(cMsg);
01461 #endif
01462     //}
01463     //else{
01465     //  cp[destPe].recvResults(cMsg);
01466     //}
01467   }
01468 }
01469 
01470 void ProxyMgr::recvResults(ProxyCombinedResultRawMsg *omsg) {
01471         ProxyCombinedResultRawMsg *msg = omsg;
01472 
01473 //Chao Mei: hack for QD in case of SMP with immediate msg
01474 #if defined(NODEAWARE_PROXY_SPANNINGTREE) && defined(USE_NODEPATCHMGR) && (CMK_SMP) && defined(NAMDSRC_IMMQD_HACK)
01475     if(proxyRecvSpanning && msg->isFromImmMsgCall){
01476 //    CkPrintf("qdcreate called on pe[%d]\n", CkMyPe());
01477 //    fflush(stdout);
01478         //To compensate for the counter loss for message creation
01479         //inside the process of immediate message on comm thread
01480         CkpvAccess(_qd)->create();
01481     }
01482 #endif
01483 
01484   HomePatch *home = PatchMap::Object()->homePatch(msg->patch);
01485   if (home) {
01486     //printf("Home got a message\n");
01487     home->receiveResults(msg); // delete done in HomePatch::receiveResults()
01488   }
01489   else {
01490     NAMD_bug("ProxyMgr should receive result message on home processor");
01491   }
01492 }
01493 
01494 void ProxyMgr::recvImmediateResults(ProxyCombinedResultRawMsg *omsg) {
01495   HomePatch *home = PatchMap::Object()->homePatch(omsg->patch);
01496   if (home) {
01497     CProxy_ProxyMgr cp(CkpvAccess(BOCclass_group).proxyMgr);        
01498     CmiEnableUrgentSend(1);
01499     cp[CkMyPe()].recvResults(omsg);
01500     CmiEnableUrgentSend(0);
01501   }
01502   else {
01503     ProxyPatch *patch = (ProxyPatch *)PatchMap::Object()->patch(omsg->patch);
01504         ProxyCombinedResultMsg *ocMsg = patch->depositCombinedResultRawMsg(omsg);
01505     if (ocMsg) {
01506                 CProxy_ProxyMgr cp(CkpvAccess(BOCclass_group).proxyMgr);
01507                 ProxyCombinedResultRawMsg *cMsg = ProxyCombinedResultMsg::toRaw(ocMsg);         
01508                 cp[patch->getSpanningTreeParent()].recvImmediateResults(cMsg);
01509     }
01510   }
01511 }
01512 
01513 void NodeProxyMgr::recvImmediateResults(ProxyCombinedResultRawMsg *omsg){
01514     ProxyCombinedResultRawMsg *msg = omsg;
01515 #if defined(NODEAWARE_PROXY_SPANNINGTREE) && defined(USE_NODEPATCHMGR)
01516     //CkPrintf("recvImmRes called on comm thread%d pe[%d]\n", CkMyRank()==CmiMyNodeSize(), CkMyPe());
01517     //fflush(stdout);
01518 
01519     int destRank = CkRankOf(msg->destPe);
01520     PatchMap *pmap = localPatchMaps[destRank];
01521     HomePatch *home = pmap->homePatch(msg->patch);
01522     if (home) {
01523 #if CMK_SMP && defined(NAMDSRC_IMMQD_HACK)
01524         msg->isFromImmMsgCall = (CkMyRank()==CkMyNodeSize());
01525 #endif
01526         CProxy_ProxyMgr cp(localProxyMgr);
01527         CmiEnableUrgentSend(1);
01528         cp[msg->destPe].recvResults(msg);
01529         CmiEnableUrgentSend(0);
01530 /*
01531         char *srcfrom = "Isfrom";
01532         if(CkMyRank()!=CmiMyNodeSize()) srcfrom="Notfrom";
01533       CkPrintf("%s comm thread from pe[%d]\n", srcfrom, CkMyPe());
01534       fflush(stdout);
01535 */
01536     }
01537     else {
01538         ProxyPatch *patch = (ProxyPatch *)pmap->patch(msg->patch);
01539         ProxyCombinedResultMsg *ocMsg = patch->depositCombinedResultRawMsg(msg); 
01540         if (ocMsg) {
01541             CProxy_NodeProxyMgr cnp(thisgroup);
01542             ocMsg->destPe = patch->getSpanningTreeParent();
01543                         ProxyCombinedResultRawMsg *cMsg = ProxyCombinedResultMsg::toRaw(ocMsg);
01544             cnp[CkNodeOf(cMsg->destPe)].recvImmediateResults(cMsg);
01545         }
01546     }
01547 #endif
01548 }
01549 
01550 void
01551 ProxyMgr::sendProxyData(ProxyDataMsg *msg, int pcnt, int *pids) {
01552 #if defined(NODEAWARE_PROXY_SPANNINGTREE) && defined(USE_NODEPATCHMGR)
01553     if(proxySendSpanning == 1) {
01554         CProxy_NodeProxyMgr cnp(CkpvAccess(BOCclass_group).nodeProxyMgr);
01555         for(int i=0; i<pcnt-1; i++) {
01556             ProxyDataMsg *copymsg = (ProxyDataMsg *)CkCopyMsg((void **)&msg);
01557             cnp[pids[i]].recvImmediateProxyData(copymsg);
01558         }
01559         cnp[pids[pcnt-1]].recvImmediateProxyData(msg);
01560         return;
01561     }
01562 #endif
01563   CProxy_ProxyMgr cp(CkpvAccess(BOCclass_group).proxyMgr);
01564   cp.recvImmediateProxyData(msg,pcnt,pids);
01565 }
01566 
01567 void 
01568 ProxyMgr::recvProxyData(ProxyDataMsg *msg) {
01569 //Chao Mei: hack for QD in case of SMP with immediate msg
01570 #if defined(NODEAWARE_PROXY_SPANNINGTREE) && defined(USE_NODEPATCHMGR) && (CMK_SMP) && defined(NAMDSRC_IMMQD_HACK)
01571     if(proxySendSpanning && msg->isFromImmMsgCall){
01572 //    CkPrintf("qdcreate called on pe[%d]\n", CkMyPe());
01573 //    fflush(stdout);
01574         //To compensate for the counter loss for message creation
01575         //inside the process of immediate message on comm thread
01576         CkpvAccess(_qd)->create();
01577     }
01578 #endif
01579   ProxyPatch *proxy = (ProxyPatch *) PatchMap::Object()->patch(msg->patch);
01580   proxy->receiveData(msg); // deleted in ProxyPatch::receiveAtoms()
01581 }
01582 
01583 void
01584 ProxyMgr::recvImmediateProxyData(ProxyDataMsg *msg) {
01585   ProxyPatch *proxy = (ProxyPatch *) PatchMap::Object()->patch(msg->patch);  
01586   if (proxySendSpanning == 1) {
01587     // copy the message and send to spanning children
01588     //int *pids = (int*)alloca(proxy->getSpanningTreeNChild()*sizeof(int));
01589     //int npid = proxy->getSpanningTreeChild(pids);
01590     int npid = proxy->getSpanningTreeNChild();
01591     int *pids = (int *)proxy->getSpanningTreeChildPtr();
01592     if (npid) {        
01593         ProxyDataMsg *newmsg = (ProxyDataMsg *)CkCopyMsg((void **)&msg);     
01594 #if CMK_PERSISTENT_COMM && USE_PERSISTENT_TREE
01595         int ntreephs;
01596         PersistentHandle *treephs = proxy->getSpanningTreePhs(ntreephs);
01597         CmiAssert(treephs && ntreephs == npid);
01598         CmiUsePersistentHandle(treephs, ntreephs);
01599 #endif
01600         ProxyMgr::Object()->sendProxyData(newmsg,npid,pids);
01601 #if CMK_PERSISTENT_COMM && USE_PERSISTENT_TREE
01602         CmiUsePersistentHandle(NULL, 0);
01603 #endif
01604       #if 0
01605       //ChaoMei: buggy code??? the spanning tree doesn't always have 2 levels
01606       //At the second level of the tree immediate messages are not needed
01607       CProxy_ProxyMgr cp(CkpvAccess(BOCclass_group).proxyMgr);
01608       cp.recvProxyData(newmsg,npid,pids);
01609       #endif
01610     }
01611   }
01612   /* send to self via EP method to preserve priority */
01613   CProxy_ProxyMgr cp(CkpvAccess(BOCclass_group).proxyMgr);
01614   cp[CkMyPe()].recvProxyData(msg);
01615 }
01616 
01617 void NodeProxyMgr::recvImmediateProxyData(ProxyDataMsg *msg) {    
01618 #if defined(NODEAWARE_PROXY_SPANNINGTREE) && defined(USE_NODEPATCHMGR)
01619     CProxy_ProxyMgr cp(localProxyMgr);
01620     proxyTreeNode *ptn = proxyInfo[msg->patch];
01621     CmiAssert(ptn->numPes!=0);
01622 
01623     //re-send msg to this nodes's children nodes.
01624     //only the first pe of a node of node-aware ST should contain children nodes
01625     int rank = CkRankOf(ptn->peIDs[0]);
01626     PatchMap *pmap = localPatchMaps[rank];
01627     ProxyPatch *ppatch = (ProxyPatch *)pmap->patch(msg->patch);
01628 
01629     int npid = ppatch->getSTNNodeChild();
01630     int *pids = ppatch->getSTNodeChildPtr();
01631     if(npid>0) {        
01632         //only needs to send to other nodes, so check the last entry of pids.
01633         //This is because the data for proxies on the same node have been sent
01634         //later in this function by NodeProxyMgr.
01635         if(pids[npid-1]==CkMyNode()) npid--;
01636     }    
01637     CProxy_NodeProxyMgr cnp(thisgroup);
01638 #if CMK_PERSISTENT_COMM && USE_PERSISTENT_TREE
01639     if (npid) {
01640         int ntreephs;
01641         PersistentHandle *treephs = ppatch->getSpanningTreePhs(ntreephs);
01642         CmiAssert(treephs && ntreephs >= npid);
01643         CmiUsePersistentHandle(treephs, ntreephs);
01644     }
01645 #endif
01646     for(int i=0; i<npid; i++) {
01647         ProxyDataMsg *copymsg = (ProxyDataMsg *)CkCopyMsg((void **)&msg);
01648         cnp[pids[i]].recvImmediateProxyData(copymsg);
01649     }    
01650 #if CMK_PERSISTENT_COMM && USE_PERSISTENT_TREE
01651     CmiUsePersistentHandle(NULL, 0);
01652 #endif
01653 
01654     //re-send msg to it's internal cores
01655 #if CMK_SMP && defined(NAMDSRC_IMMQD_HACK)
01656     msg->isFromImmMsgCall = (CkMyRank()==CkMyNodeSize());
01657 #endif
01658     cp.recvProxyData(msg, ptn->numPes, ptn->peIDs);
01659 #else
01660     CkAbort("Bad execution path to NodeProxyMgr::recvImmediateProxyData\n");
01661 #endif
01662 }
01663 
01664 void
01665 ProxyMgr::sendProxyAll(ProxyDataMsg *msg, int pcnt, int *pids) {
01666 #if defined(NODEAWARE_PROXY_SPANNINGTREE) && defined(USE_NODEPATCHMGR)
01667     if(proxySendSpanning == 1) {
01668         CProxy_NodeProxyMgr cnp(CkpvAccess(BOCclass_group).nodeProxyMgr);
01669         for(int i=0; i<pcnt-1; i++) {
01670             ProxyDataMsg *copymsg = (ProxyDataMsg *)CkCopyMsg((void **)&msg);
01671             cnp[pids[i]].recvImmediateProxyAll(copymsg);
01672         }
01673         cnp[pids[pcnt-1]].recvImmediateProxyAll(msg);
01674         return;
01675     }
01676 #endif
01677   CProxy_ProxyMgr cp(CkpvAccess(BOCclass_group).proxyMgr);
01678   cp.recvImmediateProxyAll(msg,pcnt,pids);
01679 }
01680 
01681 void 
01682 ProxyMgr::recvProxyAll(ProxyDataMsg *msg) {
01683 //Chao Mei: hack for QD in case of SMP with immediate msg
01684 #if defined(NODEAWARE_PROXY_SPANNINGTREE) && defined(USE_NODEPATCHMGR) && (CMK_SMP) && defined(NAMDSRC_IMMQD_HACK)
01685     if(proxySendSpanning && msg->isFromImmMsgCall){
01686 //    CkPrintf("qdcreate called on pe[%d]\n", CkMyPe());
01687 //    fflush(stdout);
01688         //To compensate for the counter loss for message creation
01689         //inside the process of immediate message on comm thread
01690         CkpvAccess(_qd)->create();
01691     }
01692 #endif
01693 
01694   ProxyPatch *proxy = (ProxyPatch *) PatchMap::Object()->patch(msg->patch);
01695   proxy->receiveAll(msg); // deleted in ProxyPatch::receiveAtoms()
01696 }
01697 
01698 void
01699 ProxyMgr::recvImmediateProxyAll(ProxyDataMsg *msg) {
01700   ProxyPatch *proxy = (ProxyPatch *) PatchMap::Object()->patch(msg->patch);
01701   #if defined(PROCTRACE_DEBUG) && defined(NAST_DEBUG)
01702   DebugFileTrace *dft = DebugFileTrace::Object();
01703   dft->openTrace();
01704   dft->writeTrace("PMgr::recvImmPAll for patch[%d]\n", msg->patch);
01705   CmiAssert(proxy!=NULL);
01706   dft->writeTrace("PMgr::recvImmPAll assertion OK for patch[%d]\n", msg->patch);
01707   dft->closeTrace();
01708   #endif
01709   if (proxySendSpanning == 1) {
01710     // copy the message and send to spanning children
01711     //int *pids = (int*)alloca(proxy->getSpanningTreeNChild()*sizeof(int));
01712     //int npid = proxy->getSpanningTreeChild(pids);
01713     int npid = proxy->getSpanningTreeNChild();
01714     int *pids = (int *)proxy->getSpanningTreeChildPtr();
01715     if (npid) {
01716         ProxyDataMsg *newmsg = (ProxyDataMsg *)CkCopyMsg((void **)&msg);      
01717 #if CMK_PERSISTENT_COMM && USE_PERSISTENT_TREE
01718         int ntreephs;
01719         PersistentHandle *treephs = proxy->getSpanningTreePhs(ntreephs);
01720         CmiAssert(treephs && ntreephs == npid);
01721         CmiUsePersistentHandle(treephs, ntreephs);
01722 #endif
01723         ProxyMgr::Object()->sendProxyAll(newmsg,npid,pids);
01724 #if CMK_PERSISTENT_COMM && USE_PERSISTENT_TREE
01725         CmiUsePersistentHandle(NULL, 0);
01726 #endif
01727     }
01728   }
01729   /* send to self via EP method to preserve priority */
01730   CProxy_ProxyMgr cp(CkpvAccess(BOCclass_group).proxyMgr);
01731   cp[CkMyPe()].recvProxyAll(msg);
01732 }
01733 
01734 void NodeProxyMgr::recvImmediateProxyAll(ProxyDataMsg *msg) {    
01735 #if defined(NODEAWARE_PROXY_SPANNINGTREE) && defined(USE_NODEPATCHMGR)
01736     CProxy_ProxyMgr cp(localProxyMgr);
01737     proxyTreeNode *ptn = proxyInfo[msg->patch];
01738     CmiAssert(ptn->numPes!=0);
01739     #if defined(PROCTRACE_DEBUG) && defined(NAST_DEBUG)
01740     //This could be executed on comm thd.
01741     printf("NodePMgr::recvImmPAll for patch[%d] on node %d rank %d, prepare to send proc ", msg->patch, CkMyNode(), CkMyRank());
01742     for(int i=0; i<ptn->numPes; i++) {
01743         printf("%d, ", ptn->peIDs[i]);
01744     }
01745     printf("\n");
01746     fflush(stdout);
01747     #endif
01748 
01749     //re-send msg to this nodes's children nodes.
01750     //only the first pe of a node of node-aware ST should contain children nodes
01751     int rank = CkRankOf(ptn->peIDs[0]);
01752     PatchMap *pmap = localPatchMaps[rank];
01753     ProxyPatch *ppatch = (ProxyPatch *)pmap->patch(msg->patch);
01754 
01755     int npid = ppatch->getSTNNodeChild();
01756     int *pids = ppatch->getSTNodeChildPtr();
01757     if(npid>0) {        
01758         //only needs to send to other nodes, so check the last entry of pids.
01759         //This is because the data for proxies on the same node have been sent
01760         //later in this function by NodeProxyMgr.
01761         if(pids[npid-1]==CkMyNode()) npid--;
01762     }
01763     
01764 #if CMK_PERSISTENT_COMM && USE_PERSISTENT_TREE
01765     if (npid) {
01766         int ntreephs;
01767         PersistentHandle *treephs = ppatch->getSpanningTreePhs(ntreephs);
01768         CmiAssert(treephs && ntreephs >= npid);
01769         CmiUsePersistentHandle(treephs, ntreephs);
01770     }
01771 #endif
01772     CProxy_NodeProxyMgr cnp(thisgroup);
01773     for(int i=0; i<npid; i++) {
01774         ProxyDataMsg *copymsg = (ProxyDataMsg *)CkCopyMsg((void **)&msg);
01775         cnp[pids[i]].recvImmediateProxyAll(copymsg);
01776     }    
01777 #if CMK_PERSISTENT_COMM && USE_PERSISTENT_TREE
01778     CmiUsePersistentHandle(NULL, 0);
01779 #endif
01780 
01781     //re-send msg to it's internal cores
01782 #if CMK_SMP && defined(NAMDSRC_IMMQD_HACK)
01783     msg->isFromImmMsgCall = (CkMyRank()==CkMyNodeSize());
01784 #endif
01785     cp.recvProxyAll(msg, ptn->numPes, ptn->peIDs);
01786 #else
01787     CkAbort("Bad execution path to NodeProxyMgr::recvImmediateProxyData\n");
01788 #endif
01789 }
01790 
01791 void ProxyMgr::printProxySpanningTree(){
01792 #ifdef NODEAWARE_PROXY_SPANNINGTREE
01793     int numPatches = PatchMap::Object()->numPatches();
01794     for(int i=0; i<numPatches; i++) {
01795         proxyTreeNodeList oneList = ptree.naTrees[i];
01796         printf("ST tree for HomePatch[%d]: #nodes = %d\n", i, oneList.size()); 
01797         if(ptree.proxylist[i].size()==0) continue;
01798         printf("===%d=== pes/node: ", i);
01799         for(int j=0; j<oneList.size(); j++) {
01800             printf("%d ", oneList.item(j).numPes);
01801         }
01802         printf("\n");
01803         printf("===%d=== pe ids: ", i);
01804         for(int j=0; j<oneList.size(); j++) {
01805             for(int k=0; k<oneList.item(j).numPes; k++) {
01806                 printf("%d ", oneList.item(j).peIDs[k]);
01807             }            
01808         }
01809         printf("\n");
01810     }    
01811     fflush(stdout);  
01812 #else
01813     int numPatches = PatchMap::Object()->numPatches();
01814     for(int i=0; i<numPatches; i++) {
01815         NodeIDList oneList = ptree.trees[i];
01816         printf("ST tree for HomePatch[%d]: #nodes = %d\n", i, oneList.size()); 
01817         if(ptree.proxylist[i].size()==0) continue;        
01818         printf("===%d=== pe ids: ", i);
01819         for(int j=0; j<oneList.size(); j++) {            
01820             printf("%d ", oneList.item(j));            
01821         }
01822         printf("\n");
01823     }    
01824     fflush(stdout);  
01825 #endif
01826 }
01827 
01828 void NodeProxyMgr::registerPatch(int patchID, int numPes, int *pes){
01829     if(proxyInfo[patchID]) {
01830         delete proxyInfo[patchID];
01831     }
01832     if(numPes == 0) {
01833         proxyInfo[patchID] = NULL;
01834     }else{
01835         proxyInfo[patchID] = new proxyTreeNode(CkNodeOf(pes[0]),numPes,pes);
01836     }
01837 }
01838 
01839 void ProxyMgr::sendResult(ProxyGBISP1ResultMsg *msg) { //pp -r> hp
01840   CProxy_ProxyMgr cp(CkpvAccess(BOCclass_group).proxyMgr);
01841   NodeID node = PatchMap::Object()->node(msg->patch);
01842   CmiEnableUrgentSend(1);
01843   cp[node].recvResult(msg);
01844   CmiEnableUrgentSend(0);
01845 }
01846 void ProxyMgr::recvResult(ProxyGBISP1ResultMsg *msg) { //pp -r> hp
01847   HomePatch *homePatch = PatchMap::Object()->homePatch(msg->patch);
01848   homePatch->receiveResult(msg); // message deleted in registerProxy()
01849 }
01850 void ProxyMgr::recvData(  ProxyGBISP2DataMsg *msg) {  //hp -d> pp
01851   ProxyPatch *proxy = (ProxyPatch *) PatchMap::Object()->patch(msg->patch);
01852   proxy->receiveData(msg); // deleted in ProxyPatch::receiveAtoms() ?
01853 }
01854 void ProxyMgr::sendResult(ProxyGBISP2ResultMsg *msg) { //pp -r> hp
01855   CProxy_ProxyMgr cp(CkpvAccess(BOCclass_group).proxyMgr);
01856   NodeID node = PatchMap::Object()->node(msg->patch);
01857   CmiEnableUrgentSend(1);
01858   cp[node].recvResult(msg);
01859   CmiEnableUrgentSend(0);
01860 }
01861 void ProxyMgr::recvResult(ProxyGBISP2ResultMsg *msg) { //pp -r> hp
01862   HomePatch *homePatch = PatchMap::Object()->homePatch(msg->patch);
01863   homePatch->receiveResult(msg); // message deleted in registerProxy()
01864 }
01865 void ProxyMgr::recvData(  ProxyGBISP3DataMsg *msg) {   //hp -d> pp
01866   ProxyPatch *proxy = (ProxyPatch *) PatchMap::Object()->patch(msg->patch);
01867   proxy->receiveData(msg); // deleted in ProxyPatch::receiveAtoms() ?
01868 }
01869 
01870 PatchProxyListMsg *PatchProxyListMsg::createPatchProxyListMsg(PatchProxyListMsg **bufs, int bufSize, ProxyListInfo *info, int size){
01871         //1. compute the total patches this node manages, and the total length of all proxy lists
01872         int totalPatches = 0;
01873         int totalProxies = 0;
01874         for(int i=0; i<bufSize; i++) {
01875                 PatchProxyListMsg *one = bufs[i];
01876                 totalPatches += one->numPatches;
01877                 for(int j=0; j<one->numPatches; j++) totalProxies += one->proxyListLen[j];
01878         }
01879         totalPatches += size;
01880         for(int i=0; i<size; i++) {
01881                 totalProxies += info[i].numProxies;
01882         }
01883 
01884         PatchProxyListMsg *msg = new(totalPatches, totalPatches, totalProxies, 0)PatchProxyListMsg(totalPatches);
01885         int msgPatchIdx = 0;
01886         int msgProxyPeIdx = 0;
01887         for(int i=0; i<bufSize; i++) {
01888                 PatchProxyListMsg *one = bufs[i];
01889                 int curPeIdx = 0;
01890                 for(int j=0; j<one->numPatches; j++) {
01891                         msg->patchIDs[msgPatchIdx] = one->patchIDs[j];
01892                         int curListLen = one->proxyListLen[j];
01893                         msg->proxyListLen[msgPatchIdx++] = curListLen;
01894                         memcpy(msg->proxyPEs+msgProxyPeIdx, one->proxyPEs+curPeIdx, sizeof(int)*curListLen);
01895                         curPeIdx += curListLen;
01896                         msgProxyPeIdx += curListLen;
01897                 }
01898         }
01899         for(int i=0; i<size; i++) {
01900                 msg->patchIDs[msgPatchIdx] = info[i].patchID;
01901                 int curListLen = info[i].numProxies;
01902                 msg->proxyListLen[msgPatchIdx++] = curListLen;
01903                 memcpy(msg->proxyPEs+msgProxyPeIdx, info[i].proxyList, sizeof(int)*curListLen);
01904                 msgProxyPeIdx += curListLen;
01905         }
01906         return msg;
01907 }
01908 
01909 #define HOMEPATCH_TREE_BRFACTOR 2
01910 void NodeProxyMgr::createSTForHomePatches(PatchMap *pmap){
01911         //We use implicit tree construction for all home patches
01912         std::vector<int> nodesWithPatches; //record the id of node that has home patches
01913         int myNodeIdx = -1; //the index into the above vector of this node
01914         for(int nodeId=0; nodeId<CkNumNodes(); ++nodeId) {
01915                 int hpCnt = 0;
01916                 int firstPe = CkNodeFirst(nodeId);
01917                 int endPe = firstPe + CkNodeSize(nodeId);
01918                 for(int pe=firstPe; pe < endPe; ++pe) {
01919                         hpCnt += pmap->numPatchesOnNode(pe);
01920                 }
01921                 if(hpCnt==0) continue;
01922 
01923                 nodesWithPatches.push_back(nodeId);
01924                 if(CkMyNode() == nodeId) {
01925                         //on my node
01926                         myNodeIdx = nodesWithPatches.size()-1;
01927                         numHomePatches = hpCnt;
01928                         homepatchRecved = 0;
01929                         localProxyLists = new ProxyListInfo[hpCnt];
01930                         memset(localProxyLists, 0, sizeof(ProxyListInfo)*hpCnt);                        
01931                 }
01932         }
01933 
01934         if(myNodeIdx==-1){
01935                 //there's no home patches on this node
01936                 //just set to a value that doesn't make sense in spanning tree.
01937                 parentNode = -2; 
01938                 numKidNodes = 0;
01939                 kidRecved = 0;
01940                 return;
01941         }
01942         
01943         //calculate parent
01944         if(myNodeIdx == 0) {
01945                 parentNode = -1;
01946         }else{
01947                 int parentIdx = (myNodeIdx-1)/HOMEPATCH_TREE_BRFACTOR;
01948                 parentNode = nodesWithPatches[parentIdx];
01949         }
01950 
01951         //calculate kids
01952         numKidNodes = 0;
01953         int totalNodes = nodesWithPatches.size();
01954         for(int i=1; i<=HOMEPATCH_TREE_BRFACTOR; i++) {
01955                 int kidId = myNodeIdx*HOMEPATCH_TREE_BRFACTOR+i;
01956                 if(kidId >= totalNodes) break;
01957                 numKidNodes++;
01958         }
01959         if(numKidNodes!=0) {
01960                 remoteProxyLists = new PatchProxyListMsg *[numKidNodes];
01961         }
01962         kidRecved = 0;
01963 
01964         //CkPrintf("Node[%d] has %d homepatches with parent=%d and %d kids \n", CkMyNode(), numHomePatches, parentNode, numKidNodes);
01965 }
01966 
01967 void NodeProxyMgr::sendProxyList(int pid, int *plist, int size){
01968         int insertIdx; //indexed from 0
01969         CmiLock(localDepositLock);
01970         insertIdx = homepatchRecved++; //ensure the atomic increment
01971 
01972         localProxyLists[insertIdx].patchID = pid;
01973         localProxyLists[insertIdx].numProxies = size;
01974         localProxyLists[insertIdx].proxyList = plist;
01975 
01976         if(insertIdx == (numHomePatches-1)) {
01977                 //all local home patches have contributed
01978                 contributeToParent();
01979         }
01980         CmiUnlock(localDepositLock);
01981 }
01982 
01983 void NodeProxyMgr::sendProxyListInfo(PatchProxyListMsg *msg){
01984         int insertIdx; //indexed from 0
01985         CmiLock(localDepositLock);
01986         insertIdx = kidRecved++;
01987         
01988         remoteProxyLists[insertIdx] = msg;
01989         if(insertIdx == (numKidNodes-1)) {
01990                 //all kids have contributed;
01991                 contributeToParent();
01992         }
01993         CmiUnlock(localDepositLock);
01994 }
01995 
01996 void NodeProxyMgr::contributeToParent(){
01997         if(homepatchRecved!=numHomePatches || kidRecved != numKidNodes) return;
01998 
01999         homepatchRecved = 0;
02000         kidRecved = 0;
02001         //construct the msg
02002         PatchProxyListMsg *msg = PatchProxyListMsg::createPatchProxyListMsg(remoteProxyLists, numKidNodes, localProxyLists, numHomePatches);
02003         if(parentNode == -1) {
02004                 //send to proxy mgr on PE[0] as this is the root node
02005                 CProxy_ProxyMgr cp(CkpvAccess(BOCclass_group).proxyMgr);
02006                 cp[0].recvPatchProxyInfo(msg);
02007         }else{
02008                 CProxy_NodeProxyMgr cnp(thisgroup);
02009                 cnp[parentNode].sendProxyListInfo(msg);
02010         }
02011         for(int i=0; i<numKidNodes; i++) {
02012                 delete remoteProxyLists[i];
02013         }
02014 }
02015 
02016 #include "ProxyMgr.def.h"
02017 

Generated on Fri May 25 04:07:16 2012 for NAMD by  doxygen 1.3.9.1