NAMD
ParallelIOMgr.h
Go to the documentation of this file.
1 #ifndef PARALLELIOMGR_H
2 #define PARALLELIOMGR_H
3 
4 #include "ProcessorPrivate.h"
5 #include "charm++.h"
6 #include "BOCgroup.h"
7 #include "common.h"
8 #include "CompressPsf.h"
9 #include "Hydrogen.h"
10 #include "Vector.h"
11 #include "NamdState.h"
12 #include "Node.decl.h"
13 #include "PatchMgr.h"
14 #include "UniqueSet.h"
15 #include "UniqueSetIter.h"
16 #include "Molecule.h"
17 
// Compile-time switch: when non-zero, ParallelIOMgr gains the
// numFixedAtomLookup counter, which isAtomFixed() increments each time it
// falls back to Molecule::is_atom_fixed().
18 #define COLLECT_PERFORMANCE_DATA 0
19 
// Forward declarations of the classes that ParallelIOMgr names as friends
// (and holds pointers to) in MEM_OPT_VERSION builds.
20 #ifdef MEM_OPT_VERSION
21 class CollectionMgr;
22 class CollectionMaster;
23 class CollectionMidMaster;
24 class CollectMidVectorInstance;
25 #endif
26 
29 class PatchMap;
30 
31 #include "ParallelIOMgr.decl.h"
32 
// Message type accepted by ParallelIOMgr::recvMolInfo() and bcastMolInfo().
// NOTE(review): this listing skips numbered lines 38, 40-51, 53, 57-58 and 60
// of the class body, so only some of the fields are visible here. The
// cross-reference index at the end of the page attributes further fields
// (e.g. numCalcBonds, numDihedrals, totalMass, totalMV, totalCharge) to
// those lines.
34 class MolInfoMsg : public CMessage_MolInfoMsg
35 {
36 public:
37  int numBonds;
39  int numAngles;
52 
54 
55  //used when "comMov" is false in the SimParameters object,
56  //which is usually true -Chao Mei
 //(the fields this comment introduces are on lines not shown in this listing)
59 
61 };
62 
// Message type accepted by recvHydroBasedCounter() and
// bcastHydroBasedCounter(), as listed in the cross-reference index at the end
// of this page.
// NOTE(review): the field declarations (listing lines 68-69) are not shown
// here; the index attributes numFixedRigidBonds to line 68.
63 class HydroBasedMsg : public CMessage_HydroBasedMsg
64 {
65 //Info inside this message needs to be calculated when the hydrogen
66 //group is constructed
67 public:
70 };
71 
// Message type accepted by ParallelIOMgr::recvAtomsMGrp().
// NOTE(review): listing line 76 is not shown here; the cross-reference index
// at the end of the page attributes "InputAtom *atomList" to that line, so
// "length" presumably counts the entries of that array - confirm.
72 class MoveInputAtomsMsg : public CMessage_MoveInputAtomsMsg
73 {
74 public:
75  int length;
77 };
78 
// Message type accepted by recvAtomsCntPerPatch(), as listed in the
// cross-reference index at the end of this page.
// NOTE(review): listing line 84 (one more member) is not shown here.
79 class AtomsCntPerPatchMsg: public CMessage_AtomsCntPerPatchMsg
80 {
81 public:
82  //one-to-one mapping between two lists
83  int length;
 //both counters are stored as unsigned short, so a single entry cannot
 //exceed the range of that type
85  unsigned short *atomsCntList;
86  unsigned short *fixedAtomsCntList;
87 };
88 
// Message type accepted by recvAtomsToHomePatchProcs(), as listed in the
// cross-reference index at the end of this page.
// NOTE(review): listing lines 95 and 97 are not shown here; the index
// attributes "FullAtom *allAtoms" to line 97.
89 class MovePatchAtomsMsg: public CMessage_MovePatchAtomsMsg
90 {
91 public:
92  //the total size of allAtoms could be calculated from sizeList
93  int from;
94  int patchCnt;
96  int *sizeList;
98 };
100 
// Set element keyed by cluster id; the cross-reference index at the end of
// this page shows it used as UniqueSet<ClusterElem> (typedef ClusterSet).
// NOTE(review): listing lines 103 and 106 are not shown here; the
// constructor below initializes a member named clusterId, whose declaration
// is presumably on one of them.
102 struct ClusterElem {
104  int atomsCnt;
105 
 //starts a new element for cluster "cid" with an atom count of zero
107  ClusterElem(int cid) : clusterId(cid), atomsCnt(0) {}
 //the hash value is the cluster id itself
108  int hash() const {
109  return clusterId;
110  }
 //two elements are equal when their cluster ids match; atomsCnt is ignored
111  int operator==(const ClusterElem &a) const {
112  return clusterId == a.clusterId;
113  }
114 };
117 
// Message type accepted by ParallelIOMgr::recvClusterSize(); pointers to it
// are buffered in ClusterSizeMsgBuffer (ResizeArray<ClusterSizeMsg *> per the
// cross-reference index at the end of this page).
// NOTE(review): listing line 122 (one more member) is not shown here.
118 class ClusterSizeMsg : public CMessage_ClusterSizeMsg
119 {
120 public:
121  int srcRank;
123  int atomsCnt;
124 };
126 
// Body of ClusterCoorElem (the name is taken from the constructor below).
// NOTE(review): the line that opens this definition (listing line 127) and
// lines 129 and 131 are not shown in this listing; the constructor
// initializes a member named dsum whose declaration is presumably on one of
// them. The cross-reference index at the end of this page shows the type used
// as UniqueSet<ClusterCoorElem> (typedef ClusterCoorSet).
128  int clusterId;
130 
 //starts a new element for cluster "cid" with dsum set to zero
