NAMD
ParallelIOMgr.h
Go to the documentation of this file.
1 #ifndef PARALLELIOMGR_H
2 #define PARALLELIOMGR_H
3 
4 #include "ProcessorPrivate.h"
5 #include "charm++.h"
6 #include "BOCgroup.h"
7 #include "common.h"
8 #include "CompressPsf.h"
9 #include "Hydrogen.h"
10 #include "Vector.h"
11 #include "NamdState.h"
12 #include "Node.decl.h"
13 #include "PatchMgr.h"
14 #include "UniqueSet.h"
15 #include "UniqueSetIter.h"
16 #include "Molecule.h"
17 #include "IndexFile.h"
18 #define COLLECT_PERFORMANCE_DATA 0
19 
20 #ifdef MEM_OPT_VERSION
21 class CollectionMgr;
22 class CollectionMaster;
23 class CollectionMidMaster;
24 class CollectMidVectorInstance;
25 #endif
26 
29 class PatchMap;
30 
31 #include "ParallelIOMgr.decl.h"
32 
// Message type taken by ParallelIOMgr::recvMolInfo() and
// ParallelIOMgr::bcastMolInfo(); it carries molecule-wide counters such as
// the number of bonds and angles.
// NOTE(review): the original line numbers jump (37 -> 39 -> 52 ...), so some
// member declarations are missing from this listing. The cross-reference
// index at the end of the file attributes names such as numCalcBonds,
// numCalcAngles, numDihedrals, numImpropers, numCrossterms, numExclusions,
// numCalcLJPairs, numRigidBonds, totalMass, totalMV and totalCharge to header
// lines in this range -- presumably they belong to this class; confirm
// against the real header.
34 class MolInfoMsg : public CMessage_MolInfoMsg
35 {
36 public:
37  int numBonds;
39  int numAngles;
52 
54 
55  //used when "comMov" is false in the SimParameter object,
56  //which is usually true -Chao Mei
59 
61 };
62 
// Message type taken by the recvHydroBasedCounter()/bcastHydroBasedCounter()
// entries named in the cross-reference index at the end of this file.
// NOTE(review): no data member is visible in this listing (original lines
// 68-69 are missing). The index places numFixedRigidBonds at header line 68,
// so it is presumably declared here -- confirm against the real header.
63 class HydroBasedMsg : public CMessage_HydroBasedMsg
64 {
65 //The information inside this message can only be calculated once the
66 //hydrogen group has been constructed
67 public:
70 };
71 
// Message type taken by ParallelIOMgr::recvAtomsMGrp().
// NOTE(review): original line 76 is missing from this listing; the
// cross-reference index places "InputAtom * atomList" there, so "length" is
// presumably the element count of that array -- confirm against the real
// header.
72 class MoveInputAtomsMsg : public CMessage_MoveInputAtomsMsg
73 {
74 public:
75  int length;
77 };
78 
// Message carrying per-patch atom counts; it is the argument type of the
// recvAtomsCntPerPatch() entry named in the cross-reference index at the end
// of this file.
// NOTE(review): original line 84 is missing from this listing, so one member
// declaration (presumably the patch-id list that the counts map onto) is not
// shown -- confirm against the real header.
79 class AtomsCntPerPatchMsg: public CMessage_AtomsCntPerPatchMsg
80 {
81 public:
82  //the lists below are parallel: entry i of each list refers to the same item
83  int length;
85  unsigned short *atomsCntList;
86  unsigned short *fixedAtomsCntList;
87 };
88 
// Message type taken by the recvAtomsToHomePatchProcs() entry named in the
// cross-reference index at the end of this file.
// NOTE(review): original lines 95 and 97 are missing from this listing; the
// index places "FullAtom * allAtoms" at header line 97 -- confirm the full
// member list against the real header.
89 class MovePatchAtomsMsg: public CMessage_MovePatchAtomsMsg
90 {
91 public:
92  //the total size of allAtoms can be calculated by summing sizeList
93  int from;
94  int patchCnt;
96  int *sizeList;
98 };
99 
100 
102 
// Bookkeeping entry for one cluster, keyed by its cluster id.
// hash() and operator== look only at clusterId, so two entries with the same
// id compare equal whatever their atomsCnt. The cross-reference index at the
// end of this file lists "UniqueSet< ClusterElem > ClusterSet", so this is
// presumably the element type of that set.
struct ClusterElem {
    // Identifier of the cluster. This declaration was missing from the
    // listing although the constructor, hash() and operator== all use it.
    int clusterId;
    // Atom count recorded for the cluster; starts at 0.
    int atomsCnt;

