Main Page | Namespace List | Class Hierarchy | Alphabetical List | Class List | File List | Namespace Members | Class Members | File Members

ParallelIOMgr.h

Go to the documentation of this file.
00001 #ifndef PARALLELIOMGR_H
00002 #define PARALLELIOMGR_H
00003 
00004 #include "ProcessorPrivate.h"
00005 #include "charm++.h"
00006 #include "BOCgroup.h"
00007 #include "common.h"
00008 #include "CompressPsf.h"
00009 #include "Hydrogen.h"
00010 #include "Vector.h"
00011 #include "NamdState.h"
00012 #include "Node.decl.h"
00013 #include "PatchMgr.h"
00014 #include "UniqueSet.h"
00015 #include "UniqueSetIter.h"
00016 #include "Molecule.h"
00017 
00018 #define COLLECT_PERFORMANCE_DATA 0
00019 
00020 #ifdef MEM_OPT_VERSION
00021 class CollectionMgr;
00022 class CollectionMaster;
00023 class CollectionMidMaster;
00024 class CollectMidVectorInstance;
00025 #endif
00026 
00027 class CollectVectorVarMsg;
00028 class PatchMap;
00029 
00030 #include "ParallelIOMgr.decl.h"
00031 
00033 class MolInfoMsg : public CMessage_MolInfoMsg
00034 {
      //Message used by ParallelIOMgr::recvMolInfo() and bcastMolInfo() when
      //updateMolInfo() reduces the Molecule's tuple and exclusion counters
      //globally. NOTE(review): the exact distinction between numX and
      //numCalcX is defined by Molecule, not visible here -- confirm there.
00035 public:
00036     int numBonds;
00037     int numCalcBonds;
00038     int numAngles;
00039     int numCalcAngles;
00040     int numDihedrals;
00041     int numCalcDihedrals;
00042     int numImpropers;
00043     int numCalcImpropers;
00044     int numCrossterms;
00045     int numCalcCrossterms;
00046     int numExclusions;
00047     int numCalcExclusions;
00048     
00049     int numRigidBonds;
00050 
00051     //used when "comMov" is false in the SimParameters object,
00052     //which is usually the case -Chao Mei
00053     BigReal totalMass;
00054     Vector totalMV;
00055 
00056     BigReal totalCharge;
00057 };
00058 
00059 class HydroBasedMsg : public CMessage_HydroBasedMsg
00060 {
00061 //Info inside this message needs to be calculated when the hydrogen
00062 //group is constructed
      //Handled by ParallelIOMgr::recvHydroBasedCounter() and
      //ParallelIOMgr::bcastHydroBasedCounter().
00063 public:
00064     int numFixedRigidBonds;
00065     int numFixedGroups;
00066 };
00067 
00068 class MoveInputAtomsMsg : public CMessage_MoveInputAtomsMsg
00069 {
      //Carries InputAtoms between input processors; received by
      //ParallelIOMgr::recvAtomsMGrp() while the initially assigned atoms are
      //migrated according to their migration group.
00070 public:
00071     int length; //presumably the number of entries in atomList -- TODO confirm
00072     InputAtom *atomList;
00073 };
00074 
00075 class AtomsCntPerPatchMsg: public CMessage_AtomsCntPerPatchMsg
00076 {
      //Per-patch atom counts received by ParallelIOMgr::recvAtomsCntPerPatch()
      //(calcAtomsInEachPatch() computes these counts and reduces them to proc 0).
00077 public:
00078     //one-to-one mapping between the three lists below: presumably entry i
          //of atomsCntList and fixedAtomsCntList belongs to patch pidList[i],
          //and length is the number of entries in each list -- TODO confirm
00079     int length;
00080     PatchID *pidList;
          //NOTE(review): counts are 16-bit unsigned, so a patch holding more
          //than 65535 atoms (or fixed atoms) cannot be represented here.
00081     unsigned short *atomsCntList;
00082     unsigned short *fixedAtomsCntList;
00083 };
00084 
00085 class MovePatchAtomsMsg: public CMessage_MovePatchAtomsMsg
00086 {
      //Atoms sent to the processor that will own their home patches;
      //received by ParallelIOMgr::recvAtomsToHomePatchProcs().
00087 public:
00088     //the total size of allAtoms can be computed by summing sizeList
          //presumably: for i < patchCnt, sizeList[i] consecutive atoms of
          //allAtoms belong to patch pidList[i], and "from" identifies the
          //sender -- TODO confirm against the .C file
00089     int from;
00090     int patchCnt;
00091     PatchID *pidList;
00092     int *sizeList;
00093     FullAtom *allAtoms;
00094 };
00096 
// Entry of a UniqueSet describing one cluster on an output processor.
// Identity is the cluster id alone: hash() and operator== never look at
// atomsCnt, which is per-entry payload.
struct ClusterElem {
    int clusterId;  // -1 when default-constructed
    int atomsCnt;   // always starts at 0

