PatchMgr.C

Go to the documentation of this file.
00001 
00007 #include "InfoStream.h"
00008 #include "PatchMgr.decl.h"
00009 #include "PatchMgr.h"
00010 
00011 #include "NamdTypes.h"
00012 //#include "Compute.h"
00013 #include "HomePatch.h"
00014 #include "PatchMap.h"
00015 #include "AtomMap.h"
00016 
00017 #include "ComputeMsmMsa.h"  // needed for MsmMsaData definition
00018 #include "main.decl.h"
00019 #include "main.h"
00020 
00021 #include "WorkDistrib.decl.h"
00022 #include "WorkDistrib.h"
00023 #include "Node.h"
00024 #include "SimParameters.h"
00025 
00026 #include "packmsg.h"
00027 
00028 // #define DEBUGM
00029 #define MIN_DEBUG_LEVEL 3
00030 #include "Debug.h"
00031 
00032 
00033 // BOC constructor
00034 PatchMgr::PatchMgr()
00035 {
00036     // CkPrintf("[%d] PatchMgr Created\n", CkMyPe());
00037 
00038     // Singleton pattern
00039     if (CkpvAccess(PatchMgr_instance) == NULL) {
00040         CkpvAccess(PatchMgr_instance) = this;
00041     } else {
00042         NAMD_bug("PatchMgr instanced twice on same processor!");
00043     }
00044 
00045     // Get PatchMap singleton started
00046     patchMap = PatchMap::Instance();
00047     patchMap->registerPatchMgr(this);
00048 
00049     recvCheckpointReq_index = CmiRegisterHandler((CmiHandler)recvCheckpointReq_handler);
00050     recvCheckpointLoad_index = CmiRegisterHandler((CmiHandler)recvCheckpointLoad_handler);
00051     recvCheckpointStore_index = CmiRegisterHandler((CmiHandler)recvCheckpointStore_handler);
00052     recvCheckpointAck_index = CmiRegisterHandler((CmiHandler)recvCheckpointAck_handler);
00053 
00054     recvExchangeReq_index = CmiRegisterHandler((CmiHandler)recvExchangeReq_handler);
00055     recvExchangeMsg_index = CmiRegisterHandler((CmiHandler)recvExchangeMsg_handler);
00056 
00057     // Message combining initialization
00058     migrationCountdown = 0;
00059     combineMigrationMsgs = new MigrateAtomsCombinedMsg*[CkNumPes()];
00060     int numPes = CkNumPes();
00061     for ( int i = 0; i < numPes; ++i ) combineMigrationMsgs[i] = 0;
00062 }
00063 
00064 PatchMgr::~PatchMgr()
00065 {
00066     HomePatchListIter hi(homePatches);
00067     for ( hi = hi.begin(); hi != hi.end(); hi++) {
00068       HomePatchElem* elem = homePatches.find(HomePatchElem(hi->pid));
00069       delete elem->patch;
00070     }
00071     delete [] combineMigrationMsgs;
00072 }
00073 
00074 void PatchMgr::preCreateHomePatch(PatchID pid, int atomCnt){
00075     HomePatch *patch = new HomePatch(pid, atomCnt);
00076     homePatches.load(HomePatchElem(pid, patch));
00077     patchMap->registerPatch(pid, patch);
00078 }
00079 
00080 void PatchMgr::createHomePatch(PatchID pid, FullAtomList &a) 
00081 {
00082     HomePatch *patch = new HomePatch(pid, a);
00083     homePatches.load(HomePatchElem(pid, patch));
00084     patchMap->registerPatch(pid, patch);
00085 }
00086 
00087 
00088 // Add a HomePatch to a list of patches to be moved 
00089 // HomePatches are actually moved by invoking sendMovePatches() below
00090 void PatchMgr::movePatch(PatchID pid, NodeID nodeID) 
00091 {
00092     move.load(MovePatch(pid,nodeID));
00093 }
00094 
00095 void PatchMgr::sendOneHomePatch(int patchId, int nodeId){
00096     HomePatch *p = homePatch(patchId);
00097     patchMap->unregisterPatch(patchId, p);
00098 
00099     MovePatchesMsg *msg = new MovePatchesMsg(patchId, p->atom);
00100 
00101     // Deleting the HomePatchElem will call a destructor for clean up
00102     // but the msg elements are safe since they use a container template
00103     // that uses ref counting.
00104     delete p;
00105     homePatches.del(HomePatchElem(patchId)); 
00106 
00107     if ( msg->atom.shared() ) NAMD_bug("shared message array in PatchMgr::sendOneHomePatch");
00108 
00109     // Sending to PatchMgr::recvMovePatches on remote node
00110     CProxy_PatchMgr cp(thisgroup);
00111     cp[nodeId].recvMovePatches(msg);
00112 }
00113 
00114 // Uses list constructed by movePatch() and dispatches
00115 // HomePatch(es) to new nodes
00116 void PatchMgr::sendMovePatches() 
00117 {
00118     if (! move.size())
00119         return;
00120 
00121     MovePatchListIter m(move);
00122     for ( m = m.begin(); m != m.end(); m++) {
00123       HomePatch *p = homePatch(m->pid);
00124       patchMap->unregisterPatch(m->pid, p);
00125 
00126       MovePatchesMsg *msg = new MovePatchesMsg(m->pid, p->atom);
00127 
00128       // Deleting the HomePatchElem will call a destructor for clean up
00129       // but the msg elements are safe since they use a container template
00130       // that uses ref counting.
00131       delete p;
00132       homePatches.del(HomePatchElem(m->pid)); 
00133 
00134       if ( msg->atom.shared() ) NAMD_bug("shared message array in PatchMgr::sendMovePatches");
00135 
00136       // Sending to PatchMgr::recvMovePatches on remote node
00137       CProxy_PatchMgr cp(thisgroup);
00138       cp[m->nodeID].recvMovePatches(msg);
00139     }
00140     move.resize(0);
00141 }
00142 
00143 void PatchMgr::recvMovePatches(MovePatchesMsg *msg) {
00144     // Make a new HomePatch
00145     createHomePatch(msg->pid, msg->atom);
00146     delete msg;
00147 
00148     // Tell sending PatchMgr we received MovePatchMsg
00149 //    AckMovePatchesMsg *ackmsg = 
00150 //      new AckMovePatchesMsg;
00151 //    CSendMsgBranch(PatchMgr,ackMovePatches, ackmsg, thisgroup, msg->fromNodeID);
00152 }
00153     
00154 
00155 //void PatchMgr::ackMovePatches(AckMovePatchesMsg *msg)
00156 //{
00157 //    delete msg;
00158 //    if (! --ackMovePending) 
00159 //      WorkDistrib::messageMovePatchDone();
00160 //}
00161 
00162 
00163 void PatchMgr::sendAtoms(PatchID pid, FullAtomList &a) {
00164 
00165       MovePatchesMsg *msg = new MovePatchesMsg(pid, a);
00166 
00167       if ( msg->atom.shared() ) NAMD_bug("shared message array in PatchMgr::sendAtoms");
00168 
00169       CProxy_PatchMgr cp(thisgroup);
00170       cp[patchMap->node(pid)].recvAtoms(msg);
00171 
00172 }
00173 
00174 void PatchMgr::recvAtoms(MovePatchesMsg *msg) {
00175     patchMap->homePatch(msg->pid)->reinitAtoms(msg->atom);
00176     delete msg;
00177 }
00178 
00179 // Called by HomePatch to migrate atoms off to new patches
00180 // Message combining occurs here
00181 void PatchMgr::sendMigrationMsgs(PatchID src, MigrationInfo *m, int numMsgs) {
00182 /*
00183   for (int i=0; i < numMsgs; i++) {
00184     PatchMgr::Object()->sendMigrationMsg(src, m[i]);
00185   }
00186 */
00187   if ( ! migrationCountdown )  // (re)initialize
00188   {
00189     // DebugM(3,"migrationCountdown (re)initialize\n");
00190     numHomePatches = patchMap->numHomePatches();
00191     migrationCountdown = numHomePatches;
00192     combineMigrationDestPes.resize(0);
00193   }
00194   for (int i=0; i < numMsgs; i++) {  // buffer messages
00195     int destNodeID = m[i].destNodeID;
00196     if ( 1 ) // destNodeID != CkMyPe() )
00197     {
00198       if ( ! combineMigrationMsgs[destNodeID] )
00199       {
00200         combineMigrationMsgs[destNodeID] = new MigrateAtomsCombinedMsg();
00201         combineMigrationDestPes.add(destNodeID);
00202       }
00203       combineMigrationMsgs[destNodeID]->add(src,m[i].destPatchID,m[i].mList);
00204     }
00205     else
00206     {
00207         // for now buffer local messages too
00208     }
00209   }
00210   migrationCountdown -= 1;
00211   // DebugM(3,"migrationCountdown = " << migrationCountdown << "\n");
00212   if ( ! migrationCountdown )  // send out combined messages
00213   {
00214     int n = combineMigrationDestPes.size();
00215     for ( int i = 0; i < n; ++i ) {
00216         int destNodeID = combineMigrationDestPes[i];
00217         DebugM(3,"Sending MigrateAtomsCombinedMsg to node " << destNodeID << "\n");
00218         CProxy_PatchMgr cp(thisgroup);
00219         cp[destNodeID].recvMigrateAtomsCombined(combineMigrationMsgs[destNodeID]);
00220         combineMigrationMsgs[destNodeID] = 0;
00221     }
00222   }
00223 }
00224 
00225 void PatchMgr::recvMigrateAtomsCombined (MigrateAtomsCombinedMsg *msg)
00226 {
00227   DebugM(3,"Received MigrateAtomsCombinedMsg with " << msg->srcPatchID.size() << " messages.\n");
00228   msg->distribute();
00229   delete msg;
00230 }
00231 
00232 void PatchMgr::moveAtom(MoveAtomMsg *msg) {
00233   LocalID lid = AtomMap::Object()->localID(msg->atomid);
00234   if ( lid.pid != notUsed ) {
00235     HomePatch *hp = patchMap->homePatch(lid.pid);
00236     if ( hp ) {
00237       FullAtom &a = hp->atom[lid.index];
00238       if ( msg->moveto ) {
00239         a.fixedPosition = msg->coord;
00240       } else {
00241         a.fixedPosition = hp->lattice.reverse_transform(a.position,a.transform);
00242         a.fixedPosition += msg->coord;
00243       }
00244       a.position = hp->lattice.apply_transform(a.fixedPosition,a.transform);
00245     }
00246   }
00247   delete msg;
00248 }
00249 
00250 void PatchMgr::moveAllBy(MoveAllByMsg *msg) {
00251   // loop over homePatches, moving every atom
00252   for (HomePatchElem *elem = homePatches.begin(); elem != homePatches.end(); elem++) {
00253     HomePatch *hp = elem->patch;
00254     for (int i=0; i<hp->getNumAtoms(); i++) {
00255       FullAtom &a = hp->atom[i];
00256       a.fixedPosition = hp->lattice.reverse_transform(a.position,a.transform);
00257       a.fixedPosition += msg->offset;
00258       a.position = hp->lattice.apply_transform(a.fixedPosition,a.transform);
00259     }
00260   }
00261   delete msg;
00262 }
00263 
00264 void PatchMgr::setLattice(SetLatticeMsg *msg) {
00265   // loop over homePatches, setting the lattice to the new value.
00266   for (HomePatchElem *elem = homePatches.begin(); elem != homePatches.end(); elem++) {
00267     HomePatch *hp = elem->patch;
00268     hp->lattice = msg->lattice;
00269   }
00270   // Must also do this for SimParameters in order for pressure profile to work!
00271   Node::Object()->simParameters->lattice = msg->lattice;
00272 }
00273 
00274 
00275 // initiating replica
00276 void PatchMgr::sendCheckpointReq(int pid, int remote, const char *key, int task) {
00277   CheckpointAtomsReqMsg *msg = new (1+strlen(key),0) CheckpointAtomsReqMsg;
00278   msg->pid = pid;
00279   msg->pe = CkMyPe();
00280   msg->replica = CmiMyPartition();
00281   msg->task = task;
00282   strcpy(msg->key,key);
00283   envelope *env = UsrToEnv(CheckpointAtomsReqMsg::pack(msg));
00284   CmiSetHandler(env,recvCheckpointReq_index);
00285 #if CMK_HAS_PARTITION
00286   CmiInterSyncSendAndFree(CkMyPe(),remote,env->getTotalsize(),(char*)env);
00287 #else
00288   CmiSyncSendAndFree(CkMyPe(),env->getTotalsize(),(char*)env);
00289 #endif
00290 }
00291 
00292 // responding replica
00293 extern "C" {
00294   void recvCheckpointReq_handler(envelope *env) {
00295     PatchMgr::Object()->recvCheckpointReq(CheckpointAtomsReqMsg::unpack(EnvToUsr(env)));
00296   }
00297 }
00298 
00299 // responding replica
00300 void PatchMgr::recvCheckpointReq(CheckpointAtomsReqMsg *msg) {
00301   int patchnode = patchMap->node(msg->pid);
00302   if ( CkMyPe() != patchnode ) {
00303     thisProxy[patchnode].recvCheckpointReq(msg);
00304   } else {
00305     HomePatch *hp = patchMap->homePatch(msg->pid);
00306     if ( ! hp ) NAMD_bug("null HomePatch pointer in PatchMgr::recvCheckpointReq");
00307     hp->recvCheckpointReq(msg->task, msg->key, msg->replica, msg->pe);
00308     delete msg;
00309   }
00310 }
00311 
00312 
00313 // responding replica
00314 void PatchMgr::sendCheckpointLoad(CheckpointAtomsMsg *msg, int dst, int dstpe) {
00315   envelope *env = UsrToEnv(CheckpointAtomsMsg::pack(msg));
00316   CmiSetHandler(env,recvCheckpointLoad_index);
00317 #if CMK_HAS_PARTITION
00318   CmiInterSyncSendAndFree(dstpe,dst,env->getTotalsize(),(char*)env);
00319 #else
00320   CmiSyncSendAndFree(dstpe,env->getTotalsize(),(char*)env);
00321 #endif
00322 }
00323 
00324 // initiating replica
00325 extern "C" {
00326   void recvCheckpointLoad_handler(envelope *env) {
00327     PatchMgr::Object()->recvCheckpointLoad(CheckpointAtomsMsg::unpack(EnvToUsr(env)));
00328   }
00329 }
00330 
00331 // initiating replica
00332 void PatchMgr::recvCheckpointLoad(CheckpointAtomsMsg *msg) {
00333   HomePatch *hp = patchMap->homePatch(msg->pid);
00334   hp->recvCheckpointLoad(msg);
00335 }
00336 
00337 
00338 // initiating replica
00339 void PatchMgr::sendCheckpointStore(CheckpointAtomsMsg *msg, int dst, int dstpe) {
00340   envelope *env = UsrToEnv(CheckpointAtomsMsg::pack(msg));
00341   CmiSetHandler(env,recvCheckpointStore_index);
00342 #if CMK_HAS_PARTITION
00343   CmiInterSyncSendAndFree(dstpe,dst,env->getTotalsize(),(char*)env);
00344 #else
00345   CmiSyncSendAndFree(dstpe,env->getTotalsize(),(char*)env);
00346 #endif
00347 }
00348 
00349 // responding replica
00350 extern "C" {
00351   void recvCheckpointStore_handler(envelope *env) {
00352     PatchMgr::Object()->recvCheckpointStore(CheckpointAtomsMsg::unpack(EnvToUsr(env)));
00353   }
00354 }
00355 
00356 // responding replica
00357 void PatchMgr::recvCheckpointStore(CheckpointAtomsMsg *msg) {
00358   HomePatch *hp = patchMap->homePatch(msg->pid);
00359   hp->recvCheckpointStore(msg);
00360 }
00361 
00362 
00363 // responding replica
00364 void PatchMgr::sendCheckpointAck(int pid, int dst, int dstpe) {
00365   CheckpointAtomsReqMsg *msg = new CheckpointAtomsReqMsg;
00366   msg->pid = pid;
00367   envelope *env = UsrToEnv(CheckpointAtomsReqMsg::pack(msg));
00368   CmiSetHandler(env,recvCheckpointAck_index);
00369 #if CMK_HAS_PARTITION
00370   CmiInterSyncSendAndFree(dstpe,dst,env->getTotalsize(),(char*)env);
00371 #else
00372   CmiSyncSendAndFree(dstpe,env->getTotalsize(),(char*)env);
00373 #endif
00374 }
00375 
00376 // initiating replica
00377 extern "C" {
00378   void recvCheckpointAck_handler(envelope *env) {
00379     PatchMgr::Object()->recvCheckpointAck(CheckpointAtomsReqMsg::unpack(EnvToUsr(env)));
00380   }
00381 }
00382 
00383 // initiating replica
00384 void PatchMgr::recvCheckpointAck(CheckpointAtomsReqMsg *msg) {
00385   HomePatch *hp = patchMap->homePatch(msg->pid);
00386   if ( ! hp ) NAMD_bug("null HomePatch pointer in PatchMgr::recvCheckpointAck");
00387   hp->recvCheckpointAck();
00388   delete msg;
00389 }
00390 
00391 
00392 void PatchMgr::sendExchangeReq(int pid, int src) {
00393   ExchangeAtomsReqMsg *msg = new ExchangeAtomsReqMsg;
00394   msg->pid = pid;
00395   msg->dstpe = CkMyPe();
00396   envelope *env = UsrToEnv(ExchangeAtomsReqMsg::pack(msg));
00397   CmiSetHandler(env,recvExchangeReq_index);
00398 #if CMK_HAS_PARTITION
00399   CmiInterSyncSendAndFree(CkMyPe(),src,env->getTotalsize(),(char*)env);
00400 #else
00401   CmiSyncSendAndFree(CkMyPe(),env->getTotalsize(),(char*)env);
00402 #endif
00403 }
00404 
00405 extern "C" {
00406   void recvExchangeReq_handler(envelope *env) {
00407     PatchMgr::Object()->recvExchangeReq(ExchangeAtomsReqMsg::unpack(EnvToUsr(env)));
00408   }
00409 }
00410 
00411 void PatchMgr::recvExchangeReq(ExchangeAtomsReqMsg *msg) {
00412   int patchnode = patchMap->node(msg->pid);
00413   if ( CkMyPe() != patchnode ) {
00414     thisProxy[patchnode].recvExchangeReq(msg);
00415   } else {
00416     HomePatch *hp = patchMap->homePatch(msg->pid);
00417     if ( ! hp ) NAMD_bug("null HomePatch pointer in PatchMgr::recvExchangeReq");
00418     hp->recvExchangeReq(msg->dstpe);
00419     delete msg;
00420   }
00421 }
00422 
00423 void PatchMgr::sendExchangeMsg(ExchangeAtomsMsg *msg, int dst, int dstpe) {
00424   envelope *env = UsrToEnv(ExchangeAtomsMsg::pack(msg));
00425   CmiSetHandler(env,recvExchangeMsg_index);
00426 #if CMK_HAS_PARTITION
00427   CmiInterSyncSendAndFree(dstpe,dst,env->getTotalsize(),(char*)env);
00428 #else
00429   CmiSyncSendAndFree(dstpe,env->getTotalsize(),(char*)env);
00430 #endif
00431 }
00432 
00433 extern "C" {
00434   void recvExchangeMsg_handler(envelope *env) {
00435     PatchMgr::Object()->recvExchangeMsg(ExchangeAtomsMsg::unpack(EnvToUsr(env)));
00436   }
00437 }
00438 
00439 void PatchMgr::recvExchangeMsg(ExchangeAtomsMsg *msg) {
00440   HomePatch *hp = patchMap->homePatch(msg->pid);
00441   hp->recvExchangeMsg(msg);
00442 }
00443 
00444 PACK_MSG(MovePatchesMsg,  // macro from packmsg.h; presumably generates the pack/unpack routines for MovePatchesMsg — TODO confirm
00445   PACK(fromNodeID);  // listed fields, in this order: fromNodeID, pid, then atom
00446   PACK(pid);
00447   PACK_RESIZE(atom);  // NOTE(review): the RESIZE variant is presumably for variable-length arrays — confirm in packmsg.h
00448 )
00449 
00450 
00451 #include "PatchMgr.def.h"
00452 

Generated on Tue Nov 21 01:17:14 2017 for NAMD by  doxygen 1.4.7