    // Creates an entry for cluster `cid` with an atom count of zero.
    ClusterElem(int cid) : clusterId(cid), atomsCnt(0) {}

    // Hash value of the entry: the cluster id itself.
    int hash() const {
        return clusterId;
    }

    // Non-zero when both entries refer to the same cluster id.
    int operator==(const ClusterElem &a) const {
        return clusterId == a.clusterId;
    }
};
119 
// Message type taken by ParallelIOMgr::recvClusterSize() and by the
// recvFinalClusterSize() entry named in the cross-reference index at the end
// of this file.
// NOTE(review): original line 124 is missing from this listing, so one
// member declaration (presumably the cluster id) is not shown -- confirm
// against the real header.
120 class ClusterSizeMsg : public CMessage_ClusterSizeMsg
121 {
122 public:
123  int srcRank;
125  int atomsCnt;
126 };
128 
// NOTE(review): this is the body of ClusterCoorElem (see the constructor
// name below). The opening "struct ClusterCoorElem {" line (original line
// 129) and the declaration of the "dsum" member initialised by the
// constructor (original line 131) are missing from this listing -- restore
// them from the real header.
// As with ClusterElem, hash() and operator== look only at clusterId.
130  int clusterId;
132 
134  ClusterCoorElem(int cid): clusterId(cid), dsum(0.0) {}
135  int hash() const {
136  return clusterId;
137  }
138  int operator==(const ClusterCoorElem &a) const {
139  return clusterId == a.clusterId;
140  }
141 };
144 
// Message type taken by ParallelIOMgr::recvClusterCoor() and by the
// recvFinalClusterCoor() entry named in the cross-reference index at the end
// of this file.
// NOTE(review): original line 150 is missing from this listing, so one
// member declaration is not shown -- confirm against the real header.
145 class ClusterCoorMsg : public CMessage_ClusterCoorMsg
146 {
147 public:
148  int srcRank;
149  int clusterId;
151 };
154 
// Manager object for parallel input and output. It derives from the
// generated CBase_ParallelIOMgr and keeps, per processor, the state needed
// to read atoms on the "input processors", migrate them, create home
// patches, and collect/write data on the "output processors".
// NOTE(review): the original line numbers jump in several places, so some
// declarations are missing from this listing. The cross-reference index at
// the end of this file names, among others, sendAtomsToHomePatchProcs(),
// recvAtomsToHomePatchProcs(), ackAtomsToHomePatchProcs(),
// recvHydroBasedCounter(), bcastHydroBasedCounter(), recvAtomsCntPerPatch(),
// recvFinalClusterSize(), recvFinalClusterCoor(), receivePositions(),
// receiveVelocities(), receiveForces(), initializeDcdSelectionParams() and
// readInfoForParOutDcdSelection() -- presumably members of this class;
// confirm against the real header.
155 class ParallelIOMgr : public CBase_ParallelIOMgr
156 {
157 #ifdef MEM_OPT_VERSION
158  friend class CollectionMgr;
159  friend class CollectionMaster;
160  friend class CollectionMidMaster;
161  friend class CollectMidVectorInstance;
162 #endif
163 
164 private:
165  SimParameters *simParameters;
166  Molecule *molecule;
167 
 //---- state used for parallel input ----
169  int numInputProcs;
170  int *inputProcArray;
171  //the index into inputProcArray, i.e.
172  //inputProcArray[myInputRank] == CkMyPe();
173  //if this is not an input proc, the rank is -1;
174  int myInputRank;
175 
176  //Initially this atom list contains the initially assigned
177  //atoms, later it will contain atoms from other input processors
178  //based on the migration group
179  InputAtomList initAtoms;
180 
181  //This atom list contains the migrated atoms from other input
182  //processors based on the migration group. Once the migration
183  //finishes, the atoms in this list are added to initAtoms, and its
184  //space is freed.
185  InputAtomList tmpRecvAtoms;
186 
187 
188  //This variable indicates whether this processor is ready
189  //to receive atoms of HomePatches to be created on this
190  //processor
191  bool isOKToRecvHPAtoms;
192  FullAtomList *hpAtomsList;
193  ResizeArray<int> hpIDList; //in increasing order
194 
195  //tmp variables
196  int procsReceived; //used at updateMolInfo and recvAtomsCntPerPatch
197  int hydroMsgRecved; //used at recvHydroBasedCounter
198  Vector totalMV; //used to remove center of mass motion
199  BigReal totalMass; //used to remove center of mass motion
200  BigReal totalCharge;
201  int64 numTotalExclusions;
202  int64 numCalcExclusions;
203  int64 numCalcFullExclusions;
205 
 //---- state used for parallel output ----
207  int numOutputProcs;
208  int *outputProcArray;
209  char *outputProcFlags;
210  //the index into outputProcArray, i.e.
211  //outputProcArray[myOutputRank] == CkMyPe();
212  //if this is not an output proc, the rank is -1;
213  int myOutputRank;
214  //the number of simultaneous writers:
215  //output procs with a rank distance of numOutputProcs/numOutputWrts do the
216  //output at a time
217  int numOutputWrts;
218 
219  int numProxiesPerOutputProc;
220  int myOutputProxyRank;
221  int *outputProxyArray;
222  int *myOutputProxies;
223 
224  int calcMyOutputProxyClients();
225 
226  CollectProxyVectorSequence *myOutputProxyPositions;
227  CollectProxyVectorSequence *myOutputProxyVelocities;
228  CollectProxyVectorSequence *myOutputProxyForces;
229 
230  //both arrays are of size #local atoms on this output proc
231  int *clusterID;
232  int *clusterSize;
233  //record the number of atoms that a remote cluster has on this
234  //output processor
235  ClusterSet remoteClusters;
236  //record the number of clusters that have atoms on other output procs
237  //on this output proc. Should be remoteClusters.size();
238  int numRemoteClusters;
239  //TEMP var to indicate how many msgs from remote procs have been received
240  //for updating cluster sizes in my local repository (linked with
241  //numRemoteClusters).
242  int numCSMAck;
243 
244  ClusterSizeMsgBuffer csmBuf; //used to buffer cluster size msgs
245  //record the number of remote cluster info queries for this output proc.
246  //i.e. SUM(for a particular cluster on this local output proc,
247  //the number of remote output procs that have some atoms belonging
248  //to this cluster). Should be csmBuf.size();
249  int numRemoteReqs;
250  //TEMP var to record the number of remote cluster info queries that
251  //have been received (linked with numRemoteReqs)
252  int numReqRecved;
253 
254  //used to store the calculated centralized coordinates for each cluster
255  //on this local output proc.
256  ClusterCoorSet remoteCoors; //similar to remoteClusters
257  ClusterCoorMsgBuffer ccmBuf; //similar to csmBuf but for cluster coor msgs
258  Position *tmpCoorCon;
259  //the array is of size #local atoms on this output proc
260  char *isWater;
261 
262 #ifdef MEM_OPT_VERSION
263  CollectMidVectorInstance *coorInstance;
264  CollectionMidMaster *midCM;
265 #endif
266 
267  CkChareID mainMaster;
268 
270 
 //counter incremented by isAtomFixed() whenever the queried atom is not in
 //initAtoms; only present when COLLECT_PERFORMANCE_DATA is non-zero
271 #if COLLECT_PERFORMANCE_DATA
272  int numFixedAtomLookup;
273 #endif
274 
275 private:
276  void readCoordinatesAndVelocity();
277  //create atom lists that are used for creating home patches
278  void prepareHomePatchAtomList();
279  //returns the index in hpIDList which points to pid
280  int binaryFindHPID(int pid);
281 
282  void readInfoForParOutput();
283 
284  void integrateClusterCoor();
285 
 //generic helper behind the two wrappers below; it is called with either
 //the (input rank, #input procs) pair or the (output rank, #output procs) pair
286  int numMyAtoms(int rank, int numProcs);
287  //returns the number of atoms INITIALLY assigned on this input processor
288  inline int numInitMyAtomsOnInput() {
289  return numMyAtoms(myInputRank, numInputProcs);
290  }
 //same query as above, but for this processor's output rank
291  inline int numMyAtomsOnOutput() {
292  return numMyAtoms(myOutputRank, numOutputProcs);
293  }
294 
 //generic helper behind the two wrappers below
295  int atomRank(int atomID, int numProcs);
296  //returns the rank of the input proc that the atom resides on INITIALLY
297  inline int atomInitRankOnInput(int atomID) {
298  return atomRank(atomID, numInputProcs);
299  }
 //same query as above, but among the output procs
300  inline int atomRankOnOutput(int atomID) {
301  return atomRank(atomID, numOutputProcs);
302  }
303 
 //generic helper behind the three wrappers below; the result is written
 //to the lowerIdx/upperIdx reference parameters
304  void getMyAtomsRange(int &lowerIdx, int &upperIdx, int rank, int numProcs);
305 
306  //get the range of atoms to be read based on the initial distribution
307  //i.e. atoms from [lowerIdx ... upperIdx] are going to be loaded
308  inline void getMyAtomsInitRangeOnInput(int &lowerIdx, int &upperIdx) {
309  return getMyAtomsRange(lowerIdx,upperIdx,myInputRank,numInputProcs);
310  }
 //atom range of this processor's output rank
311  inline void getMyAtomsRangeOnOutput(int &lowerIdx, int &upperIdx) {
312  return getMyAtomsRange(lowerIdx,upperIdx,myOutputRank,numOutputProcs);
313  }
 //atom range of an arbitrary output rank
314  inline void getAtomsRangeOnOutput(int &lowerIdx, int &upperIdx, int rank) {
315  return getMyAtomsRange(lowerIdx,upperIdx,rank,numOutputProcs);
316  }
317  //This function is only valid for the initial distribution of input atoms
318  //mySAId: the atom id this input proc starts with
319  //reqAId: the atom id to look up
 //Returns the atomFixed field of the local copy when reqAId falls inside
 //[mySAId, mySAId+initAtoms.size()); otherwise falls back to
 //molecule->is_atom_fixed(reqAId).
320  inline int isAtomFixed(int mySAId, int reqAId){
321  int localIdx = reqAId-mySAId;
322  if(localIdx>=0 && localIdx<initAtoms.size()){
323  //atom "reqAId" is on this input proc now!
324  return initAtoms[localIdx].atomFixed;
325  } else{
326  #if COLLECT_PERFORMANCE_DATA
327  numFixedAtomLookup++;
328  #endif
329  //atom "reqAId" is NOT on this input proc now!
330  return molecule->is_atom_fixed(reqAId);
331  }
332  }
333 
334  //This function returns the highest rank of this output group procs
 //The output ranks are split into numOutputWrts consecutive groups.
 //NOTE(review): divides by numOutputWrts, and by step in the else branch;
 //assumes numOutputWrts > 0 and 0 <= myOutputRank < numOutputProcs -- confirm
 //at the call sites.
335  inline int getMyOutputGroupHighestRank(){
336  int step = numOutputProcs/numOutputWrts;
337  int remains = numOutputProcs%numOutputWrts;
338  //so "remains" output groups contain "step+1" output procs;
339  //"numOutputWrts-remains" output groups contain "step" output procs;
 //"limit" is the first rank that belongs to a group of size "step"
340  int limit = (step+1)*remains;
341  if(myOutputRank<limit){
 //this rank is in one of the larger (step+1) groups
342  int idx = myOutputRank/(step+1);
343  return (idx+1)*(step+1)-1;
344  }else{
 //this rank is in one of the smaller (step) groups that start at "limit"
345  int idx = (myOutputRank-limit)/step;
346  return limit+(idx+1)*step-1;
347  }
348  }
349 public:
350  ParallelIOMgr();
351  ~ParallelIOMgr();
352 
353  void initialize(Node *node);
354 
355  //read per-atom files including the binary compressed psf
356  //file, coordinate and velocity files
357  void readPerAtomInfo();
358 
359  // determine dcdselection intersection with output processor
 //NOTE(review): the declaration this comment describes (original line 360)
 //is missing from this listing.
361  //migrate the initially assigned atoms to appropriate input processors
362  //based on the migration group
363  void migrateAtomsMGrp();
364  void recvAtomsMGrp(MoveInputAtomsMsg *msg);
365  void integrateMigratedAtoms();
366 
367  //Reduce counters for Tuples and Exclusions in Molecule globally
368  void updateMolInfo();
369  void recvMolInfo(MolInfoMsg *msg);
370  void bcastMolInfo(MolInfoMsg *msg);
373  void sendDcdParams();
374  void recvDcdParams(std::vector<uint16> tags,
375  std::vector<std::string> inputFileNames,
376  std::vector<std::string> outputFileNames,
377  std::vector<int> freqs,
378  std::vector<OUTPUTFILETYPE> types);
379  //calculate #atoms in each patch and reduce to proc 0
380  void calcAtomsInEachPatch();
382 
383  //distribute atoms to their homepatch processors
384  CthThread sendAtomsThread;
389 
390  //create home patches on this processor
391  void createHomePatches();
392 
393  // set the offset and size for each dcdselection in each output processor
 //NOTE(review): the declaration this comment describes (original line 394)
 //is missing from this listing.
395 
396  //free the space occupied by atoms' names etc.
397  void freeMolSpace();
398 
399  //used in parallel IO output
400  int getNumOutputProcs() { return numOutputProcs; }
401  bool isOutputProcessor(int pe);
402 
403  void recvClusterSize(ClusterSizeMsg *msg);
404  void integrateClusterSize();
406 
410  void disposePositions(int seq, double prevT);
411  void disposeVelocities(int seq, double prevT);
412  void disposeForces(int seq, double prevT);
413 
414  void wrapCoor(int seq, Lattice lat);
415  void recvClusterCoor(ClusterCoorMsg *msg);
417 
418 };
419 
420 #endif
int64 numCalcFullExclusions
Definition: ParallelIOMgr.h:49
int hash() const
unsigned short * fixedAtomsCntList
Definition: ParallelIOMgr.h:86
void sendAtomsToHomePatchProcs()
int size(void) const
Definition: ResizeArray.h:131
Definition: Node.h:78
int getNumOutputProcs()
void recvClusterSize(ClusterSizeMsg *msg)
int numCalcCrossterms
Definition: ParallelIOMgr.h:46
Definition: Vector.h:72
UniqueSet< ClusterCoorElem > ClusterCoorSet
int operator==(const ClusterCoorElem &a) const
void integrateClusterSize()
void recvAtomsMGrp(MoveInputAtomsMsg *msg)
void readPerAtomInfo()
InputAtom * atomList
Definition: ParallelIOMgr.h:76
void disposePositions(int seq, double prevT)
int64 numCalcExclusions
Definition: ParallelIOMgr.h:48
void migrateAtomsMGrp()
UniqueSetIter< ClusterElem > ClusterSetIter
int numRigidBonds
Definition: ParallelIOMgr.h:53
Molecule stores the structural information for the system.
Definition: Molecule.h:174
void wrapCoor(int seq, Lattice lat)
int numCalcDihedrals
Definition: ParallelIOMgr.h:42
ResizeArray< ClusterCoorMsg * > ClusterCoorMsgBuffer
void receiveForces(CollectVectorVarMsg *msg)
void recvAtomsToHomePatchProcs(MovePatchAtomsMsg *msg)
void integrateMigratedAtoms()
int numDihedrals
Definition: ParallelIOMgr.h:41
void disposeForces(int seq, double prevT)
void sendDcdParams()
void initializeDcdSelectionParams()
BigReal totalCharge
Definition: ParallelIOMgr.h:60
void recvClusterCoor(ClusterCoorMsg *msg)
void updateMolInfo()
void recvDcdParams(std::vector< uint16 > tags, std::vector< std::string > inputFileNames, std::vector< std::string > outputFileNames, std::vector< int > freqs, std::vector< OUTPUTFILETYPE > types)
void recvAtomsCntPerPatch(AtomsCntPerPatchMsg *msg)
int64 numExclusions
Definition: ParallelIOMgr.h:47
BigReal totalMass
Definition: ParallelIOMgr.h:57
void recvHydroBasedCounter(HydroBasedMsg *msg)
CthThread sendAtomsThread
FullAtom * allAtoms
Definition: ParallelIOMgr.h:97
void recvFinalClusterCoor(ClusterCoorMsg *msg)
void readInfoForParOutDcdSelection()
int numCalcImpropers
Definition: ParallelIOMgr.h:44
int numCrossterms
Definition: ParallelIOMgr.h:45
int numCalcLJPairs
Definition: ParallelIOMgr.h:51
void ackAtomsToHomePatchProcs()
ClusterCoorElem(int cid)
void initialize(Node *node)
int numFixedRigidBonds
Definition: ParallelIOMgr.h:68
void recvFinalClusterSize(ClusterSizeMsg *msg)
void disposeVelocities(int seq, double prevT)
void calcAtomsInEachPatch()
ResizeArray< ClusterSizeMsg * > ClusterSizeMsgBuffer
unsigned short * atomsCntList
Definition: ParallelIOMgr.h:85
bool isOutputProcessor(int pe)
int numCalcBonds
Definition: ParallelIOMgr.h:38
UniqueSetIter< ClusterCoorElem > ClusterCoorSetIter
void bcastHydroBasedCounter(HydroBasedMsg *msg)
int hash() const
Vector totalMV
Definition: ParallelIOMgr.h:58
ClusterElem(int cid)
void receiveVelocities(CollectVectorVarMsg *msg)
Bool is_atom_fixed(int atomnum) const
Definition: Molecule.h:1504
int numImpropers
Definition: ParallelIOMgr.h:43
void recvMolInfo(MolInfoMsg *msg)
int operator==(const ClusterElem &a) const
int64_t int64
Definition: common.h:39
int32 PatchID
Definition: NamdTypes.h:287
void bcastMolInfo(MolInfoMsg *msg)
void createHomePatches()
UniqueSet< ClusterElem > ClusterSet
double BigReal
Definition: common.h:123
void receivePositions(CollectVectorVarMsg *msg)
int numCalcAngles
Definition: ParallelIOMgr.h:40