    ClusterElem() : clusterId(-1), atomsCnt(0) {}
    ClusterElem(int cid) : clusterId(cid), atomsCnt(0) {}

    // Hash key used by the set: just the cluster id.
    int hash() const { return this->clusterId; }

    // Two entries are equal iff they name the same cluster.
    int operator==(const ClusterElem &other) const {
        return other.clusterId == this->clusterId;
    }
};
00111 typedef UniqueSet<ClusterElem> ClusterSet; //membership via ClusterElem::hash()/operator==, i.e. by clusterId
00112 typedef UniqueSetIter<ClusterElem> ClusterSetIter;
00113 
00114 class ClusterSizeMsg : public CMessage_ClusterSizeMsg
00115 {
      //Cluster-size information exchanged between output processors;
      //received by ParallelIOMgr::recvClusterSize() and recvFinalClusterSize().
00116 public:
00117     int srcRank; //presumably the sender's output rank -- TODO confirm
00118     int clusterId;
00119     int atomsCnt;
00120 };
00121 typedef ResizeArray<ClusterSizeMsg *> ClusterSizeMsgBuffer; //buffers cluster size msgs (ParallelIOMgr::csmBuf)
00122 
00123 struct ClusterCoorElem{
00124     int clusterId;    
00125     Vector dsum;
00126 
00127     ClusterCoorElem(): clusterId(-1), dsum(0.0) {}
00128     ClusterCoorElem(int cid): clusterId(cid), dsum(0.0) {}
00129     int hash() const {
00130         return clusterId;
00131     }
00132     int operator==(const ClusterCoorElem &a) const {
00133         return clusterId == a.clusterId;
00134     }
00135 };
00136 typedef UniqueSet<ClusterCoorElem> ClusterCoorSet; //membership via ClusterCoorElem::hash()/operator==, i.e. by clusterId
00137 typedef UniqueSetIter<ClusterCoorElem> ClusterCoorSetIter;
00138 
00139 class ClusterCoorMsg : public CMessage_ClusterCoorMsg
00140 {
      //Per-cluster coordinate data (dsum) exchanged between output processors;
      //received by ParallelIOMgr::recvClusterCoor() and recvFinalClusterCoor(),
      //presumably as part of wrapCoor() -- TODO confirm.
00141 public:
00142     int srcRank;
00143     int clusterId;    
00144     Vector dsum;    
00145 };
00146 typedef ResizeArray<ClusterCoorMsg *> ClusterCoorMsgBuffer; //buffers cluster coor msgs (ParallelIOMgr::ccmBuf)
00148 
00149 class ParallelIOMgr : public BOCclass
00150 {
      //BOCclass (Charm++ group) that drives parallel I/O of per-atom data.
      //Input procs read the per-atom files (readPerAtomInfo), migrate atoms by
      //migration group, reduce Molecule counters and send atoms to their
      //home-patch procs. Output procs receive positions/velocities/forces and
      //exchange cluster sizes/coordinates for coordinate wrapping (wrapCoor).
      //NOTE(review): the class holds raw pointers (inputProcArray,
      //outputProcArray, clusterID, ...) and declares no copy constructor or
      //copy assignment; if ~ParallelIOMgr frees them, a copy would double-free.
      //Confirm instances are never copied.
00151 #ifdef MEM_OPT_VERSION
00152     friend class CollectionMgr;
00153     friend class CollectionMaster;
00154     friend class CollectionMidMaster;
00155     friend class CollectMidVectorInstance;
00156 #endif
00157 
00158 private:
00159     SimParameters *simParameters;
00160     Molecule *molecule;
00161 
          //---- input side ----
00163     int numInputProcs;
00164     int *inputProcArray;
00165     //the index to the inputProcArray i.e.
00166     //inputProcArray[myInputRank] == CkMyPe();
00167     //if it is not an input proc, the rank is -1;
00168     int myInputRank;
00169 
00170     //Initially this atom list contains the initially assigned
00171     //atoms, later it will contain atoms from other input processors
00172     //based on the migration group
00173     InputAtomList initAtoms;
00174 
00175     //This atom list contains the migrated atoms from other input
00176     //processors based on the migration group. Once the migration
00177     //finishes, atoms in this list are added to initAtoms, and its
00178     //space is freed.
00179     InputAtomList tmpRecvAtoms;
00180 
00181 
00182     //This variable indicates whether this processor is ready
00183     //to receive atoms of HomePatches to be created on this
00184     //processor
00185     bool isOKToRecvHPAtoms;
00186     FullAtomList *hpAtomsList; //presumably one list per entry of hpIDList -- TODO confirm
00187     ResizeArray<int> hpIDList; //in increasing order
00188 
00189     //tmp variables
00190     int procsReceived; //used at updateMolInfo and recvAtomsCntPerPatch
00191     int hydroMsgRecved; //used at recvHydroBasedCounter
00192     Vector totalMV; //used to remove center of mass motion
00193     BigReal totalMass; //used to remove center of mass motion
00194     BigReal totalCharge;
00196 
          //---- output side ----
00198     int numOutputProcs;
00199     int *outputProcArray;
00200     char *outputProcFlags;
00201     //the index to the outputProcArray i.e.
00202     //outputProcArray[myOutputRank] == CkMyPe();
00203     //if it is not an output proc, the rank is -1;
00204     int myOutputRank;
00205     //the number of simultaneous writers:
00206     //output procs whose ranks are numOutputProcs/numOutputWrts apart do the
00207     //output at the same time
00208     int numOutputWrts;
00209 
00210     //both arrays are of size #local atoms on this output proc
00211     int *clusterID;
00212     int *clusterSize;
00213     //record the number of atoms that a remote cluster has on this
00214     //output processor
00215     ClusterSet remoteClusters;
00216     //record the number of clusters that have atoms on other output procs
00217     //on this output proc. Should be remoteClusters.size();
00218     int numRemoteClusters;
00219     //TEMP var to indicate how many msgs from remote procs have been received
00220     //for updating cluster sizes in my local repository (linked with
00221     //numRemoteClusters).
00222     int numCSMAck;
00223 
00224     ClusterSizeMsgBuffer csmBuf; //used to buffer cluster size msgs
00225     //record the number of remote cluster info queries for this output proc.
00226     //i.e. SUM(for a particular cluster on this local output proc,
00227     //the number of remote output procs that have some atoms belonging
00228     //to this cluster). Should be csmBuf.size();
00229     int numRemoteReqs;
00230     //TEMP var to record the number of remote cluster info queries that
00231     //have been received (linked with numRemoteReqs)
00232     int numReqRecved;
00233 
00234     //used to store the calculated centralized coordinates for each cluster
00235     //on this local output proc.
00236     ClusterCoorSet remoteCoors; //similar to remoteClusters
00237     ClusterCoorMsgBuffer ccmBuf; //similar to csmBuf but for cluster coor msgs
00238     Position *tmpCoorCon;     
00239     //the array is of size #local atoms on this output proc
00240     char *isWater;
00241 
00242 #ifdef MEM_OPT_VERSION
00243     CollectMidVectorInstance *coorInstance;
00244     CollectionMidMaster *midCM;
00245 #endif
00246 
00247     CkChareID mainMaster;
00248 
00250 
00251 #if COLLECT_PERFORMANCE_DATA
          //counts isAtomFixed() lookups that had to fall back to the Molecule
00252     int numFixedAtomLookup;
00253 #endif    
00254 
00255 private:
00256     void readCoordinatesAndVelocity();
00257     //create atom lists that are used for creating home patch
00258     void prepareHomePatchAtomList();
00259     //returns the index in hpIDList which points to pid
          //(hpIDList is kept in increasing order, hence the binary search).
          //NOTE(review): the not-found return value is not visible here.
00260     int binaryFindHPID(int pid);
00261 
00262     void readInfoForParOutput();
00263 
00264     void integrateClusterCoor();
00265 
          //number of atoms assigned to "rank" when atoms are split over numProcs procs
00266     int numMyAtoms(int rank, int numProcs);
00267     //returns the number of atoms INITIALLY assigned on this input processor
00268     inline int numInitMyAtomsOnInput() {
00269         return numMyAtoms(myInputRank, numInputProcs);
00270     }
          //returns the number of atoms assigned to this output processor
00271     inline int numMyAtomsOnOutput() {
00272         return numMyAtoms(myOutputRank, numOutputProcs);
00273     }
00274 
          //rank (out of numProcs procs) that owns atom "atomID"
00275     int atomRank(int atomID, int numProcs);
00276     //returns the rank of the input proc that the atom resides on INITIALLY
00277     inline int atomInitRankOnInput(int atomID) {
00278         return atomRank(atomID, numInputProcs);
00279     }
          //returns the rank of the output proc responsible for the atom
00280     inline int atomRankOnOutput(int atomID) {
00281         return atomRank(atomID, numOutputProcs);
00282     }
00283 
          //fills [lowerIdx ... upperIdx], the atom range of "rank" out of numProcs
00284     void getMyAtomsRange(int &lowerIdx, int &upperIdx, int rank, int numProcs);
00285     //get the range of atoms to be read based on the initial distribution
00286     //i.e. atoms from [lowerIdx ... upperIdx] are going to be loaded
00287     inline void getMyAtomsInitRangeOnInput(int &lowerIdx, int &upperIdx) {
00288         return getMyAtomsRange(lowerIdx,upperIdx,myInputRank,numInputProcs);
00289     }
          //atom range handled by this output proc
00290     inline void getMyAtomsRangeOnOutput(int &lowerIdx, int &upperIdx) {
00291         return getMyAtomsRange(lowerIdx,upperIdx,myOutputRank,numOutputProcs);
00292     }
          //atom range handled by output proc "rank"
00293     inline void getAtomsRangeOnOutput(int &lowerIdx, int &upperIdx, int rank) {
00294         return getMyAtomsRange(lowerIdx,upperIdx,rank,numOutputProcs);
00295     }
00296 
00297     //This function is only valid for the initial distribution of input atoms
00298     //mySAId: the atom id this input proc starts with
00299     //reqAId: the atom id to look up
          //Returns initAtoms[reqAId-mySAId].atomFixed when that index lies inside
          //initAtoms, otherwise falls back to molecule->is_atom_fixed(reqAId).
00300     inline int isAtomFixed(int mySAId, int reqAId){
00301         int localIdx = reqAId-mySAId;
00302         if(localIdx>=0 && localIdx<initAtoms.size()){
00303             //atom "reqAId" is on this input proc now!
00304             return initAtoms[localIdx].atomFixed;            
00305         } else{
00306         #if COLLECT_PERFORMANCE_DATA
00307             numFixedAtomLookup++;
00308         #endif
00309             //atom "reqAId" is NOT on this input proc now!
00310             return molecule->is_atom_fixed(reqAId);
00311         }
00312     }   
00313     
00314     //This function returns the highest rank of this output group procs
          //The numOutputProcs output ranks are split into numOutputWrts
          //contiguous groups; the first "remains" groups hold step+1 ranks and
          //the others hold step ranks. E.g. 10 procs and 3 writers give groups
          //[0-3], [4-6], [7-9], so rank 5 -> 6 and rank 2 -> 3.
          //NOTE(review): numOutputWrts must be > 0 (it is a divisor), and the
          //result is only meaningful on an output proc (myOutputRank >= 0).
00315     inline int getMyOutputGroupHighestRank(){
00316         int step = numOutputProcs/numOutputWrts;
00317         int remains = numOutputProcs%numOutputWrts;
00318         //so "remains" output groups contain "step+1" output procs;
00319         //"numOutputWrts-remains" output groups contain "step" output procs;
              //ranks below "limit" belong to the larger (step+1)-sized groups
00320         int limit = (step+1)*remains;
00321         if(myOutputRank<limit){
00322             int idx = myOutputRank/(step+1);
00323             return (idx+1)*(step+1)-1;
00324         }else{
                  //only reached with step > 0 for a valid rank, so no division by zero
00325             int idx = (myOutputRank-limit)/step;
00326             return limit+(idx+1)*step-1;
00327         }
00328     }
00329 public:
00330     ParallelIOMgr();
00331     ~ParallelIOMgr();
00332 
00333     void initialize(Node *node);
00334 
00335     //read per-atom files including the binary compressed psf
00336     //file, coordinate and velocity files
00337     void readPerAtomInfo();
00338 
00339     //migrate the initially assigned atoms to appropriate input processors
00340     //based on the migration group
00341     void migrateAtomsMGrp();
00342     void recvAtomsMGrp(MoveInputAtomsMsg *msg);
00343     void integrateMigratedAtoms();
00344 
00345     //Reduce counters for Tuples and Exclusions in Molecule globally
00346     void updateMolInfo();
00347     void recvMolInfo(MolInfoMsg *msg);
00348     void bcastMolInfo(MolInfoMsg *msg);
00349     void recvHydroBasedCounter(HydroBasedMsg *msg);
00350     void bcastHydroBasedCounter(HydroBasedMsg *msg);
00351 
00352     //calculate #atoms in each patch and reduce to proc 0
00353     void calcAtomsInEachPatch();
00354     void recvAtomsCntPerPatch(AtomsCntPerPatchMsg *msg);
00355 
00356     //distribute atoms to their homepatch processors
          //presumably sendAtomsThread waits while numAcksOutstanding > 0 and is
          //resumed by ackAtomsToHomePatchProcs() -- TODO confirm in the .C file
00357     CthThread sendAtomsThread;
00358     void sendAtomsToHomePatchProcs();
00359     int numAcksOutstanding;
00360     void ackAtomsToHomePatchProcs();
00361     void recvAtomsToHomePatchProcs(MovePatchAtomsMsg *msg);
00362 
00363     //create home patches on this processor
00364     void createHomePatches();
00365 
00366     //free the space occupied by atoms' names etc.
00367     void freeMolSpace();
00368 
00369     //used in parallel IO output
00370     int getNumOutputProcs() { return numOutputProcs; }
00371     bool isOutputProcessor(int pe);
00372 
00373     void recvClusterSize(ClusterSizeMsg *msg);
00374     void integrateClusterSize();
00375     void recvFinalClusterSize(ClusterSizeMsg *msg);
00376 
00377     void receivePositions(CollectVectorVarMsg *msg);
00378     void receiveVelocities(CollectVectorVarMsg *msg);
00379     void receiveForces(CollectVectorVarMsg *msg);
00380     void disposePositions(int seq, double prevT);
00381     void disposeVelocities(int seq, double prevT);
00382     void disposeForces(int seq, double prevT);
00383 
00384     void wrapCoor(int seq, Lattice lat);
00385     void recvClusterCoor(ClusterCoorMsg *msg);
00386     void recvFinalClusterCoor(ClusterCoorMsg *msg);
00387 };
00388 
00389 #endif

Generated on Wed May 22 04:07:17 2013 for NAMD by doxygen 1.3.9.1