132  ClusterCoorElem(int cid): clusterId(cid), dsum(0.0) {}
 //the hash value is the cluster id itself
133  int hash() const {
134  return clusterId;
135  }
 //two elements are equal when their cluster ids match; dsum is ignored
136  int operator==(const ClusterCoorElem &a) const {
137  return clusterId == a.clusterId;
138  }
139 };
142 
// Message type accepted by ParallelIOMgr::recvClusterCoor(); pointers to it
// are buffered in ClusterCoorMsgBuffer (ResizeArray<ClusterCoorMsg *> per the
// cross-reference index at the end of this page).
// NOTE(review): listing line 148 (one more member) is not shown here.
143 class ClusterCoorMsg : public CMessage_ClusterCoorMsg
144 {
145 public:
146  int srcRank;
147  int clusterId;
149 };
152 
// Manager for parallel input and output of per-atom data. Going by the
// declarations and comments below, it:
//  - reads per-atom files on the input processors (readPerAtomInfo),
//  - migrates atoms between input processors by migration group,
//  - reduces molecule-wide counters (updateMolInfo / recvMolInfo /
//    bcastMolInfo),
//  - counts atoms per patch and creates the home patches, and
//  - on the output processors, tracks cluster sizes/coordinates and disposes
//    of positions, velocities and forces per sequence number.
// The base class CBase_ParallelIOMgr is not declared in this file; it
// presumably comes from the included ParallelIOMgr.decl.h.
// NOTE(review): this listing skips numbered lines 166, 202, 204, 267,
// 367-368, 372, 376-379, 393, 395-397 and 404 of the class body, so some
// members are not visible here (the cross-reference index at the end of the
// page names several of them, e.g. sendAtomsToHomePatchProcs(),
// recvHydroBasedCounter(), receivePositions()).
153 class ParallelIOMgr : public CBase_ParallelIOMgr
154 {
155 #ifdef MEM_OPT_VERSION
156  friend class CollectionMgr;
157  friend class CollectionMaster;
158  friend class CollectionMidMaster;
159  friend class CollectMidVectorInstance;
160 #endif
161 
162 private:
163  SimParameters *simParameters;
164  Molecule *molecule;
165 
167  int numInputProcs;
168  int *inputProcArray;
169  //the index to the inputProcArray i.e.
170  //inputProcArray[myInputRank] == CkMyPe();
171  //if it is not an input proc, the rank is -1;
172  int myInputRank;
173 
174  //Initially this atom list contains the initially assigned
175  //atoms, later it will contain atoms from other input processors
176  //based on the migration group
177  InputAtomList initAtoms;
178 
179  //This atom list contains the migrated atoms from other input
180  //processors based on the migration group. Once the migration
181  //finishes, atoms in this list are added to initAtoms, and its
182  //space is freed.
183  InputAtomList tmpRecvAtoms;
184 
185 
186  //This variable indicates whether this processor is ready
187  //to receive atoms of HomePatches to be created on this
188  //processor
189  bool isOKToRecvHPAtoms;
190  FullAtomList *hpAtomsList;
191  ResizeArray<int> hpIDList; //in increasing order
192 
193  //tmp variables
194  int procsReceived; //used at updateMolInfo and recvAtomsCntPerPatch
195  int hydroMsgRecved; //used at recvHydroBasedCounter
196  Vector totalMV; //used to remove center of mass motion
197  BigReal totalMass; //used to remove center of mass motion
198  BigReal totalCharge;
199  int64 numTotalExclusions;
200  int64 numCalcExclusions;
201  int64 numCalcFullExclusions;
203 
205  int numOutputProcs;
206  int *outputProcArray;
207  char *outputProcFlags;
208  //the index to the outputProcArray i.e.
209  //outputProcArray[myOutputRank] == CkMyPe();
210  //if it is not an output proc, the rank is -1;
211  int myOutputRank;
212  //the number of simultaneous writers
213  //output procs with rank distance of numOutputProcs/numOutputWrts do the
214  //output at a time
215  int numOutputWrts;
216 
217  int numProxiesPerOutputProc;
218  int myOutputProxyRank;
219  int *outputProxyArray;
220  int *myOutputProxies;
221 
222  int calcMyOutputProxyClients();
223 
224  CollectProxyVectorSequence *myOutputProxyPositions;
225  CollectProxyVectorSequence *myOutputProxyVelocities;
226  CollectProxyVectorSequence *myOutputProxyForces;
227 
228  //both arrays are of size #local atoms on this output proc
229  int *clusterID;
230  int *clusterSize;
231  //record the number of atoms that a remote cluster has on this
232  //output processor
233  ClusterSet remoteClusters;
234  //record the number of clusters that have atoms on other output procs
235  //on this output proc. Should be remoteClusters.size();
236  int numRemoteClusters;
237  //TEMP var to indicate how many msgs from remote proc have been recved
238  //for updating cluster sizes in my local repository (linked with
239  //numRemoteClusters).
240  int numCSMAck;
241 
242  ClusterSizeMsgBuffer csmBuf; //used to buffer cluster size msgs
243  //record the number of remote cluster info queries for this output proc.
244  //i.e. SUM(for a particular cluster on this local output proc,
245  //the number of remote output procs that have some atoms belonging
246  //to this cluster). Should be csmBuf.size();
247  int numRemoteReqs;
248  //TEMP var to record the number of remote cluster info queries that
249  //have been received (linked with numRemoteReqs)
250  int numReqRecved;
251 
252  //used to store the calculated centralized coordinates for each cluster
253  //on this local output proc.
254  ClusterCoorSet remoteCoors; //similar to remoteClusters
255  ClusterCoorMsgBuffer ccmBuf; //similar to csmBuf but for cluster coor msgs
256  Position *tmpCoorCon;
257  //the array is of size #local atoms on this output proc
258  char *isWater;
259 
260 #ifdef MEM_OPT_VERSION
261  CollectMidVectorInstance *coorInstance;
262  CollectionMidMaster *midCM;
263 #endif
264 
265  CkChareID mainMaster;
266 
268 
 //counts the isAtomFixed() calls that had to fall back to the Molecule
 //lookup; only compiled in when COLLECT_PERFORMANCE_DATA is non-zero
269 #if COLLECT_PERFORMANCE_DATA
270  int numFixedAtomLookup;
271 #endif
272 
273 private:
274  void readCoordinatesAndVelocity();
275  //create atom lists that are used for creating home patch
276  void prepareHomePatchAtomList();
277  //returns the index in hpIDList which points to pid
278  int binaryFindHPID(int pid);
279 
280  void readInfoForParOutput();
281 
282  void integrateClusterCoor();
283 
284  int numMyAtoms(int rank, int numProcs);
285  //returns the number of atoms INITIALLY assigned on this input processor
286  inline int numInitMyAtomsOnInput() {
287  return numMyAtoms(myInputRank, numInputProcs);
288  }
 //same query, but for this processor's output rank among the output procs
289  inline int numMyAtomsOnOutput() {
290  return numMyAtoms(myOutputRank, numOutputProcs);
291  }
292 
293  int atomRank(int atomID, int numProcs);
294  //returns the rank of the input proc that the atom resides on INITIALLY
295  inline int atomInitRankOnInput(int atomID) {
296  return atomRank(atomID, numInputProcs);
297  }
 //same query, but against the number of output procs
298  inline int atomRankOnOutput(int atomID) {
299  return atomRank(atomID, numOutputProcs);
300  }
301 
302  void getMyAtomsRange(int &lowerIdx, int &upperIdx, int rank, int numProcs);
303  //get the range of atoms to be read based on the initial distribution
304  //i.e. atoms from [lowerIdx ... upperIdx] are going to be loaded
 //(the three wrappers below "return" the result of a void call, which is
 //legal in a void function; the range comes back through the reference
 //parameters lowerIdx and upperIdx)
305  inline void getMyAtomsInitRangeOnInput(int &lowerIdx, int &upperIdx) {
306  return getMyAtomsRange(lowerIdx,upperIdx,myInputRank,numInputProcs);
307  }
 //range for this processor's own output rank
308  inline void getMyAtomsRangeOnOutput(int &lowerIdx, int &upperIdx) {
309  return getMyAtomsRange(lowerIdx,upperIdx,myOutputRank,numOutputProcs);
310  }
 //range for an arbitrary output rank given by the caller
311  inline void getAtomsRangeOnOutput(int &lowerIdx, int &upperIdx, int rank) {
312  return getMyAtomsRange(lowerIdx,upperIdx,rank,numOutputProcs);
313  }
314 
315  //This function is only valid for the initial distribution of input atoms
316  //mySAId: the atom id this input proc starts with
317  //reqAId: the atom id to look up
 //returns initAtoms[reqAId-mySAId].atomFixed when that index lies inside
 //initAtoms, otherwise the result of molecule->is_atom_fixed(reqAId)
318  inline int isAtomFixed(int mySAId, int reqAId){
319  int localIdx = reqAId-mySAId;
320  if(localIdx>=0 && localIdx<initAtoms.size()){
321  //atom "reqAId" is on this input proc now!
322  return initAtoms[localIdx].atomFixed;
323  } else{
324  #if COLLECT_PERFORMANCE_DATA
325  numFixedAtomLookup++;
326  #endif
327  //atom "reqAId" is NOT on this input proc now!
328  return molecule->is_atom_fixed(reqAId);
329  }
330  }
331 
332  //This function returns the highest rank of this output group procs
 //Example: numOutputProcs=10, numOutputWrts=3 gives step=3, remains=1,
 //limit=4, i.e. groups {0..3}, {4..6}, {7..9}; ranks 0-3 return 3,
 //ranks 4-6 return 6 and ranks 7-9 return 9.
 //NOTE(review): divides by numOutputWrts, and by step in the else branch;
 //assumes numOutputWrts > 0 and that myOutputRank is a valid output rank
 //(with step == 0 the else branch is only reached for
 //myOutputRank >= numOutputProcs) - confirm against callers.
333  inline int getMyOutputGroupHighestRank(){
334  int step = numOutputProcs/numOutputWrts;
335  int remains = numOutputProcs%numOutputWrts;
336  //so "remains" output groups contain "step+1" output procs;
337  //"numOutputWrts-remains" output groups contain "step" output procs;
338  int limit = (step+1)*remains;
339  if(myOutputRank<limit){
340  int idx = myOutputRank/(step+1);
341  return (idx+1)*(step+1)-1;
342  }else{
343  int idx = (myOutputRank-limit)/step;
344  return limit+(idx+1)*step-1;
345  }
346  }
347 public:
348  ParallelIOMgr();
349  ~ParallelIOMgr();
350 
351  void initialize(Node *node);
352 
353  //read per-atom files including the binary compressed psf
354  //file, coordinate and velocity files
355  void readPerAtomInfo();
356 
357  //migrate the initially assigned atoms to appropriate input processors
358  //based on the migration group
359  void migrateAtomsMGrp();
360  void recvAtomsMGrp(MoveInputAtomsMsg *msg);
361  void integrateMigratedAtoms();
362 
363  //Reduce counters for Tuples and Exclusions in Molecule globally
364  void updateMolInfo();
365  void recvMolInfo(MolInfoMsg *msg);
366  void bcastMolInfo(MolInfoMsg *msg);
369 
370  //calculate #atoms in each patch and reduce to proc 0
371  void calcAtomsInEachPatch();
373 
374  //distribute atoms to their homepatch processors
 //NOTE(review): the member functions that belong to this step (listing
 //lines 376-379) are not shown here; only the public thread handle is.
375  CthThread sendAtomsThread;
380 
381  //create home patches on this processor
382  void createHomePatches();
383 
384  //free the space occupied by atoms' names etc.
385  void freeMolSpace();
386 
387  //used in parallel IO output
388  int getNumOutputProcs() { return numOutputProcs; }
389  bool isOutputProcessor(int pe);
390 
391  void recvClusterSize(ClusterSizeMsg *msg);
392  void integrateClusterSize();
394 
398  void disposePositions(int seq, double prevT);
399  void disposeVelocities(int seq, double prevT);
400  void disposeForces(int seq, double prevT);
401 
 //note: the Lattice argument is taken by value
402  void wrapCoor(int seq, Lattice lat);
403  void recvClusterCoor(ClusterCoorMsg *msg);
405 };
406 
407 #endif
int64 numCalcFullExclusions
Definition: ParallelIOMgr.h:49
unsigned short * fixedAtomsCntList
Definition: ParallelIOMgr.h:86
void sendAtomsToHomePatchProcs()
Definition: Node.h:78
int getNumOutputProcs()
void recvClusterSize(ClusterSizeMsg *msg)
int numCalcCrossterms
Definition: ParallelIOMgr.h:46
Definition: Vector.h:64
UniqueSet< ClusterCoorElem > ClusterCoorSet
void integrateClusterSize()
void recvAtomsMGrp(MoveInputAtomsMsg *msg)
void readPerAtomInfo()
InputAtom * atomList
Definition: ParallelIOMgr.h:76
int operator==(const ClusterCoorElem &a) const
void disposePositions(int seq, double prevT)
int64 numCalcExclusions
Definition: ParallelIOMgr.h:48
int operator==(const ClusterElem &a) const
void migrateAtomsMGrp()
UniqueSetIter< ClusterElem > ClusterSetIter
int hash() const
int numRigidBonds
Definition: ParallelIOMgr.h:53
void wrapCoor(int seq, Lattice lat)
int numCalcDihedrals
Definition: ParallelIOMgr.h:42
ResizeArray< ClusterCoorMsg * > ClusterCoorMsgBuffer
void receiveForces(CollectVectorVarMsg *msg)
void recvAtomsToHomePatchProcs(MovePatchAtomsMsg *msg)
void integrateMigratedAtoms()
int numDihedrals
Definition: ParallelIOMgr.h:41
void disposeForces(int seq, double prevT)
int hash() const
BigReal totalCharge
Definition: ParallelIOMgr.h:60
void recvClusterCoor(ClusterCoorMsg *msg)
void updateMolInfo()
void recvAtomsCntPerPatch(AtomsCntPerPatchMsg *msg)
int64 numExclusions
Definition: ParallelIOMgr.h:47
BigReal totalMass
Definition: ParallelIOMgr.h:57
void recvHydroBasedCounter(HydroBasedMsg *msg)
CthThread sendAtomsThread
FullAtom * allAtoms
Definition: ParallelIOMgr.h:97
void recvFinalClusterCoor(ClusterCoorMsg *msg)
int PatchID
Definition: NamdTypes.h:182
int numCalcImpropers
Definition: ParallelIOMgr.h:44
int numCrossterms
Definition: ParallelIOMgr.h:45
int numCalcLJPairs
Definition: ParallelIOMgr.h:51
void ackAtomsToHomePatchProcs()
ClusterCoorElem(int cid)
void initialize(Node *node)
long long int64
Definition: common.h:34
int numFixedRigidBonds
Definition: ParallelIOMgr.h:68
void recvFinalClusterSize(ClusterSizeMsg *msg)
void disposeVelocities(int seq, double prevT)
void calcAtomsInEachPatch()
ResizeArray< ClusterSizeMsg * > ClusterSizeMsgBuffer
unsigned short * atomsCntList
Definition: ParallelIOMgr.h:85
bool isOutputProcessor(int pe)
int numCalcBonds
Definition: ParallelIOMgr.h:38
int size(void) const
Definition: ResizeArray.h:127
UniqueSetIter< ClusterCoorElem > ClusterCoorSetIter
Bool is_atom_fixed(int atomnum) const
Definition: Molecule.h:1411
void bcastHydroBasedCounter(HydroBasedMsg *msg)
Vector totalMV
Definition: ParallelIOMgr.h:58
ClusterElem(int cid)
void receiveVelocities(CollectVectorVarMsg *msg)
int numImpropers
Definition: ParallelIOMgr.h:43
void recvMolInfo(MolInfoMsg *msg)
void bcastMolInfo(MolInfoMsg *msg)
void createHomePatches()
UniqueSet< ClusterElem > ClusterSet
double BigReal
Definition: common.h:114
void receivePositions(CollectVectorVarMsg *msg)
int numCalcAngles
Definition: ParallelIOMgr.h:40