ParallelIOMgr.h

#ifndef PARALLELIOMGR_H
#define PARALLELIOMGR_H

#include "ProcessorPrivate.h"
#include "charm++.h"
#include "BOCgroup.h"
#include "common.h"
#include "CompressPsf.h"
#include "Hydrogen.h"
#include "Vector.h"
#include "NamdState.h"
#include "Node.decl.h"
#include "PatchMgr.h"
#include "UniqueSet.h"
#include "UniqueSetIter.h"
#include "Molecule.h"

#define COLLECT_PERFORMANCE_DATA 0

#ifdef MEM_OPT_VERSION
class CollectionMgr;
class CollectionMaster;
class CollectionMidMaster;
class CollectMidVectorInstance;
#endif

class CollectVectorVarMsg;
class PatchMap;

#include "ParallelIOMgr.decl.h"

class MolInfoMsg : public CMessage_MolInfoMsg
{
public:
    int numBonds;
    int numCalcBonds;
    int numAngles;
    int numCalcAngles;
    int numDihedrals;
    int numCalcDihedrals;
    int numImpropers;
    int numCalcImpropers;
    int numCrossterms;
    int numCalcCrossterms;
    int numExclusions;
    int numCalcExclusions;
    int numCalcFullExclusions;
    int numLJPairs;
    int numCalcLJPairs;

    int numRigidBonds;

    //used when "comMov" is false in the SimParameters object,
    //which is usually the case -Chao Mei
    BigReal totalMass;
    Vector totalMV;

    BigReal totalCharge;
};
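
// Illustrative sketch, not part of the original header: when the collecting
// input processor receives one MolInfoMsg per input processor (see
// ParallelIOMgr::recvMolInfo below), folding a message into a running total
// presumably amounts to a field-wise sum of the counters.  The helper name
// sumMolInfo is hypothetical and only a few representative fields are shown.
inline void sumMolInfo(MolInfoMsg &total, const MolInfoMsg &part) {
    total.numBonds          += part.numBonds;        // bonded-term counters
    total.numCalcBonds      += part.numCalcBonds;
    total.numCalcExclusions += part.numCalcExclusions;
    total.totalMass         += part.totalMass;       // for center-of-mass removal
    total.totalMV           += part.totalMV;
    total.totalCharge       += part.totalCharge;
}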

class HydroBasedMsg : public CMessage_HydroBasedMsg
{
//Info inside this message needs to be calculated when the hydrogen
//group is constructed
public:
    int numFixedRigidBonds;
    int numFixedGroups;
};

class MoveInputAtomsMsg : public CMessage_MoveInputAtomsMsg
{
public:
    int length;
    InputAtom *atomList;
};

class AtomsCntPerPatchMsg: public CMessage_AtomsCntPerPatchMsg
{
public:
    //the lists below are index-aligned: entry i of each refers to the same patch
    int length;
    PatchID *pidList;
    unsigned short *atomsCntList;
    unsigned short *fixedAtomsCntList;
};

class MovePatchAtomsMsg: public CMessage_MovePatchAtomsMsg
{
public:
    //the total size of allAtoms can be computed by summing sizeList
    int from;
    int patchCnt;
    PatchID *pidList;
    int *sizeList;
    FullAtom *allAtoms;
};
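
// Illustrative sketch, not part of the original header: MovePatchAtomsMsg is a
// varsize Charm++ message, so it would typically be allocated by passing one
// size per varsize array to operator new.  The helper name and the assumption
// that the .ci file declares pidList, sizeList and allAtoms in this order are
// hypothetical.
inline MovePatchAtomsMsg *allocMovePatchAtomsMsg(int patchCnt, int totalAtoms) {
    int sizes[3];
    sizes[0] = patchCnt;    // entries of pidList
    sizes[1] = patchCnt;    // entries of sizeList
    sizes[2] = totalAtoms;  // entries of allAtoms, i.e. the sum of sizeList
    MovePatchAtomsMsg *msg = new(sizes, 0) MovePatchAtomsMsg;
    msg->patchCnt = patchCnt;
    //the caller still fills in from, pidList, sizeList and allAtoms
    return msg;
}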

struct ClusterElem {
    int clusterId;
    int atomsCnt;

    ClusterElem() : clusterId(-1), atomsCnt(0) {}
    ClusterElem(int cid) : clusterId(cid), atomsCnt(0) {}
    int hash() const {
        return clusterId;
    }
    int operator==(const ClusterElem &a) const {
        return clusterId == a.clusterId;
    }
};
typedef UniqueSet<ClusterElem> ClusterSet;
typedef UniqueSetIter<ClusterElem> ClusterSetIter;
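
// Illustrative sketch, not part of the original header: how a ClusterSet can
// tally per-cluster atom counts on an output processor through the
// UniqueSet find()/add() interface assumed here.  The helper name
// tallyClusterAtom is hypothetical.
inline void tallyClusterAtom(ClusterSet &clusters, int clusterId) {
    ClusterElem one(clusterId);
    ClusterElem *ret = clusters.find(one);
    if (ret == NULL) {
        one.atomsCnt = 1;   // first atom of this cluster seen locally
        clusters.add(one);
    } else {
        ret->atomsCnt++;    // another atom of an already-known cluster
    }
}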

class ClusterSizeMsg : public CMessage_ClusterSizeMsg
{
public:
    int srcRank;
    int clusterId;
    int atomsCnt;
};
typedef ResizeArray<ClusterSizeMsg *> ClusterSizeMsgBuffer;

struct ClusterCoorElem{
    int clusterId;
    Vector dsum;

    ClusterCoorElem(): clusterId(-1), dsum(0.0) {}
    ClusterCoorElem(int cid): clusterId(cid), dsum(0.0) {}
    int hash() const {
        return clusterId;
    }
    int operator==(const ClusterCoorElem &a) const {
        return clusterId == a.clusterId;
    }
};
typedef UniqueSet<ClusterCoorElem> ClusterCoorSet;
typedef UniqueSetIter<ClusterCoorElem> ClusterCoorSetIter;

class ClusterCoorMsg : public CMessage_ClusterCoorMsg
{
public:
    int srcRank;
    int clusterId;
    Vector dsum;
};
typedef ResizeArray<ClusterCoorMsg *> ClusterCoorMsgBuffer;

class ParallelIOMgr : public CBase_ParallelIOMgr
{
#ifdef MEM_OPT_VERSION
    friend class CollectionMgr;
    friend class CollectionMaster;
    friend class CollectionMidMaster;
    friend class CollectMidVectorInstance;
#endif

private:
    SimParameters *simParameters;
    Molecule *molecule;

    int numInputProcs;
    int *inputProcArray;
    //the index into inputProcArray, i.e.
    //inputProcArray[myInputRank] == CkMyPe();
    //if this processor is not an input proc, the rank is -1
    int myInputRank;

    //Initially this atom list contains the atoms assigned to this
    //input processor; after migration it also contains atoms received
    //from other input processors based on the migration group
    InputAtomList initAtoms;

    //This atom list holds the atoms migrated from other input
    //processors based on the migration group. Once the migration
    //finishes, the atoms in this list are added to initAtoms and its
    //space is freed.
    InputAtomList tmpRecvAtoms;


    //This variable indicates whether this processor is ready
    //to receive the atoms for the HomePatches to be created on
    //this processor
    bool isOKToRecvHPAtoms;
    FullAtomList *hpAtomsList;
    ResizeArray<int> hpIDList; //in increasing order

    //tmp variables
    int procsReceived; //used at updateMolInfo and recvAtomsCntPerPatch
    int hydroMsgRecved; //used at recvHydroBasedCounter
    Vector totalMV; //used to remove center of mass motion
    BigReal totalMass; //used to remove center of mass motion
    BigReal totalCharge;
    int64 numTotalExclusions;
    int64 numCalcExclusions;
    int64 numCalcFullExclusions;

    int numOutputProcs;
    int *outputProcArray;
    char *outputProcFlags;
    //the index into outputProcArray, i.e.
    //outputProcArray[myOutputRank] == CkMyPe();
    //if this processor is not an output proc, the rank is -1
    int myOutputRank;
    //the number of simultaneous writers;
    //output procs whose ranks are numOutputProcs/numOutputWrts apart
    //do their output at the same time
    int numOutputWrts;

    //both arrays are of size #local atoms on this output proc
    int *clusterID;
    int *clusterSize;
    //record the number of atoms that a remote cluster has on this
    //output processor
    ClusterSet remoteClusters;
    //the number of clusters that have atoms both on this output proc
    //and on other output procs; should equal remoteClusters.size()
    int numRemoteClusters;
    //TEMP var counting how many msgs from remote procs have been received
    //for updating cluster sizes in my local repository (linked with
    //numRemoteClusters)
    int numCSMAck;

    ClusterSizeMsgBuffer csmBuf; //used to buffer cluster size msgs
    //the number of remote cluster-info queries for this output proc,
    //i.e. SUM over the clusters on this local output proc of
    //the number of remote output procs that have some atoms belonging
    //to that cluster; should equal csmBuf.size()
    int numRemoteReqs;
    //TEMP var recording the number of remote cluster-info queries that
    //have been received (linked with numRemoteReqs)
    int numReqRecved;

    //used to store the calculated center coordinates of each cluster
    //on this local output proc.
    ClusterCoorSet remoteCoors; //similar to remoteClusters
    ClusterCoorMsgBuffer ccmBuf; //similar to csmBuf but for cluster coor msgs
    Position *tmpCoorCon;
    //the array is of size #local atoms on this output proc
    char *isWater;

#ifdef MEM_OPT_VERSION
    CollectMidVectorInstance *coorInstance;
    CollectionMidMaster *midCM;
#endif

    CkChareID mainMaster;


#if COLLECT_PERFORMANCE_DATA
    int numFixedAtomLookup;
#endif

private:
    void readCoordinatesAndVelocity();
    //create the atom lists that are used for creating home patches
    void prepareHomePatchAtomList();
    //returns the index in hpIDList that points to pid
    int binaryFindHPID(int pid);

    void readInfoForParOutput();

    void integrateClusterCoor();

    int numMyAtoms(int rank, int numProcs);
    //returns the number of atoms INITIALLY assigned to this input processor
    inline int numInitMyAtomsOnInput() {
        return numMyAtoms(myInputRank, numInputProcs);
    }
    inline int numMyAtomsOnOutput() {
        return numMyAtoms(myOutputRank, numOutputProcs);
    }

    int atomRank(int atomID, int numProcs);
    //returns the rank of the input proc that the atom resides on INITIALLY
    inline int atomInitRankOnInput(int atomID) {
        return atomRank(atomID, numInputProcs);
    }
    inline int atomRankOnOutput(int atomID) {
        return atomRank(atomID, numOutputProcs);
    }

    void getMyAtomsRange(int &lowerIdx, int &upperIdx, int rank, int numProcs);
    //get the range of atoms to be read based on the initial distribution
    //i.e. atoms from [lowerIdx ... upperIdx] are going to be loaded
    inline void getMyAtomsInitRangeOnInput(int &lowerIdx, int &upperIdx) {
        return getMyAtomsRange(lowerIdx,upperIdx,myInputRank,numInputProcs);
    }
    inline void getMyAtomsRangeOnOutput(int &lowerIdx, int &upperIdx) {
        return getMyAtomsRange(lowerIdx,upperIdx,myOutputRank,numOutputProcs);
    }
    inline void getAtomsRangeOnOutput(int &lowerIdx, int &upperIdx, int rank) {
        return getMyAtomsRange(lowerIdx,upperIdx,rank,numOutputProcs);
    }
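
    //Illustrative note, an assumption not stated in the original source:
    //numMyAtoms, atomRank and getMyAtomsRange above are expected to describe
    //one consistent block distribution of atom IDs, e.g. with 10 atoms over
    //3 procs the ranges would be [0..3], [4..6] and [7..9], so
    //numMyAtoms(0,3) == 4 and atomRank(5,3) == 1.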

    //This function is only valid for the initial distribution of input atoms
    //mySAId: the atom id this input proc starts with
    //reqAId: the atom id to look up
    inline int isAtomFixed(int mySAId, int reqAId){
        int localIdx = reqAId-mySAId;
        if(localIdx>=0 && localIdx<initAtoms.size()){
            //atom "reqAId" is on this input proc now!
            return initAtoms[localIdx].atomFixed;
        } else{
        #if COLLECT_PERFORMANCE_DATA
            numFixedAtomLookup++;
        #endif
            //atom "reqAId" is NOT on this input proc now!
            return molecule->is_atom_fixed(reqAId);
        }
    }

    //This function returns the highest rank within this proc's output group
    inline int getMyOutputGroupHighestRank(){
        int step = numOutputProcs/numOutputWrts;
        int remains = numOutputProcs%numOutputWrts;
        //so "remains" output groups contain "step+1" output procs;
        //"numOutputWrts-remains" output groups contain "step" output procs;
        int limit = (step+1)*remains;
        if(myOutputRank<limit){
            int idx = myOutputRank/(step+1);
            return (idx+1)*(step+1)-1;
        }else{
            int idx = (myOutputRank-limit)/step;
            return limit+(idx+1)*step-1;
        }
    }
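
    //Worked example (illustrative, not in the original source): with
    //numOutputProcs = 10 and numOutputWrts = 3, step = 3 and remains = 1,
    //so the output groups are {0,1,2,3}, {4,5,6} and {7,8,9}; ranks 0-3
    //return 3, ranks 4-6 return 6, and ranks 7-9 return 9.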
public:
    ParallelIOMgr();
    ~ParallelIOMgr();

    void initialize(Node *node);

    //read per-atom files including the binary compressed psf
    //file, coordinate and velocity files
    void readPerAtomInfo();

    //migrate the initially assigned atoms to the appropriate input processors
    //based on the migration group
    void migrateAtomsMGrp();
    void recvAtomsMGrp(MoveInputAtomsMsg *msg);
    void integrateMigratedAtoms();

    //Reduce counters for Tuples and Exclusions in Molecule globally
    void updateMolInfo();
    void recvMolInfo(MolInfoMsg *msg);
    void bcastMolInfo(MolInfoMsg *msg);
    void recvHydroBasedCounter(HydroBasedMsg *msg);
    void bcastHydroBasedCounter(HydroBasedMsg *msg);

    //calculate #atoms in each patch and reduce to proc 0
    void calcAtomsInEachPatch();
    void recvAtomsCntPerPatch(AtomsCntPerPatchMsg *msg);

    //distribute atoms to their homepatch processors
    CthThread sendAtomsThread;
    void sendAtomsToHomePatchProcs();
    int numAcksOutstanding;
    void ackAtomsToHomePatchProcs();
    void recvAtomsToHomePatchProcs(MovePatchAtomsMsg *msg);

    //create home patches on this processor
    void createHomePatches();

    //free the space occupied by atoms' names etc.
    void freeMolSpace();

    //used in parallel IO output
    int getNumOutputProcs() { return numOutputProcs; }
    bool isOutputProcessor(int pe);

    void recvClusterSize(ClusterSizeMsg *msg);
    void integrateClusterSize();
    void recvFinalClusterSize(ClusterSizeMsg *msg);

    void receivePositions(CollectVectorVarMsg *msg);
    void receiveVelocities(CollectVectorVarMsg *msg);
    void receiveForces(CollectVectorVarMsg *msg);
    void disposePositions(int seq, double prevT);
    void disposeVelocities(int seq, double prevT);
    void disposeForces(int seq, double prevT);

    void wrapCoor(int seq, Lattice lat);
    void recvClusterCoor(ClusterCoorMsg *msg);
    void recvFinalClusterCoor(ClusterCoorMsg *msg);
};
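
// Usage sketch (an illustrative assumption, not from the original source):
// ParallelIOMgr is a Charm++ chare group, so one branch runs on every PE and
// entry-method calls on the group proxy are broadcast to all branches.  A
// driver could create and step the group roughly as follows:
//
//   CProxy_ParallelIOMgr ioMgr = CProxy_ParallelIOMgr::ckNew();
//   ioMgr.readPerAtomInfo();            // input procs read their file slices
//   ioMgr.migrateAtomsMGrp();           // regroup atoms by migration group
//   ioMgr.calcAtomsInEachPatch();       // patch population counts to proc 0
//   ioMgr.sendAtomsToHomePatchProcs();  // ship atoms to home-patch PEs
//   ioMgr.createHomePatches();          // build HomePatches locally
//
// In NAMD the actual sequencing and synchronization are driven elsewhere
// during startup, so treat this only as an orientation aid.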

#endif
