PatchMgr.C

Go to the documentation of this file.
00001 
00007 #include "InfoStream.h"
00008 #include "PatchMgr.decl.h"
00009 #include "PatchMgr.h"
00010 
00011 #include "NamdTypes.h"
00012 //#include "Compute.h"
00013 #include "HomePatch.h"
00014 #include "PatchMap.h"
00015 #include "AtomMap.h"
00016 
00017 #include "ComputeMsmMsa.h"  // needed for MsmMsaData definition
00018 #include "main.decl.h"
00019 #include "main.h"
00020 
00021 #include "WorkDistrib.decl.h"
00022 #include "WorkDistrib.h"
00023 #include "Node.h"
00024 #include "SimParameters.h"
00025 
00026 #include "packmsg.h"
00027 
00028 // #define DEBUGM
00029 #define MIN_DEBUG_LEVEL 3
00030 #include "Debug.h"
00031 
00032 
00033 // BOC constructor
00034 PatchMgr::PatchMgr()
00035 {
00036     // CkPrintf("[%d] PatchMgr Created\n", CkMyPe());
00037 
00038     // Singleton pattern
00039     if (CkpvAccess(PatchMgr_instance) == NULL) {
00040         CkpvAccess(PatchMgr_instance) = this;
00041     } else {
00042         NAMD_bug("PatchMgr instanced twice on same processor!");
00043     }
00044 
00045     // Get PatchMap singleton started
00046     patchMap = PatchMap::Instance();
00047     patchMap->registerPatchMgr(this);
00048 
00049     recvCheckpointReq_index = CmiRegisterHandler((CmiHandler)recvCheckpointReq_handler);
00050     recvCheckpointLoad_index = CmiRegisterHandler((CmiHandler)recvCheckpointLoad_handler);
00051     recvCheckpointStore_index = CmiRegisterHandler((CmiHandler)recvCheckpointStore_handler);
00052     recvCheckpointAck_index = CmiRegisterHandler((CmiHandler)recvCheckpointAck_handler);
00053 
00054     recvExchangeReq_index = CmiRegisterHandler((CmiHandler)recvExchangeReq_handler);
00055     recvExchangeMsg_index = CmiRegisterHandler((CmiHandler)recvExchangeMsg_handler);
00056 
00057     // Message combining initialization
00058     migrationCountdown = 0;
00059     combineMigrationMsgs = new MigrateAtomsCombinedMsg*[CkNumPes()];
00060     int numPes = CkNumPes();
00061     for ( int i = 0; i < numPes; ++i ) combineMigrationMsgs[i] = 0;
00062 }
00063 
00064 PatchMgr::~PatchMgr()
00065 {
00066     HomePatchListIter hi(homePatches);
00067     for ( hi = hi.begin(); hi != hi.end(); hi++) {
00068       HomePatchElem* elem = homePatches.find(HomePatchElem(hi->pid));
00069       delete elem->patch;
00070     }
00071     delete [] combineMigrationMsgs;
00072 }
00073 
00074 void PatchMgr::createHomePatch(PatchID pid, FullAtomList &a) 
00075 {
00076     HomePatch *patch = new HomePatch(pid, a);
00077     homePatches.load(HomePatchElem(pid, patch));
00078     patchMap->registerPatch(pid, patch);
00079 }
00080 
00081 
00082 // Add a HomePatch to a list of patches to be moved 
00083 // HomePatches are actually moved by invoking sendMovePatches() below
00084 void PatchMgr::movePatch(PatchID pid, NodeID nodeID) 
00085 {
00086     move.load(MovePatch(pid,nodeID));
00087 }
00088 
00089 void PatchMgr::sendOneHomePatch(int patchId, int nodeId){
00090     HomePatch *p = homePatch(patchId);
00091     patchMap->unregisterPatch(patchId, p);
00092 
00093     MovePatchesMsg *msg = new MovePatchesMsg(patchId, p->atom);
00094 
00095     // Deleting the HomePatchElem will call a destructor for clean up
00096     // but the msg elements are safe since they use a container template
00097     // that uses ref counting.
00098     delete p;
00099     homePatches.del(HomePatchElem(patchId)); 
00100 
00101     if ( msg->atom.shared() ) NAMD_bug("shared message array in PatchMgr::sendOneHomePatch");
00102 
00103     // Sending to PatchMgr::recvMovePatches on remote node
00104     CProxy_PatchMgr cp(thisgroup);
00105     cp[nodeId].recvMovePatches(msg);
00106 }
00107 
00108 // Uses list constructed by movePatch() and dispatches
00109 // HomePatch(es) to new nodes
00110 void PatchMgr::sendMovePatches() 
00111 {
00112     if (! move.size())
00113         return;
00114 
00115     MovePatchListIter m(move);
00116     for ( m = m.begin(); m != m.end(); m++) {
00117       HomePatch *p = homePatch(m->pid);
00118       patchMap->unregisterPatch(m->pid, p);
00119 
00120       MovePatchesMsg *msg = new MovePatchesMsg(m->pid, p->atom);
00121 
00122       // Deleting the HomePatchElem will call a destructor for clean up
00123       // but the msg elements are safe since they use a container template
00124       // that uses ref counting.
00125       delete p;
00126       homePatches.del(HomePatchElem(m->pid)); 
00127 
00128       if ( msg->atom.shared() ) NAMD_bug("shared message array in PatchMgr::sendMovePatches");
00129 
00130       // Sending to PatchMgr::recvMovePatches on remote node
00131       CProxy_PatchMgr cp(thisgroup);
00132       cp[m->nodeID].recvMovePatches(msg);
00133     }
00134     move.resize(0);
00135 }
00136 
00137 void PatchMgr::recvMovePatches(MovePatchesMsg *msg) {
00138     // Make a new HomePatch
00139     createHomePatch(msg->pid, msg->atom);
00140     delete msg;
00141 
00142     // Tell sending PatchMgr we received MovePatchMsg
00143 //    AckMovePatchesMsg *ackmsg = 
00144 //      new AckMovePatchesMsg;
00145 //    CSendMsgBranch(PatchMgr,ackMovePatches, ackmsg, thisgroup, msg->fromNodeID);
00146 }
00147     
00148 
00149 //void PatchMgr::ackMovePatches(AckMovePatchesMsg *msg)
00150 //{
00151 //    delete msg;
00152 //    if (! --ackMovePending) 
00153 //      WorkDistrib::messageMovePatchDone();
00154 //}
00155 
00156 
00157 void PatchMgr::sendAtoms(PatchID pid, FullAtomList &a) {
00158 
00159       MovePatchesMsg *msg = new MovePatchesMsg(pid, a);
00160 
00161       if ( msg->atom.shared() ) NAMD_bug("shared message array in PatchMgr::sendAtoms");
00162 
00163       CProxy_PatchMgr cp(thisgroup);
00164       cp[patchMap->node(pid)].recvAtoms(msg);
00165 
00166 }
00167 
00168 void PatchMgr::recvAtoms(MovePatchesMsg *msg) {
00169     patchMap->homePatch(msg->pid)->reinitAtoms(msg->atom);
00170     delete msg;
00171 }
00172 
00173 // Called by HomePatch to migrate atoms off to new patches
00174 // Message combining occurs here
00175 void PatchMgr::sendMigrationMsgs(PatchID src, MigrationInfo *m, int numMsgs) {
00176 /*
00177   for (int i=0; i < numMsgs; i++) {
00178     PatchMgr::Object()->sendMigrationMsg(src, m[i]);
00179   }
00180 */
00181   if ( ! migrationCountdown )  // (re)initialize
00182   {
00183     // DebugM(3,"migrationCountdown (re)initialize\n");
00184     numHomePatches = patchMap->numHomePatches();
00185     migrationCountdown = numHomePatches;
00186     combineMigrationDestPes.resize(0);
00187   }
00188   for (int i=0; i < numMsgs; i++) {  // buffer messages
00189     int destNodeID = m[i].destNodeID;
00190     if ( 1 ) // destNodeID != CkMyPe() )
00191     {
00192       if ( ! combineMigrationMsgs[destNodeID] )
00193       {
00194         combineMigrationMsgs[destNodeID] = new MigrateAtomsCombinedMsg();
00195         combineMigrationDestPes.add(destNodeID);
00196       }
00197       combineMigrationMsgs[destNodeID]->add(src,m[i].destPatchID,m[i].mList);
00198     }
00199     else
00200     {
00201         // for now buffer local messages too
00202     }
00203   }
00204   migrationCountdown -= 1;
00205   // DebugM(3,"migrationCountdown = " << migrationCountdown << "\n");
00206   if ( ! migrationCountdown )  // send out combined messages
00207   {
00208     int n = combineMigrationDestPes.size();
00209     for ( int i = 0; i < n; ++i ) {
00210         int destNodeID = combineMigrationDestPes[i];
00211         DebugM(3,"Sending MigrateAtomsCombinedMsg to node " << destNodeID << "\n");
00212         CProxy_PatchMgr cp(thisgroup);
00213         cp[destNodeID].recvMigrateAtomsCombined(combineMigrationMsgs[destNodeID]);
00214         combineMigrationMsgs[destNodeID] = 0;
00215     }
00216   }
00217 }
00218 
00219 void PatchMgr::recvMigrateAtomsCombined (MigrateAtomsCombinedMsg *msg)
00220 {
00221   DebugM(3,"Received MigrateAtomsCombinedMsg with " << msg->srcPatchID.size() << " messages.\n");
00222   msg->distribute();
00223   delete msg;
00224 }
00225 
00226 void PatchMgr::moveAtom(MoveAtomMsg *msg) {
00227   LocalID lid = AtomMap::Object()->localID(msg->atomid);
00228   if ( lid.pid != notUsed ) {
00229     HomePatch *hp = patchMap->homePatch(lid.pid);
00230     if ( hp ) {
00231       FullAtom &a = hp->atom[lid.index];
00232       if ( msg->moveto ) {
00233         a.fixedPosition = msg->coord;
00234       } else {
00235         a.fixedPosition = hp->lattice.reverse_transform(a.position,a.transform);
00236         a.fixedPosition += msg->coord;
00237       }
00238       a.position = hp->lattice.apply_transform(a.fixedPosition,a.transform);
00239     }
00240   }
00241   delete msg;
00242 }
00243 
00244 void PatchMgr::moveAllBy(MoveAllByMsg *msg) {
00245   // loop over homePatches, moving every atom
00246   for (HomePatchElem *elem = homePatches.begin(); elem != homePatches.end(); elem++) {
00247     HomePatch *hp = elem->patch;
00248     for (int i=0; i<hp->getNumAtoms(); i++) {
00249       FullAtom &a = hp->atom[i];
00250       a.fixedPosition = hp->lattice.reverse_transform(a.position,a.transform);
00251       a.fixedPosition += msg->offset;
00252       a.position = hp->lattice.apply_transform(a.fixedPosition,a.transform);
00253     }
00254   }
00255   delete msg;
00256 }
00257 
00258 void PatchMgr::setLattice(SetLatticeMsg *msg) {
00259   // loop over homePatches, setting the lattice to the new value.
00260   for (HomePatchElem *elem = homePatches.begin(); elem != homePatches.end(); elem++) {
00261     HomePatch *hp = elem->patch;
00262     hp->lattice = msg->lattice;
00263   }
00264   // Must also do this for SimParameters in order for pressure profile to work!
00265   Node::Object()->simParameters->lattice = msg->lattice;
00266 }
00267 
00268 
00269 // initiating replica
00270 void PatchMgr::sendCheckpointReq(int pid, int remote, const char *key, int task) {
00271   CheckpointAtomsReqMsg *msg = new (1+strlen(key),0) CheckpointAtomsReqMsg;
00272   msg->pid = pid;
00273   msg->pe = CkMyPe();
00274   msg->replica = CmiMyPartition();
00275   msg->task = task;
00276   strcpy(msg->key,key);
00277   envelope *env = UsrToEnv(CheckpointAtomsReqMsg::pack(msg));
00278   CmiSetHandler(env,recvCheckpointReq_index);
00279 #if CMK_HAS_PARTITION
00280   CmiInterSyncSendAndFree(CkMyPe(),remote,env->getTotalsize(),(char*)env);
00281 #else
00282   CmiSyncSendAndFree(CkMyPe(),env->getTotalsize(),(char*)env);
00283 #endif
00284 }
00285 
00286 // responding replica
00287 extern "C" {
00288   void recvCheckpointReq_handler(envelope *env) {
00289     PatchMgr::Object()->recvCheckpointReq(CheckpointAtomsReqMsg::unpack(EnvToUsr(env)));
00290   }
00291 }
00292 
00293 // responding replica
00294 void PatchMgr::recvCheckpointReq(CheckpointAtomsReqMsg *msg) {
00295   int patchnode = patchMap->node(msg->pid);
00296   if ( CkMyPe() != patchnode ) {
00297     thisProxy[patchnode].recvCheckpointReq(msg);
00298   } else {
00299     HomePatch *hp = patchMap->homePatch(msg->pid);
00300     if ( ! hp ) NAMD_bug("null HomePatch pointer in PatchMgr::recvCheckpointReq");
00301     hp->recvCheckpointReq(msg->task, msg->key, msg->replica, msg->pe);
00302     delete msg;
00303   }
00304 }
00305 
00306 
00307 // responding replica
00308 void PatchMgr::sendCheckpointLoad(CheckpointAtomsMsg *msg, int dst, int dstpe) {
00309   envelope *env = UsrToEnv(CheckpointAtomsMsg::pack(msg));
00310   CmiSetHandler(env,recvCheckpointLoad_index);
00311 #if CMK_HAS_PARTITION
00312   CmiInterSyncSendAndFree(dstpe,dst,env->getTotalsize(),(char*)env);
00313 #else
00314   CmiSyncSendAndFree(dstpe,env->getTotalsize(),(char*)env);
00315 #endif
00316 }
00317 
00318 // initiating replica
00319 extern "C" {
00320   void recvCheckpointLoad_handler(envelope *env) {
00321     PatchMgr::Object()->recvCheckpointLoad(CheckpointAtomsMsg::unpack(EnvToUsr(env)));
00322   }
00323 }
00324 
00325 // initiating replica
00326 void PatchMgr::recvCheckpointLoad(CheckpointAtomsMsg *msg) {
00327   HomePatch *hp = patchMap->homePatch(msg->pid);
00328   hp->recvCheckpointLoad(msg);
00329 }
00330 
00331 
00332 // initiating replica
00333 void PatchMgr::sendCheckpointStore(CheckpointAtomsMsg *msg, int dst, int dstpe) {
00334   envelope *env = UsrToEnv(CheckpointAtomsMsg::pack(msg));
00335   CmiSetHandler(env,recvCheckpointStore_index);
00336 #if CMK_HAS_PARTITION
00337   CmiInterSyncSendAndFree(dstpe,dst,env->getTotalsize(),(char*)env);
00338 #else
00339   CmiSyncSendAndFree(dstpe,env->getTotalsize(),(char*)env);
00340 #endif
00341 }
00342 
00343 // responding replica
00344 extern "C" {
00345   void recvCheckpointStore_handler(envelope *env) {
00346     PatchMgr::Object()->recvCheckpointStore(CheckpointAtomsMsg::unpack(EnvToUsr(env)));
00347   }
00348 }
00349 
00350 // responding replica
00351 void PatchMgr::recvCheckpointStore(CheckpointAtomsMsg *msg) {
00352   HomePatch *hp = patchMap->homePatch(msg->pid);
00353   hp->recvCheckpointStore(msg);
00354 }
00355 
00356 
00357 // responding replica
00358 void PatchMgr::sendCheckpointAck(int pid, int dst, int dstpe) {
00359   CheckpointAtomsReqMsg *msg = new CheckpointAtomsReqMsg;
00360   msg->pid = pid;
00361   envelope *env = UsrToEnv(CheckpointAtomsReqMsg::pack(msg));
00362   CmiSetHandler(env,recvCheckpointAck_index);
00363 #if CMK_HAS_PARTITION
00364   CmiInterSyncSendAndFree(dstpe,dst,env->getTotalsize(),(char*)env);
00365 #else
00366   CmiSyncSendAndFree(dstpe,env->getTotalsize(),(char*)env);
00367 #endif
00368 }
00369 
00370 // initiating replica
00371 extern "C" {
00372   void recvCheckpointAck_handler(envelope *env) {
00373     PatchMgr::Object()->recvCheckpointAck(CheckpointAtomsReqMsg::unpack(EnvToUsr(env)));
00374   }
00375 }
00376 
00377 // initiating replica
00378 void PatchMgr::recvCheckpointAck(CheckpointAtomsReqMsg *msg) {
00379   HomePatch *hp = patchMap->homePatch(msg->pid);
00380   if ( ! hp ) NAMD_bug("null HomePatch pointer in PatchMgr::recvCheckpointAck");
00381   hp->recvCheckpointAck();
00382   delete msg;
00383 }
00384 
00385 
00386 void PatchMgr::sendExchangeReq(int pid, int src) {
00387   ExchangeAtomsReqMsg *msg = new ExchangeAtomsReqMsg;
00388   msg->pid = pid;
00389   msg->dstpe = CkMyPe();
00390   envelope *env = UsrToEnv(ExchangeAtomsReqMsg::pack(msg));
00391   CmiSetHandler(env,recvExchangeReq_index);
00392 #if CMK_HAS_PARTITION
00393   CmiInterSyncSendAndFree(CkMyPe(),src,env->getTotalsize(),(char*)env);
00394 #else
00395   CmiSyncSendAndFree(CkMyPe(),env->getTotalsize(),(char*)env);
00396 #endif
00397 }
00398 
00399 extern "C" {
00400   void recvExchangeReq_handler(envelope *env) {
00401     PatchMgr::Object()->recvExchangeReq(ExchangeAtomsReqMsg::unpack(EnvToUsr(env)));
00402   }
00403 }
00404 
00405 void PatchMgr::recvExchangeReq(ExchangeAtomsReqMsg *msg) {
00406   int patchnode = patchMap->node(msg->pid);
00407   if ( CkMyPe() != patchnode ) {
00408     thisProxy[patchnode].recvExchangeReq(msg);
00409   } else {
00410     HomePatch *hp = patchMap->homePatch(msg->pid);
00411     if ( ! hp ) NAMD_bug("null HomePatch pointer in PatchMgr::recvExchangeReq");
00412     hp->recvExchangeReq(msg->dstpe);
00413     delete msg;
00414   }
00415 }
00416 
00417 void PatchMgr::sendExchangeMsg(ExchangeAtomsMsg *msg, int dst, int dstpe) {
00418   envelope *env = UsrToEnv(ExchangeAtomsMsg::pack(msg));
00419   CmiSetHandler(env,recvExchangeMsg_index);
00420 #if CMK_HAS_PARTITION
00421   CmiInterSyncSendAndFree(dstpe,dst,env->getTotalsize(),(char*)env);
00422 #else
00423   CmiSyncSendAndFree(dstpe,env->getTotalsize(),(char*)env);
00424 #endif
00425 }
00426 
00427 extern "C" {
00428   void recvExchangeMsg_handler(envelope *env) {
00429     PatchMgr::Object()->recvExchangeMsg(ExchangeAtomsMsg::unpack(EnvToUsr(env)));
00430   }
00431 }
00432 
00433 void PatchMgr::recvExchangeMsg(ExchangeAtomsMsg *msg) {
00434   HomePatch *hp = patchMap->homePatch(msg->pid);
00435   hp->recvExchangeMsg(msg);
00436 }
00437 
// Packing description for MovePatchesMsg, listing its fields fromNodeID,
// pid and the resizable atom array, in that order.
// NOTE(review): PACK_MSG/PACK/PACK_RESIZE presumably come from packmsg.h and
// generate the message's pack/unpack routines - the expansion is not visible
// here; keep the field order unchanged unless that is confirmed safe.
00438 PACK_MSG(MovePatchesMsg,
00439   PACK(fromNodeID);
00440   PACK(pid);
00441   PACK_RESIZE(atom);
00442 )
00443 
00444 
00445 #include "PatchMgr.def.h"
00446 

Generated on Fri Jun 22 01:17:15 2018 for NAMD by  doxygen 1.4.7