Main Page | Namespace List | Class Hierarchy | Alphabetical List | Class List | File List | Class Members | File Members

ParallelIOMgr.h

Go to the documentation of this file.
00001 #ifndef PARALLELIOMGR_H
00002 #define PARALLELIOMGR_H
00003 
00004 #include "ProcessorPrivate.h"
00005 #include "charm++.h"
00006 #include "BOCgroup.h"
00007 #include "common.h"
00008 #include "CompressPsf.h"
00009 #include "Hydrogen.h"
00010 #include "Vector.h"
00011 #include "NamdState.h"
00012 #include "Node.decl.h"
00013 #include "PatchMgr.h"
00014 #include "UniqueSet.h"
00015 #include "UniqueSetIter.h"
00016 #include "Molecule.h"
00017 
00018 #define COLLECT_PERFORMANCE_DATA 0
00019 
00020 #ifdef MEM_OPT_VERSION
00021 class CollectionMgr;
00022 class CollectionMaster;
00023 class CollectionMidMaster;
00024 class CollectMidVectorInstance;
00025 #endif
00026 
00027 class CollectVectorVarMsg;
00028 class PatchMap;
00029 
00030 #include "ParallelIOMgr.decl.h"
00031 
00033 class MolInfoMsg : public CMessage_MolInfoMsg
00034 {
00035 public:
00036     int numBonds;
00037     int numCalcBonds;
00038     int numAngles;
00039     int numCalcAngles;
00040     int numDihedrals;
00041     int numCalcDihedrals;
00042     int numImpropers;
00043     int numCalcImpropers;
00044     int numCrossterms;
00045     int numCalcCrossterms;
00046     int numExclusions;
00047     int numCalcExclusions;
00048     
00049     int numRigidBonds;
00050 
00051     //used for "comMov" is false in the SimParameter object
00052     //which is usually true -Chao Mei
00053     BigReal totalMass;
00054     Vector totalMV;
00055 
00056     BigReal totalCharge;
00057 };
00058 
00059 class HydroBasedMsg : public CMessage_HydroBasedMsg
00060 {
00061 //Info inside this message needs to be calculated when the hydrogen
00062 //group is constructed
00063 public:
00064     int numFixedRigidBonds;
00065     int numFixedGroups;
00066 };
00067 
00068 class MoveInputAtomsMsg : public CMessage_MoveInputAtomsMsg
00069 {
00070 public:
00071     int length;
00072     InputAtom *atomList;
00073 };
00074 
00075 class AtomsCntPerPatchMsg: public CMessage_AtomsCntPerPatchMsg
00076 {
00077 public:
00078     //one-to-one mapping between two lists
00079     int length;
00080     PatchID *pidList;
00081     unsigned short *atomsCntList;
00082     unsigned short *fixedAtomsCntList;
00083 };
00084 
00085 class MovePatchAtomsMsg: public CMessage_MovePatchAtomsMsg
00086 {
00087 public:
00088     //the total size of allAtoms could be calculated from sizeList
00089     int from;
00090     int patchCnt;
00091     PatchID *pidList;
00092     int *sizeList;
00093     FullAtom *allAtoms;
00094 };
00096 
// Per-cluster atom counter used as the element type of ClusterSet.
// Identity (hash() and operator==) depends only on clusterId; atomsCnt is
// ignored for lookup and comparison.
struct ClusterElem {
    int clusterId;  // -1 when default-constructed
    int atomsCnt;   // starts at 0

    ClusterElem() : clusterId(-1), atomsCnt(0) {}
    // Left implicit (not explicit) so existing int -> ClusterElem
    // conversions at call sites keep compiling.
    ClusterElem(int cid) : clusterId(cid), atomsCnt(0) {}

    // Hash key for the containing set: the cluster id itself.
    int hash() const {
        return clusterId;
    }

