ParallelIOMgr.h

#ifndef PARALLELIOMGR_H
#define PARALLELIOMGR_H

#include "ProcessorPrivate.h"
#include "charm++.h"
#include "BOCgroup.h"
#include "common.h"
#include "CompressPsf.h"
#include "Hydrogen.h"
#include "Vector.h"
#include "NamdState.h"
#include "Node.decl.h"
#include "PatchMgr.h"
#include "UniqueSet.h"
#include "UniqueSetIter.h"
#include "Molecule.h"

#define COLLECT_PERFORMANCE_DATA 0

#ifdef MEM_OPT_VERSION
class CollectionMgr;
class CollectionMaster;
class CollectionMidMaster;
class CollectMidVectorInstance;
#endif

class CollectVectorVarMsg;
class PatchMap;

#include "ParallelIOMgr.decl.h"

class MolInfoMsg : public CMessage_MolInfoMsg
{
public:
    int numBonds;
    int numCalcBonds;
    int numAngles;
    int numCalcAngles;
    int numDihedrals;
    int numCalcDihedrals;
    int numImpropers;
    int numCalcImpropers;
    int numCrossterms;
    int numCalcCrossterms;
    int numExclusions;
    int numCalcExclusions;
    int numCalcFullExclusions;
    int numLJPairs;
    int numCalcLJPairs;

    int numRigidBonds;

    //used when "comMov" is false in the SimParameters object,
    //which is usually the case -Chao Mei
    BigReal totalMass;
    Vector totalMV;

    BigReal totalCharge;
};
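
/*
 * Note (illustrative, not part of NAMD): totalMV is the mass-weighted sum
 * of velocities, totalMV = SUM_i m_i * v_i, and totalMass = SUM_i m_i.
 * Removing net center-of-mass motion then amounts to subtracting
 *     v_com = totalMV / totalMass
 * from every atom's velocity, e.g. (sketch; atomVel and n are hypothetical):
 *
 *     Vector vcom = msg->totalMV / msg->totalMass;
 *     for (int i = 0; i < n; ++i) atomVel[i] -= vcom;
 */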

class HydroBasedMsg : public CMessage_HydroBasedMsg
{
//Info inside this message needs to be calculated when the hydrogen
//groups are constructed
public:
    int numFixedRigidBonds;
    int numFixedGroups;
};

class MoveInputAtomsMsg : public CMessage_MoveInputAtomsMsg
{
public:
    int length;
    InputAtom *atomList;
};

class AtomsCntPerPatchMsg: public CMessage_AtomsCntPerPatchMsg
{
public:
    //the lists below are index-aligned (one entry per patch)
    int length;
    PatchID *pidList;
    unsigned short *atomsCntList;
    unsigned short *fixedAtomsCntList;
};

class MovePatchAtomsMsg: public CMessage_MovePatchAtomsMsg
{
public:
    //the total size of allAtoms can be computed from sizeList
    int from;
    int patchCnt;
    PatchID *pidList;
    int *sizeList;
    FullAtom *allAtoms;
};

struct ClusterElem {
    int clusterId;
    int atomsCnt;

    ClusterElem() : clusterId(-1), atomsCnt(0) {}
    ClusterElem(int cid) : clusterId(cid), atomsCnt(0) {}
    int hash() const {
        return clusterId;
    }
    int operator==(const ClusterElem &a) const {
        return clusterId == a.clusterId;
    }
};
typedef UniqueSet<ClusterElem> ClusterSet;
typedef UniqueSetIter<ClusterElem> ClusterSetIter;
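
/*
 * Illustrative sketch (not part of NAMD): hash() and operator== above are
 * what UniqueSet uses to locate elements, so per-cluster atom counts can be
 * accumulated roughly as follows, assuming UniqueSet provides the usual
 * NAMD-style find()/add() interface:
 *
 *     void countAtomInCluster(ClusterSet &clusters, int cid) {
 *         ClusterElem probe(cid);
 *         ClusterElem *found = clusters.find(probe);
 *         if (found == NULL) {
 *             probe.atomsCnt = 1;
 *             clusters.add(probe);   // first atom seen for this cluster
 *         } else {
 *             found->atomsCnt++;     // cluster already tracked
 *         }
 *     }
 */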

class ClusterSizeMsg : public CMessage_ClusterSizeMsg
{
public:
    int srcRank;
    int clusterId;
    int atomsCnt;
};
typedef ResizeArray<ClusterSizeMsg *> ClusterSizeMsgBuffer;

struct ClusterCoorElem {
    int clusterId;
    Vector dsum;

    ClusterCoorElem(): clusterId(-1), dsum(0.0) {}
    ClusterCoorElem(int cid): clusterId(cid), dsum(0.0) {}
    int hash() const {
        return clusterId;
    }
    int operator==(const ClusterCoorElem &a) const {
        return clusterId == a.clusterId;
    }
};
typedef UniqueSet<ClusterCoorElem> ClusterCoorSet;
typedef UniqueSetIter<ClusterCoorElem> ClusterCoorSetIter;

class ClusterCoorMsg : public CMessage_ClusterCoorMsg
{
public:
    int srcRank;
    int clusterId;
    Vector dsum;
};
typedef ResizeArray<ClusterCoorMsg *> ClusterCoorMsgBuffer;

class ParallelIOMgr : public CBase_ParallelIOMgr
{
#ifdef MEM_OPT_VERSION
    friend class CollectionMgr;
    friend class CollectionMaster;
    friend class CollectionMidMaster;
    friend class CollectMidVectorInstance;
#endif

private:
    SimParameters *simParameters;
    Molecule *molecule;

    int numInputProcs;
    int *inputProcArray;
    //the index into inputProcArray, i.e.
    //inputProcArray[myInputRank] == CkMyPe();
    //if this is not an input proc, the rank is -1
    int myInputRank;

    //Initially this list contains the atoms assigned to this input
    //processor; later it also holds atoms received from other input
    //processors based on the migration group
    InputAtomList initAtoms;

    //This atom list contains the migrated atoms from other input
    //processors based on the migration group. Once the migration
    //finishes, atoms in this list are added to initAtoms, and its
    //space is freed.
    InputAtomList tmpRecvAtoms;


    //This variable indicates whether this processor is ready
    //to receive atoms of the HomePatches to be created on this
    //processor
    bool isOKToRecvHPAtoms;
    FullAtomList *hpAtomsList;
    ResizeArray<int> hpIDList; //in increasing order

    //tmp variables
    int procsReceived; //used in updateMolInfo and recvAtomsCntPerPatch
    int hydroMsgRecved; //used in recvHydroBasedCounter
    Vector totalMV; //used to remove center-of-mass motion
    BigReal totalMass; //used to remove center-of-mass motion
    BigReal totalCharge;

    int numOutputProcs;
    int *outputProcArray;
    char *outputProcFlags;
    //the index into outputProcArray, i.e.
    //outputProcArray[myOutputRank] == CkMyPe();
    //if this is not an output proc, the rank is -1
    int myOutputRank;
    //the number of simultaneous writers;
    //output procs whose ranks are numOutputProcs/numOutputWrts apart
    //perform the output at the same time
    int numOutputWrts;

    //both arrays are of size #local atoms on this output proc
    int *clusterID;
    int *clusterSize;
    //record the number of atoms that a remote cluster has on this
    //output processor
    ClusterSet remoteClusters;
    //the number of clusters that have atoms on other output procs,
    //as tracked on this output proc; should equal remoteClusters.size()
    int numRemoteClusters;
    //TEMP var indicating how many msgs from remote procs have been received
    //for updating cluster sizes in my local repository (linked with
    //numRemoteClusters)
    int numCSMAck;

    ClusterSizeMsgBuffer csmBuf; //used to buffer cluster size msgs
    //record the number of remote cluster-info queries for this output proc,
    //i.e. SUM(for each cluster on this local output proc,
    //the number of remote output procs that have some atoms belonging
    //to that cluster); should equal csmBuf.size()
    int numRemoteReqs;
    //TEMP var recording the number of remote cluster-info queries that
    //have been received (linked with numRemoteReqs)
    int numReqRecved;

    //used to store the calculated center coordinates of each cluster
    //on this local output proc
    ClusterCoorSet remoteCoors; //similar to remoteClusters
    ClusterCoorMsgBuffer ccmBuf; //similar to csmBuf but for cluster coor msgs
    Position *tmpCoorCon;
    //the array is of size #local atoms on this output proc
    char *isWater;

#ifdef MEM_OPT_VERSION
    CollectMidVectorInstance *coorInstance;
    CollectionMidMaster *midCM;
#endif

    CkChareID mainMaster;


#if COLLECT_PERFORMANCE_DATA
    int numFixedAtomLookup;
#endif

private:
    void readCoordinatesAndVelocity();
    //create atom lists that are used for creating home patches
    void prepareHomePatchAtomList();
    //returns the index in hpIDList that refers to pid
    int binaryFindHPID(int pid);

    void readInfoForParOutput();

    void integrateClusterCoor();

    int numMyAtoms(int rank, int numProcs);
    //returns the number of atoms INITIALLY assigned to this input processor
    inline int numInitMyAtomsOnInput() {
        return numMyAtoms(myInputRank, numInputProcs);
    }
    inline int numMyAtomsOnOutput() {
        return numMyAtoms(myOutputRank, numOutputProcs);
    }

    int atomRank(int atomID, int numProcs);
    //returns the rank of the input proc on which the atom INITIALLY resides
    inline int atomInitRankOnInput(int atomID) {
        return atomRank(atomID, numInputProcs);
    }
    inline int atomRankOnOutput(int atomID) {
        return atomRank(atomID, numOutputProcs);
    }

    void getMyAtomsRange(int &lowerIdx, int &upperIdx, int rank, int numProcs);
    //get the range of atoms to be read based on the initial distribution,
    //i.e. atoms [lowerIdx ... upperIdx] are going to be loaded
    inline void getMyAtomsInitRangeOnInput(int &lowerIdx, int &upperIdx) {
        return getMyAtomsRange(lowerIdx,upperIdx,myInputRank,numInputProcs);
    }
    inline void getMyAtomsRangeOnOutput(int &lowerIdx, int &upperIdx) {
        return getMyAtomsRange(lowerIdx,upperIdx,myOutputRank,numOutputProcs);
    }
    inline void getAtomsRangeOnOutput(int &lowerIdx, int &upperIdx, int rank) {
        return getMyAtomsRange(lowerIdx,upperIdx,rank,numOutputProcs);
    }
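
    /*
     * Illustrative example (an assumption: the contiguous
     * [lowerIdx ... upperIdx] ranges above imply an approximately even
     * block decomposition of atom IDs).  With 10 atoms and 3 input procs,
     * the ranks could own ID ranges [0..3], [4..6] and [7..9]; then
     * numMyAtoms() would return 4, 3 and 3 respectively, and atomRank(5, 3)
     * would return 1.  The exact split is defined by the implementations
     * of numMyAtoms()/atomRank()/getMyAtomsRange().
     */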

    //This function is only valid for the initial distribution of input atoms
    //mySAId: the atom id this input proc starts with
    //reqAId: the atom id to look up
    inline int isAtomFixed(int mySAId, int reqAId){
        int localIdx = reqAId-mySAId;
        if(localIdx>=0 && localIdx<initAtoms.size()){
            //atom "reqAId" is on this input proc now!
            return initAtoms[localIdx].atomFixed;
        } else{
        #if COLLECT_PERFORMANCE_DATA
            numFixedAtomLookup++;
        #endif
            //atom "reqAId" is NOT on this input proc now!
            return molecule->is_atom_fixed(reqAId);
        }
    }

    //This function returns the highest rank within this proc's output group
    inline int getMyOutputGroupHighestRank(){
        int step = numOutputProcs/numOutputWrts;
        int remains = numOutputProcs%numOutputWrts;
        //so "remains" output groups contain "step+1" output procs;
        //"numOutputWrts-remains" output groups contain "step" output procs;
        int limit = (step+1)*remains;
        if(myOutputRank<limit){
            int idx = myOutputRank/(step+1);
            return (idx+1)*(step+1)-1;
        }else{
            int idx = (myOutputRank-limit)/step;
            return limit+(idx+1)*step-1;
        }
    }
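
    /*
     * Worked example: with numOutputProcs = 10 and numOutputWrts = 3,
     * step = 3 and remains = 1, so the output groups are ranks 0-3 (one
     * group of step+1 procs) and ranks 4-6, 7-9 (two groups of step procs).
     * getMyOutputGroupHighestRank() then returns 3 for ranks 0-3, 6 for
     * ranks 4-6, and 9 for ranks 7-9.
     */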
public:
    ParallelIOMgr();
    ~ParallelIOMgr();

    void initialize(Node *node);

    //read per-atom files, including the binary compressed psf
    //file and the coordinate and velocity files
    void readPerAtomInfo();

    //migrate the initially assigned atoms to the appropriate input processors
    //based on the migration group
    void migrateAtomsMGrp();
    void recvAtomsMGrp(MoveInputAtomsMsg *msg);
    void integrateMigratedAtoms();

    //Reduce counters for Tuples and Exclusions in Molecule globally
    void updateMolInfo();
    void recvMolInfo(MolInfoMsg *msg);
    void bcastMolInfo(MolInfoMsg *msg);
    void recvHydroBasedCounter(HydroBasedMsg *msg);
    void bcastHydroBasedCounter(HydroBasedMsg *msg);

    //calculate #atoms in each patch and reduce to proc 0
    void calcAtomsInEachPatch();
    void recvAtomsCntPerPatch(AtomsCntPerPatchMsg *msg);

    //distribute atoms to their home-patch processors
    CthThread sendAtomsThread;
    void sendAtomsToHomePatchProcs();
    int numAcksOutstanding;
    void ackAtomsToHomePatchProcs();
    void recvAtomsToHomePatchProcs(MovePatchAtomsMsg *msg);

    //create home patches on this processor
    void createHomePatches();

    //free the space occupied by atoms' names etc.
    void freeMolSpace();

    //used in parallel IO output
    int getNumOutputProcs() { return numOutputProcs; }
    bool isOutputProcessor(int pe);

    void recvClusterSize(ClusterSizeMsg *msg);
    void integrateClusterSize();
    void recvFinalClusterSize(ClusterSizeMsg *msg);

    void receivePositions(CollectVectorVarMsg *msg);
    void receiveVelocities(CollectVectorVarMsg *msg);
    void receiveForces(CollectVectorVarMsg *msg);
    void disposePositions(int seq, double prevT);
    void disposeVelocities(int seq, double prevT);
    void disposeForces(int seq, double prevT);

    void wrapCoor(int seq, Lattice lat);
    void recvClusterCoor(ClusterCoorMsg *msg);
    void recvFinalClusterCoor(ClusterCoorMsg *msg);
};
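
/*
 * Illustrative startup sketch (an assumption about how a driver could walk
 * through the phases above; ioMgrGID, node, and the strictly sequential
 * calls are hypothetical, since in the real code each phase is triggered
 * via Charm++ messages only after the previous phase has completed):
 *
 *     CProxy_ParallelIOMgr pIO(ioMgrGID);   // ioMgrGID: CkGroupID of the group
 *     ParallelIOMgr *io = pIO.ckLocalBranch();
 *     io->initialize(node);                 // node: the local Node object
 *     io->readPerAtomInfo();                // psf / coordinate / velocity data
 *     io->migrateAtomsMGrp();               // regroup atoms by migration group
 *     io->updateMolInfo();                  // reduce global tuple/exclusion counts
 *     io->calcAtomsInEachPatch();           // per-patch atom counts to proc 0
 *     io->sendAtomsToHomePatchProcs();      // ship atoms to home-patch procs
 *     io->createHomePatches();              // build HomePatches locally
 */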

#endif