    // Two elements are equal iff their cluster ids match. Returns bool; it
    // still converts to int for any caller written against the old signature.
    bool operator==(const ClusterElem &a) const {
        return clusterId == a.clusterId;
    }
};
00111 typedef UniqueSet<ClusterElem> ClusterSet;
00112 typedef UniqueSetIter<ClusterElem> ClusterSetIter;
00113 
00114 class ClusterSizeMsg : public CMessage_ClusterSizeMsg
00115 {
00116 public:
00117     int srcRank;
00118     int clusterId;
00119     int atomsCnt;
00120 };
00121 typedef ResizeArray<ClusterSizeMsg *> ClusterSizeMsgBuffer;
00122 
00123 struct ClusterCoorElem{
00124     int clusterId;    
00125     Vector dsum;
00126 
00127     ClusterCoorElem(): clusterId(-1), dsum(0.0) {}
00128     ClusterCoorElem(int cid): clusterId(cid), dsum(0.0) {}
00129     int hash() const {
00130         return clusterId;
00131     }
00132     int operator==(const ClusterCoorElem &a) const {
00133         return clusterId == a.clusterId;
00134     }
00135 };
00136 typedef UniqueSet<ClusterCoorElem> ClusterCoorSet;
00137 typedef UniqueSetIter<ClusterCoorElem> ClusterCoorSetIter;
00138 
00139 class ClusterCoorMsg : public CMessage_ClusterCoorMsg
00140 {
00141 public:
00142     int srcRank;
00143     int clusterId;    
00144     Vector dsum;    
00145 };
00146 typedef ResizeArray<ClusterCoorMsg *> ClusterCoorMsgBuffer;
00148 
00149 class ParallelIOMgr : public BOCclass
00150 {
00151 #ifdef MEM_OPT_VERSION
00152     friend class CollectionMgr;
00153     friend class CollectionMaster;
00154     friend class CollectionMidMaster;
00155     friend class CollectMidVectorInstance;
00156 #endif
00157 
00158 private:
00159     SimParameters *simParameters;
00160     Molecule *molecule;
00161 
00163     int numInputProcs;
00164     int *inputProcArray;
00165     //the index to the inputProcArray i.e.
00166     //inputProcArray[myInputRank] == CkMyPe();
00167     //if it is not a input proc, the rank is -1;
00168     int myInputRank;
00169 
00170     //Initially this atom list contains the initially assigned
00171     //atoms, later it will contain atoms from other input processors
00172     //based on the migration group
00173     InputAtomList initAtoms;
00174 
00175     //This atom list contains the migrated atoms from other input
00176     //processors based on the migration group. Once the migration
00177     //finishes, atoms in this list is added to initAtoms, and its
00178     //space is freed.
00179     InputAtomList tmpRecvAtoms;
00180 
00181 
00182     //This variable indicates whether this processor is ready
00183     //to receive atoms of HomePatches to be created on this
00184     //processor
00185     bool isOKToRecvHPAtoms;
00186     FullAtomList *hpAtomsList;
00187     ResizeArray<int> hpIDList; //in increasing order
00188 
00189     //tmp variables
00190     int procsReceived; //used at updateMolInfo and recvAtomsCntPerPatch
00191     int hydroMsgRecved; //used at recvHydroBasedCounter
00192     Vector totalMV; //used to remove center of mass motion
00193     BigReal totalMass; //used to remove center of mass motion
00194     BigReal totalCharge;
00196 
00198     int numOutputProcs;
00199     int *outputProcArray;
00200     //the index to the outputProcArray i.e.
00201     //outputProcArray[myOutputRank] == CkMyPe();
00202     //if it is not a output proc, the rank is -1;
00203     int myOutputRank;
00204     //the number of simutaneous writers 
00205     //output procs with rank distance of numOutputProcs/numOutputWrts do the
00206     //output at a time
00207     int numOutputWrts;
00208 
00209     //both arrays are of size #local atoms on this output proc
00210     int *clusterID;
00211     int *clusterSize;
00212     //record the number of atoms that a remote cluster has on this
00213     //output processor
00214     ClusterSet remoteClusters;
00215     //record the number of clusters that have atoms on other output procs
00216     //on this output proc. Should be remoteClusters.size();
00217     int numRemoteClusters;
00218     //TEMP var to indicate how many msgs from remote proc have been recved
00219     //for updating cluster sizes in my local repository (linked with 
00220     //numRemoteClusters.
00221     int numCSMAck;
00222 
00223     ClusterSizeMsgBuffer csmBuf; //used to buffer cluster size msgs
00224     //record the number of remote cluster info queries for this output proc.
00225     //i.e. SUM(for a particular cluster on this local output proc, 
00226     //the number of remote output procs that has some atoms belonging 
00227     //to this cluster). Should be csmBuf.size();  
00228     int numRemoteReqs;
00229     //TEMP var to record the number of remote cluster info queries that
00230     //has received (linked with numRemoteReqs)
00231     int numReqRecved;
00232 
00233     //used to store the caculated centralized coordinates for each cluster
00234     //on this local output proc.
00235     ClusterCoorSet remoteCoors; //similar to remoteClusters
00236     ClusterCoorMsgBuffer ccmBuf; //similar to csmBuf but for cluster coor msgs
00237     Position *tmpCoorCon;     
00238     //the array is of size #local atoms on this output proc
00239     char *isWater;
00240 
00241 #ifdef MEM_OPT_VERSION
00242     CollectMidVectorInstance *coorInstance;
00243     CollectionMidMaster *midCM;
00244 #endif
00245 
00246     CkChareID mainMaster;
00247 
00249 
00250 #if COLLECT_PERFORMANCE_DATA
00251     int numFixedAtomLookup;
00252 #endif    
00253 
00254 private:
00255     void readCoordinatesAndVelocity();
00256     //create atom lists that are used for creating home patch
00257     void prepareHomePatchAtomList();
00258     //returns the index in hpIDList which points to pid
00259     int binaryFindHPID(int pid);
00260 
00261     void readInfoForParOutput();
00262 
00263     void integrateClusterCoor();
00264 
00265     int numMyAtoms(int rank, int numProcs);
00266     //returns the number of atoms INITIALLY assigned on this input processor
00267     inline int numInitMyAtomsOnInput() {
00268         return numMyAtoms(myInputRank, numInputProcs);
00269     }
00270     inline int numMyAtomsOnOutput() {
00271         return numMyAtoms(myOutputRank, numOutputProcs);
00272     }
00273 
00274     int atomRank(int atomID, int numProcs);
00275     //returns the rank of the input proc that the atom resides on INITIALLY
00276     inline int atomInitRankOnInput(int atomID) {
00277         return atomRank(atomID, numInputProcs);
00278     }
00279     inline int atomRankOnOutput(int atomID) {
00280         return atomRank(atomID, numOutputProcs);
00281     }
00282 
00283     void getMyAtomsRange(int &lowerIdx, int &upperIdx, int rank, int numProcs);
00284     //get the range of atoms to be read based on the initial distribution
00285     //i.e. atoms from [lowerIdx ... upperIdx] are going to be loaded
00286     inline void getMyAtomsInitRangeOnInput(int &lowerIdx, int &upperIdx) {
00287         return getMyAtomsRange(lowerIdx,upperIdx,myInputRank,numInputProcs);
00288     }
00289     inline void getMyAtomsRangeOnOutput(int &lowerIdx, int &upperIdx) {
00290         return getMyAtomsRange(lowerIdx,upperIdx,myOutputRank,numOutputProcs);
00291     }
00292     inline void getAtomsRangeOnOutput(int &lowerIdx, int &upperIdx, int rank) {
00293         return getMyAtomsRange(lowerIdx,upperIdx,rank,numOutputProcs);
00294     }
00295 
00296     //This function is only valid for the inital distribution of input atoms
00297     //mySAId: the atom id this input proc starts with
00298     //regAId: the atom id to look up
00299     inline int isAtomFixed(int mySAId, int reqAId){
00300         int localIdx = reqAId-mySAId;
00301         if(localIdx>=0 && localIdx<initAtoms.size()){
00302             //atom "thisAId" is on this input proc now!
00303             return initAtoms[localIdx].atomFixed;            
00304         } else{
00305         #if COLLECT_PERFORMANCE_DATA
00306             numFixedAtomLookup++;
00307         #endif
00308             //atom "thisAId" is NOT on this input proc now!
00309             return molecule->is_atom_fixed(reqAId);
00310         }
00311     }   
00312     
00313     //This function returns the highest rank of this output group procs
00314     inline int getMyOutputGroupHighestRank(){
00315         int step = numOutputProcs/numOutputWrts;
00316         int remains = numOutputProcs%numOutputWrts;
00317         //so "remains" output groups contain "step+1" output procs;
00318         //"numOutputWrts-remains" output groups contain "step" output procs;
00319         int limit = (step+1)*remains;
00320         if(myOutputRank<limit){
00321             int idx = myOutputRank/(step+1);
00322             return (idx+1)*(step+1)-1;
00323         }else{
00324             int idx = (myOutputRank-limit)/step;
00325             return limit+(idx+1)*step-1;
00326         }
00327     }
00328 public:
00329     ParallelIOMgr();
00330     ~ParallelIOMgr();
00331 
00332     void initialize(Node *node);
00333 
00334     //read per-atom files including the binary compressed psf
00335     //file, coordinate and velocity files
00336     void readPerAtomInfo();
00337 
00338     //migrate the initally assigned atoms to appropriate input processors
00339     //based on the migration group
00340     void migrateAtomsMGrp();
00341     void recvAtomsMGrp(MoveInputAtomsMsg *msg);
00342     void integrateMigratedAtoms();
00343 
00344     //Reduce counters for Tuples and Exclusions in Molecule globally
00345     void updateMolInfo();
00346     void recvMolInfo(MolInfoMsg *msg);
00347     void bcastMolInfo(MolInfoMsg *msg);
00348     void recvHydroBasedCounter(HydroBasedMsg *msg);
00349     void bcastHydroBasedCounter(HydroBasedMsg *msg);
00350 
00351     //calculate #atoms in each patch and reduce to proc 0
00352     void calcAtomsInEachPatch();
00353     void recvAtomsCntPerPatch(AtomsCntPerPatchMsg *msg);
00354 
00355     //distribute atoms to their homepatch processors
00356     CthThread sendAtomsThread;
00357     void sendAtomsToHomePatchProcs();
00358     int numAcksOutstanding;
00359     void ackAtomsToHomePatchProcs();
00360     void recvAtomsToHomePatchProcs(MovePatchAtomsMsg *msg);
00361 
00362     //create home patches on this processor
00363     void createHomePatches();
00364 
00365     //free the space occupied by atoms' names etc.
00366     void freeMolSpace();
00367 
00368     //used in parallel IO output
00369     int getNumOutputProcs() { return numOutputProcs; }
00370 
00371     void recvClusterSize(ClusterSizeMsg *msg);
00372     void integrateClusterSize();
00373     void recvFinalClusterSize(ClusterSizeMsg *msg);
00374 
00375     void receivePositions(CollectVectorVarMsg *msg);
00376     void receiveVelocities(CollectVectorVarMsg *msg);
00377     void receiveForces(CollectVectorVarMsg *msg);
00378     void disposePositions(int seq, double prevT);
00379     void disposeVelocities(int seq, double prevT);
00380     void disposeForces(int seq, double prevT);
00381 
00382     void wrapCoor(int seq, Lattice lat);
00383     void recvClusterCoor(ClusterCoorMsg *msg);
00384     void recvFinalClusterCoor(ClusterCoorMsg *msg);
00385 };
00386 
00387 #endif

Generated on Fri May 25 04:07:16 2012 for NAMD by  doxygen 1.3.9.1