NAMD
ParallelIOMgr.C

#include "largefiles.h"  // must be first!

#include <stdio.h>
#include <filesystem>
#include "BOCgroup.h"
#include "Molecule.h"
#include "Node.h"
#include "Node.decl.h"
#include "NamdState.h"
#include "WorkDistrib.h"
#include "PDB.h"
#include "PatchMap.h"
#include "PatchMap.inl"
#include "packmsg.h"
#include "HomePatch.h"
#include "InfoStream.h"
#include "CollectionMaster.h"
#include "CollectionMgr.h"

#include "ParallelIOMgr.decl.h"
#include "ParallelIOMgr.h"

#include "Output.h"
#include "Random.h"

#include <algorithm>
#define MIN_DEBUG_LEVEL 3
//#define DEBUGM
#include "Debug.h"

using namespace std;

#if !(defined(__NVCC__) || defined(__HIPCC__))
#include <pup.h>
#include <pup_stl.h>
#endif

// data structures for IO proxy, all destined for same IO processor

class CollectProxyVectorInstance {
  public:

    CollectProxyVectorInstance() : seq(-10) { ; }

    void free() { seq = -10; }
    int notfree() { return ( seq != -10 ); }

    void reset(int s, CollectVectorVarMsg::DataStatus v, int numClients) {
      if ( s == -10 ) NAMD_bug("seq == free in CollectionMgr");
      seq = s;
      vstatus = v;
      remaining = numClients;
      aid.resize(0);
      data.resize(0);
      fdata.resize(0);
    }

    // true -> send it and delete it!
    int append(CollectVectorVarMsg *msg)
    {
      if ( msg->status != vstatus ) {
        NAMD_bug("CollectProxyVectorInstance vstatus mismatch");
      }
      if ( msg->seq != seq ) {
        NAMD_bug("CollectProxyVectorInstance seq mismatch");
      }
      int size = msg->size;
      for( int i = 0; i < size; ++i ) { aid.add(msg->aid[i]); }
      if ( vstatus == CollectVectorVarMsg::VectorValid ||
           vstatus == CollectVectorVarMsg::BothValid ) {
        for( int i = 0; i < size; ++i ) { data.add(msg->data[i]); }
      }
      if ( vstatus == CollectVectorVarMsg::FloatVectorValid ||
           vstatus == CollectVectorVarMsg::BothValid ) {
        for( int i = 0; i < size; ++i ) { fdata.add(msg->fdata[i]); }
      }
      const int atoms_per_message_target = 100000;
      return ( ! --remaining || aid.size() > atoms_per_message_target );
    }

    CollectVectorVarMsg *buildMsg() {
      int numAtoms = aid.size();
      CollectVectorVarMsg *msg;
      if ( ! numAtoms ) {
        msg = 0;
      } else if ( vstatus == CollectVectorVarMsg::VectorValid) {
        msg = new(numAtoms, numAtoms, 0, 0) CollectVectorVarMsg;
        for(int j=0; j<numAtoms; j++) {
          msg->aid[j] = aid[j];
          msg->data[j] = data[j];
        }
      } else if (vstatus == CollectVectorVarMsg::FloatVectorValid) {
        msg = new(numAtoms, 0, numAtoms, 0) CollectVectorVarMsg;
        for(int j=0; j<numAtoms; j++) {
          msg->aid[j] = aid[j];
          msg->fdata[j] = fdata[j];
        }
      } else {
        msg = new(numAtoms, numAtoms, numAtoms, 0) CollectVectorVarMsg;
        for(int j=0; j<numAtoms; j++) {
          msg->aid[j] = aid[j];
          msg->data[j] = data[j];
          msg->fdata[j] = fdata[j];
        }
      }
      if ( msg ) {
        msg->seq = seq;
        msg->size = numAtoms;
        msg->status = vstatus;
      }
      if ( remaining ) reset(seq,vstatus,remaining);
      else free();
      return msg;
    }

    int seq;
    CollectVectorVarMsg::DataStatus vstatus;
    AtomIDList aid;
    ResizeArray<Vector> data;
    ResizeArray<FloatVector> fdata;

  private:
    int remaining;

};
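
// A CollectProxyVectorInstance buffers one in-flight output set (positions,
// velocities, or forces) for a single timestep: reset() arms it with a
// sequence number, append() folds in one client message at a time, and
// buildMsg() emits the combined message once every client has reported or
// the 100000-atom flush threshold is exceeded. A partially drained instance
// re-arms itself via reset() so remaining clients can still append.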

class CollectProxyVectorSequence {
  public:
    CollectProxyVectorSequence(int nc) : numClients(nc) { ; }

    CollectVectorVarMsg *submitData(CollectVectorVarMsg *msg) {
      CollectProxyVectorInstance **c = data.begin();
      CollectProxyVectorInstance **c_e = data.end();
      for( ; c != c_e && (*c)->seq != msg->seq; ++c );
      if ( c == c_e ) {
        c = data.begin();
        for( ; c != c_e && (*c)->notfree(); ++c );
        if ( c == c_e ) {
          data.add(new CollectProxyVectorInstance);
          c = data.end() - 1;
        }
        (*c)->reset(msg->seq,msg->status,numClients);
      }
      if ( (*c)->append(msg) ) {
        CollectVectorVarMsg *newmsg = (*c)->buildMsg();
        return newmsg;
      } else {
        return 0;
      }
    }

    ResizeArray<CollectProxyVectorInstance*> data;

  private:
    int numClients;

};
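
// A minimal sketch of how a proxy PE might drive this sequence; the names
// seqArr, nClients, msg, and outputPe are illustrative stand-ins, not code
// from this file (see receivePositions() below for the real call site).
#if 0
CollectProxyVectorSequence seqArr(nClients);             // one slot per client
CollectVectorVarMsg *combined = seqArr.submitData(msg);  // 0 until ready
if ( combined ) pIO[outputPe].receivePositions(combined);
#endif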


ParallelIOMgr::ParallelIOMgr()
{
    CkpvAccess(BOCclass_group).ioMgr = thisgroup;

    numInputProcs=-1;
    inputProcArray = NULL;
    numOutputProcs=-1;
    outputProcArray = NULL;

    procsReceived=0;
    hydroMsgRecved=0;

    totalMV.x = totalMV.y = totalMV.z = 0.0;
    totalMass = 0.0;
    totalCharge = 0.0;
    numTotalExclusions = 0;
    numCalcExclusions = 0;
    numCalcFullExclusions = 0;

    isOKToRecvHPAtoms = false;
    hpAtomsList = NULL;

    clusterID = NULL;
    clusterSize = NULL;

#ifdef MEM_OPT_VERSION
    midCM = NULL;
#endif

    isWater = NULL;

    numCSMAck = 0;
    numReqRecved = 0;

    sendAtomsThread = 0;

#if COLLECT_PERFORMANCE_DATA
    numFixedAtomLookup = 0;
#endif
}

ParallelIOMgr::~ParallelIOMgr()
{
    delete [] inputProcArray;
    delete [] outputProcArray;
    delete [] clusterID;
    delete [] clusterSize;

#ifdef MEM_OPT_VERSION
    delete midCM;
#endif

    delete [] isWater;
}

#ifndef OUTPUT_SINGLE_FILE
#error OUTPUT_SINGLE_FILE not defined!
#endif

// initialization needed for the parallel IO manager; entry point through ScriptTcl
void ParallelIOMgr::initialize(Node *node)
{
    simParameters = node->simParameters;
    molecule = node->molecule;

    numInputProcs = simParameters->numinputprocs;
    numOutputProcs = simParameters->numoutputprocs;
    numOutputWrts = simParameters->numoutputwrts;

    numProxiesPerOutputProc = std::min((int)sqrt(CkNumPes()),(CkNumPes()-1)/numOutputProcs-1);
    if ( numProxiesPerOutputProc < 2 ) numProxiesPerOutputProc = 0;
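    // Illustrative arithmetic (not from the source): on 1024 PEs with 8
    // output procs this gives min(32, 1023/8-1) = min(32, 126) = 32 proxies
    // per output proc; on 16 PEs with 8 output procs it gives min(4, 0),
    // which the clamp above turns into 0, i.e. no proxy layer at all.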

    if(!CkMyPe()) {
        iout << iINFO << "Running with " << numInputProcs << " input processors.\n" << endi;
        #if OUTPUT_SINGLE_FILE
        iout << iINFO << "Running with " << numOutputProcs << " output processors (" << numOutputWrts << " of them will output simultaneously).\n" << endi;
        #else
        iout << iINFO << "Running with " << numOutputProcs << " output processors, and each of them will output to its own separate file.\n" << endi;
        #endif
        if ( numProxiesPerOutputProc ) {
            iout << iINFO << "Running with " << numProxiesPerOutputProc << " proxies per output processor.\n" << endi;
        }
    }

    //build inputProcArray
    {
        inputProcArray = new int[numInputProcs];
        myInputRank = -1;
        for(int i=0; i<numInputProcs; ++i) {
            inputProcArray[i] = WorkDistrib::peDiffuseOrdering[(1+numOutputProcs+i)%CkNumPes()];
        }
        std::sort(inputProcArray, inputProcArray+numInputProcs);
        for(int i=0; i<numInputProcs; ++i) {
            if ( CkMyPe() == inputProcArray[i] ) {
                if ( myInputRank != -1 ) NAMD_bug("Duplicate input proc");
                myInputRank = i;
            }
        }

        if(!CkMyPe()) {
            iout << iINFO << "INPUT PROC LOCATIONS:";
            int i;
            for ( i=0; i<numInputProcs && i < 10; ++i ) {
                iout << " " << inputProcArray[i];
            }
            if ( i<numInputProcs ) iout << " ... " << inputProcArray[numInputProcs-1];
            iout << "\n" << endi;
        }
    }

    if(myInputRank!=-1) {
        //NOTE: this could be optimized further by pre-allocating the memory
        //for incoming atoms --Chao Mei
        int numMyAtoms = numInitMyAtomsOnInput();
        initAtoms.resize(numMyAtoms+100); // extra space for orphan hydrogens
        initAtoms.resize(numMyAtoms);
        tmpRecvAtoms.resize(0);
    } else {
        initAtoms.resize(0);
        tmpRecvAtoms.resize(0);
    }
    hpIDList.resize(0);

    //build outputProcArray
    //spread the output processors across all the processors
    {
        outputProcArray = new int[numOutputProcs];
        outputProcFlags = new char[CkNumPes()];
        outputProxyArray = new int[numOutputProcs*numProxiesPerOutputProc];
        myOutputProxies = new int[numOutputProcs];
        myOutputRank = -1;
        myOutputProxyRank = -1;
        for(int i=0; i<numOutputProcs; ++i) {
            outputProcArray[i] = WorkDistrib::peDiffuseOrdering[(1+i)%CkNumPes()];
        }
        std::sort(outputProcArray, outputProcArray+numOutputProcs);
        for(int i=0; i<numOutputProcs*numProxiesPerOutputProc; ++i) {
            outputProxyArray[i] = WorkDistrib::peDiffuseOrdering[(1+numOutputProcs+i)%CkNumPes()];
        }
        std::sort(outputProxyArray, outputProxyArray+numOutputProcs*numProxiesPerOutputProc,
                  pe_sortop_compact());
        for(int i=0; i<CkNumPes(); ++i) {
            outputProcFlags[i] = 0;
        }
        for(int i=0; i<numOutputProcs; ++i) {
            outputProcFlags[outputProcArray[i]] = 1;
            if ( CkMyPe() == outputProcArray[i] ) {
                if ( myOutputRank != -1 ) NAMD_bug("Duplicate output proc");
                myOutputRank = i;
            }
        }
        for(int i=0; i<numOutputProcs*numProxiesPerOutputProc; ++i) {
            if ( CkMyPe() == outputProxyArray[i] ) {
                if ( myOutputRank != -1 ) NAMD_bug("Output proxy is also output proc");
                if ( myOutputProxyRank != -1 ) NAMD_bug("Duplicate output proxy");
                myOutputProxyRank = i;
            }
        }
        int myProxySet = (WorkDistrib::peCompactOrderingIndex[CkMyPe()]*numProxiesPerOutputProc)/CkNumPes();
        for(int i=0; i<numOutputProcs; ++i) {
            if ( numProxiesPerOutputProc ) {
                myOutputProxies[i] = outputProxyArray[myProxySet*numOutputProcs+i];
            } else {
                myOutputProxies[i] = outputProcArray[i];
            }
        }

        // delay building sequences until after PatchMap is initialized
        myOutputProxyPositions = 0;
        myOutputProxyVelocities = 0;
        myOutputProxyForces = 0;

        if(!CkMyPe()) {
            iout << iINFO << "OUTPUT PROC LOCATIONS:";
            int i;
            for ( i=0; i<numOutputProcs && i < 10; ++i ) {
                iout << " " << outputProcArray[i];
            }
            if ( i<numOutputProcs ) iout << " ... " << outputProcArray[numOutputProcs-1];
            iout << "\n" << endi;
        }
    }

#ifdef MEM_OPT_VERSION
    if(myOutputRank!=-1) {
        midCM = new CollectionMidMaster(this);
    }
    remoteClusters.clear();
    csmBuf.resize(0);
    remoteCoors.clear();
    ccmBuf.resize(0);

    mainMaster = CollectionMgr::Object()->getMasterChareID();
#endif
}

int ParallelIOMgr::isOutputProcessor(int pe) {
    return outputProcFlags[pe];
}

int isOutputProcessor(int pe){
    return CProxy_ParallelIOMgr::ckLocalBranch(CkpvAccess(BOCclass_group).ioMgr)->isOutputProcessor(pe);
}


void ParallelIOMgr::readPerAtomInfo()
{
#ifdef MEM_OPT_VERSION
    if(myInputRank!=-1) {
        int myAtomLIdx, myAtomUIdx;
        getMyAtomsInitRangeOnInput(myAtomLIdx, myAtomUIdx);

        //1. read the file that contains per-atom info such as signature index
        molecule->read_binary_atom_info(myAtomLIdx, myAtomUIdx, initAtoms);

        //2. read the coordinates and velocities of each atom if the velocity
        //file exists; otherwise the velocity of each atom is randomly
        //generated. This has to be DONE AFTER THE FIRST STEP as the atom
        //mass is required if the velocity is generated randomly.
        readCoordinatesAndVelocity();

        //3. set every atom's output processor rank, i.e. the dest pe this
        //atom will be sent to for writing positions, velocities etc.
        int oRank=atomRankOnOutput(myAtomLIdx);
        for(int i=oRank; i<numOutputProcs; i++) {
            int lIdx, uIdx; //the range of atom ids outputProcArray[i] has
            getAtomsRangeOnOutput(lIdx, uIdx, i);
            if(lIdx > myAtomUIdx) break;
            int fid = lIdx>myAtomLIdx?lIdx:myAtomLIdx;
            int tid = uIdx>myAtomUIdx?myAtomUIdx:uIdx;
            for(int j=fid; j<=tid; j++) initAtoms[j-myAtomLIdx].outputRank = i;
        }
    }

    //read clusters
    if(myOutputRank!=-1) {
        //cluster info is required only when wrapAll or wrapWater is set
        if(!(simParameters->wrapAll || simParameters->wrapWater)) return;
        readInfoForParOutput();
    }
#endif
}
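
// Both input and output procs own contiguous blocks of atom ids, so the
// outputRank loop above is just an intersection of two block partitions.
// Illustrative numbers: an input proc holding atoms 100..199 against output
// blocks of 80 atoms tags atoms 100..159 with outputRank 1 and 160..199
// with outputRank 2.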

void ParallelIOMgr::readCoordinatesAndVelocity()
{
#ifdef MEM_OPT_VERSION
    int needFlip = 0;
    int myAtomLIdx, myAtomUIdx;
    getMyAtomsInitRangeOnInput(myAtomLIdx, myAtomUIdx);
    int myNumAtoms = myAtomUIdx-myAtomLIdx+1;

    //contains the data for Position and Velocity
    Vector *tmpData = new Vector[myNumAtoms];

    //begin to read coordinates
    //step1: open the file
    FILE *ifp = fopen(simParameters->binCoorFile, "rb");
    if(!ifp) {
        char s[256];
        sprintf(s, "The binary coordinate file %s cannot be opened on proc %d\n", simParameters->binCoorFile, CkMyPe());
        NAMD_err(s);
    }
    //step2: check whether flip is needed
    int filelen;
    fread(&filelen, sizeof(int32),1,ifp);
    char lenbuf[sizeof(int32)];
    memcpy(lenbuf, (const char *)&filelen, sizeof(int32));
    flipNum(lenbuf, sizeof(int32), 1);
    if(!memcmp(lenbuf, (const char *)&filelen, sizeof(int32))) {
        iout << iWARN << "Number of atoms in binary file " << simParameters->binCoorFile
             << " is palindromic, assuming same endian.\n" << endi;
    }
    if(filelen!=molecule->numAtoms) {
        needFlip = 1;
        memcpy((void *)&filelen, lenbuf,sizeof(int32));
    }
    if(filelen!=molecule->numAtoms) {
        char s[256];
        sprintf(s, "Incorrect atom count in binary file %s", simParameters->binCoorFile);
        NAMD_die(s);
    }
    //step3: read the file specified by the range
    int64 offsetPos = ((int64)myAtomLIdx)*sizeof(Position);
#ifdef WIN32
    if ( _fseeki64(ifp, offsetPos, SEEK_CUR) )
#else
    if ( fseeko(ifp, offsetPos, SEEK_CUR) )
#endif
    {
        char s[256];
        sprintf(s, "Error in seeking binary file %s on proc %d", simParameters->binCoorFile, CkMyPe());
        NAMD_err(s);
    }
    size_t totalRead = fread(tmpData, sizeof(Vector), myNumAtoms, ifp);
    if(totalRead!=myNumAtoms) {
        char s[256];
        sprintf(s, "Error in reading binary file %s on proc %d", simParameters->binCoorFile, CkMyPe());
        NAMD_err(s);
    }
    if(needFlip) flipNum((char *)tmpData, sizeof(BigReal), myNumAtoms*3);
    fclose(ifp);
    for(int i=0; i<myNumAtoms; i++) initAtoms[i].position = tmpData[i];

    //begin to read velocity
    //step1: generate velocity randomly or open the file
    if(!simParameters->binVelFile) {
        //generate velocity randomly
        Node::Object()->workDistrib->random_velocities_parallel(simParameters->initialTemp, initAtoms);
    } else {
        ifp = fopen(simParameters->binVelFile, "rb");
        if(!ifp) {
            char s[256];
            sprintf(s, "The binary velocity file %s cannot be opened on proc %d\n", simParameters->binVelFile, CkMyPe());
            NAMD_err(s);
        }
        //step2: check whether flip is needed
        fread(&filelen, sizeof(int32),1,ifp);
        memcpy(lenbuf, (const char *)&filelen, sizeof(int32));
        flipNum(lenbuf, sizeof(int32), 1);
        if(!memcmp(lenbuf, (const char *)&filelen, sizeof(int32))) {
            iout << iWARN << "Number of atoms in binary file " << simParameters->binVelFile
                 << " is palindromic, assuming same endian.\n" << endi;
        }
        if(filelen!=molecule->numAtoms) {
            needFlip = 1;
            memcpy((void *)&filelen, lenbuf,sizeof(int32));
        }
        if(filelen!=molecule->numAtoms) {
            char s[256];
            sprintf(s, "Incorrect atom count in binary file %s", simParameters->binVelFile);
            NAMD_die(s);
        }

        //step3: read the file specified by the range
        int64 offsetPos = ((int64)myAtomLIdx)*sizeof(Velocity);
#ifdef WIN32
        if ( _fseeki64(ifp, offsetPos, SEEK_CUR) )
#else
        if ( fseeko(ifp, offsetPos, SEEK_CUR) )
#endif
        {
            char s[256];
            sprintf(s, "Error in seeking binary file %s on proc %d", simParameters->binVelFile, CkMyPe());
            NAMD_err(s);
        }
        totalRead = fread(tmpData, sizeof(Vector), myNumAtoms, ifp);
        if(totalRead!=myNumAtoms) {
            char s[256];
            sprintf(s, "Error in reading binary file %s on proc %d", simParameters->binVelFile, CkMyPe());
            NAMD_err(s);
        }
        if(needFlip) flipNum((char *)tmpData, sizeof(BigReal), myNumAtoms*3);
        fclose(ifp);
        for(int i=0; i<myNumAtoms; i++) initAtoms[i].velocity = tmpData[i];
    }

    //begin to read reference coordinates
    //step1: use initial positions or open the file
    if(!simParameters->binRefFile) {
        for(int i=0; i<myNumAtoms; i++) initAtoms[i].fixedPosition = initAtoms[i].position;
    } else {
        ifp = fopen(simParameters->binRefFile, "rb");
        if(!ifp) {
            char s[256];
            sprintf(s, "The binary reference coordinate file %s cannot be opened on proc %d\n", simParameters->binRefFile, CkMyPe());
            NAMD_err(s);
        }
        //step2: check whether flip is needed
        fread(&filelen, sizeof(int32),1,ifp);
        memcpy(lenbuf, (const char *)&filelen, sizeof(int32));
        flipNum(lenbuf, sizeof(int32), 1);
        if(!memcmp(lenbuf, (const char *)&filelen, sizeof(int32))) {
            iout << iWARN << "Number of atoms in binary file " << simParameters->binRefFile
                 << " is palindromic, assuming same endian.\n" << endi;
        }
        if(filelen!=molecule->numAtoms) {
            needFlip = 1;
            memcpy((void *)&filelen, lenbuf,sizeof(int32));
        }
        if(filelen!=molecule->numAtoms) {
            char s[256];
            sprintf(s, "Incorrect atom count in binary file %s", simParameters->binRefFile);
            NAMD_die(s);
        }

        //step3: read the file specified by the range
        int64 offsetPos = ((int64)myAtomLIdx)*sizeof(Position);
#ifdef WIN32
        if ( _fseeki64(ifp, offsetPos, SEEK_CUR) )
#else
        if ( fseeko(ifp, offsetPos, SEEK_CUR) )
#endif
        {
            char s[256];
            sprintf(s, "Error in seeking binary file %s on proc %d", simParameters->binRefFile, CkMyPe());
            NAMD_err(s);
        }
        totalRead = fread(tmpData, sizeof(Vector), myNumAtoms, ifp);
        if(totalRead!=myNumAtoms) {
            char s[256];
            sprintf(s, "Error in reading binary file %s on proc %d", simParameters->binRefFile, CkMyPe());
            NAMD_err(s);
        }
        if(needFlip) flipNum((char *)tmpData, sizeof(BigReal), myNumAtoms*3);
        fclose(ifp);
        for(int i=0; i<myNumAtoms; i++) initAtoms[i].fixedPosition = tmpData[i];
    }

    delete [] tmpData;
#endif
}
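
// All three reads above share one endianness probe: the first int32 of each
// binary file is the atom count, so if it matches molecule->numAtoms only
// after a byte flip, every following record must be flipped too. A condensed
// sketch of that probe (hypothetical helper, not part of this file):
#if 0
int needsFlip(int32 filelen, int numAtoms) {
    if ( filelen == numAtoms ) return 0;    // native byte order
    char buf[sizeof(int32)];
    memcpy(buf, &filelen, sizeof(int32));
    flipNum(buf, sizeof(int32), 1);         // reverse the byte order in place
    memcpy(&filelen, buf, sizeof(int32));
    if ( filelen == numAtoms ) return 1;    // foreign byte order, flip data
    NAMD_die("Incorrect atom count in binary file");
    return -1;                              // not reached
}
#endif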

void ParallelIOMgr::readInfoForParOutput()
{
    int fromIdx, toIdx; //atoms' range
    getMyAtomsRangeOnOutput(fromIdx,toIdx);
    int numMyAtoms = toIdx-fromIdx+1;

    clusterID = new int[numMyAtoms];
    clusterSize = new int[numMyAtoms];

    //Since the input proc also reads this file, all the checks
    //(file version check, atom record size check) are omitted here
    FILE *ifp = fopen(simParameters->binAtomFile, "rb");
    //read magic number to set needFlip
    int needFlip = 0;
    int magicNum;
    fread(&magicNum, sizeof(int), 1, ifp);
    if (magicNum!=COMPRESSED_PSF_MAGICNUM) {
        needFlip = 1;
    }

    //need to load isWater info
    isWater = new char[numMyAtoms];
    //seek from the end of the file (note offset is negative!)
    int64 offset = sizeof(char)*((int64)(fromIdx-molecule->numAtoms));
#ifdef WIN32
    if ( _fseeki64(ifp, offset, SEEK_END) )
#else
    if ( fseeko(ifp, offset, SEEK_END) )
#endif
    {
        char s[256];
        sprintf(s, "Error in seeking binary file %s on proc %d", simParameters->binAtomFile, CkMyPe());
        NAMD_err(s);
    }
    fread(isWater, sizeof(char), numMyAtoms, ifp);
    //there's no need for flipping as it's a char array

    //seek from the end of the file (note offset is negative!)
    offset = sizeof(int)*((int64)(fromIdx-molecule->numAtoms))
             - sizeof(char)*((int64)(molecule->numAtoms));
#ifdef WIN32
    if ( _fseeki64(ifp, offset, SEEK_END) )
#else
    if ( fseeko(ifp, offset, SEEK_END) )
#endif
    {
        char s[256];
        sprintf(s, "Error in seeking binary file %s on proc %d", simParameters->binAtomFile, CkMyPe());
        NAMD_err(s);
    }
    fread(clusterID, sizeof(int), numMyAtoms, ifp);
    if(needFlip) flipNum((char *)clusterID, sizeof(int), numMyAtoms);
    fclose(ifp);

    //calculate cluster size (communication may be necessary)
    ClusterElem one;
    for(int i=0; i<numMyAtoms; i++) {
        clusterSize[i] = 0;
        int cid = clusterID[i];
        //check whether the cluster id (i.e. the header atom of the
        //cluster) is in my local repository. If it is not, it must
        //appear on an output processor ahead of this one because of
        //the way cluster info is determined for the molecular system.
        CmiAssert(cid<=toIdx);
        if(cid<fromIdx) {
            //on output procs ahead of me
            one.clusterId = cid;
            ClusterElem *ret = remoteClusters.find(one);
            if(ret==NULL) {
                one.atomsCnt = 1;
                remoteClusters.add(one);
            } else {
                ret->atomsCnt++;
            }
        } else {
            int lidx = cid-fromIdx;
            CmiAssert(lidx<=i);
            clusterSize[lidx]++;
        }
    }

    //Prepare to send msgs to remote output procs to reduce the cluster size.
    //Since the expected number of msgs is very small, msgs to the same proc
    //are not aggregated. --Chao Mei
#if 0
    printf("output[%d]=%d: prepare to send %d remote msgs for cluster size\n",
           myOutputRank, CkMyPe(), remoteClusters.size());
#endif

    numRemoteClusters = remoteClusters.size();
    numCSMAck = 0; //set to 0 to prepare for recving the final cluster size update
    CProxy_ParallelIOMgr pIO(thisgroup);
    ClusterSetIter iter(remoteClusters);
    for(iter=iter.begin(); iter!=iter.end(); iter++) {
        ClusterSizeMsg *msg = new ClusterSizeMsg;
        msg->srcRank = myOutputRank;
        msg->clusterId = iter->clusterId;
        msg->atomsCnt = iter->atomsCnt;
        int dstRank = atomRankOnOutput(iter->clusterId);
        pIO[outputProcArray[dstRank]].recvClusterSize(msg);
    }
}
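
// Cluster sizes are finalized in three steps: (1) each output proc counts
// the atoms of locally-headed clusters and, above, mails partial counts for
// remotely-headed ones to the owning proc; (2) the owner folds those partial
// counts into its totals in integrateClusterSize(); (3) the final sizes
// travel back through recvFinalClusterSize() so every atom can be stamped
// with the size of its cluster.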

void ParallelIOMgr::sendDcdParams()
{

    Molecule *mol = Node::Object()->molecule;
    std::vector<string> inputs;
    std::vector<string> outputs;
    std::vector<uint16> tags;
    std::vector<int> freqs;
    std::vector<OUTPUTFILETYPE> types;
    if(mol->dcdSelectionAtoms)
    {
        for(int index=0; index<16; ++index)
        {
            tags.push_back(mol->dcdSelectionParams[index].tag);
            freqs.push_back(mol->dcdSelectionParams[index].frequency);
            types.push_back(mol->dcdSelectionParams[index].type);
            inputs.push_back(mol->dcdSelectionParams[index].inputFilename);
            outputs.push_back(mol->dcdSelectionParams[index].outFilename);
        }
        CProxy_ParallelIOMgr pIO(thisgroup);
        pIO.recvDcdParams(tags, inputs, outputs, freqs, types);
    }
}

void ParallelIOMgr::recvDcdParams(std::vector<uint16> tags,
        std::vector<std::string> inputFileNames,
        std::vector<std::string> outputFileNames,
        std::vector<int> freqs,
        std::vector<OUTPUTFILETYPE> types)
{
    // only the 0th rank updates the molecule object
    if(!CkMyRank())
    {
        Molecule *mol = Node::Object()->molecule;
        mol->dcdSelectionAtoms = true;
        for(int index=0; index < 16; ++index)
        {
            mol->dcdSelectionParams[index].tag = tags[index];
            strncpy(mol->dcdSelectionParams[index].inputFilename, inputFileNames[index].c_str(), NAMD_FILENAME_BUFFER_SIZE);
            strncpy(mol->dcdSelectionParams[index].outFilename, outputFileNames[index].c_str(), NAMD_FILENAME_BUFFER_SIZE);
            mol->dcdSelectionParams[index].frequency = freqs[index];
            mol->dcdSelectionParams[index].type = types[index];
        }
    }
}

/* If dcd selection is active, the output processors need to compute
 * their offsets into each dcd selection file.
 */
void ParallelIOMgr::readInfoForParOutDcdSelection()
{
#ifdef MEM_OPT_VERSION
    // for each selection file, open, scan to fromIdx,
    // keep counting matches until toIdx
    if(myOutputRank>=0)
    { // only do this on output processors
        Molecule *mol = Node::Object()->molecule;
        int selFromIdx, selToIdx; //atoms' range
        getMyAtomsRangeOnOutput(selFromIdx, selToIdx);
        DebugM(3, "["<<CkMyPe()<<"]"<< " ParallelIOMgr::readInfoForParOutDcdSelection from " << selFromIdx << " to " << selToIdx <<"\n");
        for(int index=0; index < 16; ++index)
        {
            char *dcdSelectionInputFile = mol->dcdSelectionParams[index].inputFilename;
            if(filesystem::path(dcdSelectionInputFile).extension() == ".idx")
            {
                // binary index file, each element in it is in the selection

                IndexFile dcdSelectionInputIdx(dcdSelectionInputFile);
                std::vector<uint32> indexVec = dcdSelectionInputIdx.getAllElements();
                // find the range for our from_to on the selection;
                // lower_bound does a binary search so we avoid scanning
                // irrelevant elements
                auto start = std::lower_bound(indexVec.begin(), indexVec.end(), selFromIdx);
                off_t startOffset = std::distance(indexVec.begin(), start);
                auto end = std::lower_bound(start, indexVec.end(), selToIdx);
                size_t size = std::distance(start, end);
                // compute the map(localOffset -> globalId)
                std::vector<uint32> dcdSelectionIndexReverseMap(size);
                for(size_t offset=0; offset<size; ++offset)
                {
                    dcdSelectionIndexReverseMap[offset] = indexVec[startOffset+offset];
                }
                mol->dcdSelectionParams[index].size = indexVec.size();
                midCM->parOut->setDcdSelectionParams(index, startOffset, size, dcdSelectionIndexReverseMap);
            }
            else
            { // no such file present
            }
        }
    }
#endif
}
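
// The two lower_bound calls above clip the sorted selection index to this
// output proc's atom range. Illustrative numbers: with indexVec =
// {3, 40, 41, 97} and a local range of 40..96, start lands on 40
// (startOffset 1), end lands on 97, so this proc owns 2 selected atoms
// occupying slots 1 and 2 of the dcd selection file.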

void ParallelIOMgr::recvClusterSize(ClusterSizeMsg *msg)
{
    csmBuf.add(msg); //added to buffer for reuse to send back to src

    //updating the cluster size has to be delayed until integration to
    //prevent a data race where clusterSize has not yet been created
}

void ParallelIOMgr::integrateClusterSize()
{
    if(myOutputRank==-1) return;
    if(!(simParameters->wrapAll || simParameters->wrapWater)) return;

    int fromIdx, toIdx; //atoms' range
    getMyAtomsRangeOnOutput(fromIdx,toIdx);

    //calculate the final cluster size
    for(int i=0; i<csmBuf.size(); i++) {
        ClusterSizeMsg *msg = csmBuf[i];
        int lidx = msg->clusterId - fromIdx;
        clusterSize[lidx] += msg->atomsCnt;
    }

    CProxy_ParallelIOMgr pIO(thisgroup);
    for(int i=0; i<csmBuf.size(); i++) {
        ClusterSizeMsg *msg = csmBuf[i];
        int lidx = msg->clusterId - fromIdx;
        msg->atomsCnt = clusterSize[lidx];
        pIO[outputProcArray[msg->srcRank]].recvFinalClusterSize(msg);
    }
    numRemoteReqs = csmBuf.size();
    csmBuf.resize(0);

    //There's a possible msg race here: recvFinalClusterSize could execute
    //before integrateClusterSize because another proc finishes its cluster
    //size calculation faster. recvFinalClusterSize must run after
    //integrateClusterSize, so a self message is sent to participate in the
    //reduction.
    if(numRemoteClusters!=0){
        recvFinalClusterSize(NULL);
    }else{
        //this output proc already has the final cluster size for each atom
        int numMyAtoms = toIdx-fromIdx+1;
        for(int i=0; i<numMyAtoms; i++) {
            int lidx = clusterID[i]-fromIdx;
            clusterSize[i] = clusterSize[lidx];
        }

        #if 0 //write out cluster debug info
        char fname[128];
        sprintf(fname, "cluster.par.%d", CkMyPe());
        FILE *ofp = fopen(fname, "w");
        for(int i=0; i<numMyAtoms; i++) {
            fprintf(ofp, "%d: %d: %d\n", i+fromIdx, clusterID[i], clusterSize[i]);
        }
        fclose(ofp);
        #endif
    }
}

void ParallelIOMgr::recvFinalClusterSize(ClusterSizeMsg *msg)
{
    //only process messages sent by other procs
    if(msg!=NULL) {
        //indicates a message from another proc
        ClusterElem one(msg->clusterId);
        ClusterElem *ret = remoteClusters.find(one);
        CmiAssert(ret!=NULL);
        ret->atomsCnt = msg->atomsCnt;
    }
    delete msg;

    //include a msg sent by itself for the reduction
    if(++numCSMAck == (numRemoteClusters+1)) {
        //received all the msgs needed to finally update the cluster size for each atom
        int fromIdx, toIdx; //atoms' range
        getMyAtomsRangeOnOutput(fromIdx,toIdx);
        int numMyAtoms = toIdx-fromIdx+1;
        ClusterElem tmp;
        for(int i=0; i<numMyAtoms; i++) {
            int cid = clusterID[i];
            int lidx = cid-fromIdx;
            if(lidx<0) {
                //this cid should be inside remoteClusters
                tmp.clusterId = cid;
                ClusterElem *fone = remoteClusters.find(tmp);
                clusterSize[i] = fone->atomsCnt;
            } else {
                clusterSize[i] = clusterSize[lidx];
            }
        }
        numCSMAck = 0;
        remoteClusters.clear();

#if 0 //write out cluster debug info
        char fname[128];
        sprintf(fname, "cluster.par.%d", CkMyPe());
        FILE *ofp = fopen(fname, "w");
        for(int i=0; i<numMyAtoms; i++) {
            fprintf(ofp, "%d: %d: %d\n", i+fromIdx, clusterID[i], clusterSize[i]);
        }
        fclose(ofp);
#endif

    }
}

void ParallelIOMgr::migrateAtomsMGrp()
{
    if(myInputRank==-1) return;

    //1. first get the list of atoms to be migrated,
    //which should be few compared with the number of atoms
    //initially assigned to this input proc.
    AtomIDList toMigrateList; //the list of atoms to be migrated
    //the max distance from this processor of atoms to be sent
    int maxOffset = 0;
    for(int i=0; i<initAtoms.size(); i++) {
        //returns the proc id on which atom MPID resides
        int parentRank = atomInitRankOnInput(initAtoms[i].MPID);
        if(parentRank != myInputRank) {
            toMigrateList.add(i);
            initAtoms[i].isValid = false;
            int tmp = parentRank - myInputRank;
            tmp = tmp>0 ? tmp : -tmp;
            if(tmp > maxOffset) maxOffset = tmp;
        }
    }

    //2. prepare atom migration messages
    //the messages are indexed as [-maxOffset,..., -1,0,1,..., maxOffset]
    //where the "0" is not used at all. It is added for the sake of
    //computing the index easily.
    InputAtomList *migLists = new InputAtomList[2*maxOffset+1];
    for(int i=0; i<toMigrateList.size(); i++) {
        int idx = toMigrateList[i];
        int parentRank = atomInitRankOnInput(initAtoms[idx].MPID);
        //decide which migList to put this atom in
        int offset = parentRank - myInputRank + maxOffset;
        migLists[offset].add(initAtoms[idx]);
    }

    CProxy_ParallelIOMgr pIO(thisgroup);
    for(int i=0; i<2*maxOffset+1; i++) {
        int migLen = migLists[i].size();
        if(migLen>0) {
            MoveInputAtomsMsg *msg = new (migLen, 0)MoveInputAtomsMsg;
            msg->length = migLen;
            memcpy(msg->atomList, migLists[i].begin(), sizeof(InputAtom)*migLen);
            int destRank = i-maxOffset+myInputRank;
            pIO[inputProcArray[destRank]].recvAtomsMGrp(msg);
            migLists[i].clear();
        }
    }

    toMigrateList.clear();
    delete [] migLists;
}
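
// Migration buckets are indexed by signed rank distance shifted into
// [0, 2*maxOffset]. Illustrative numbers: with myInputRank 5 and maxOffset
// 2, an atom whose parent rank is 3 lands in migLists[0] and one whose
// parent rank is 7 lands in migLists[4]; migLists[2] (distance 0) stays
// empty by construction.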

void ParallelIOMgr::recvAtomsMGrp(MoveInputAtomsMsg *msg)
{
    for(int i=0; i<msg->length; i++) {
        tmpRecvAtoms.add((msg->atomList)[i]);
    }
    delete msg;
}

void ParallelIOMgr::integrateMigratedAtoms()
{
    if(myInputRank==-1) return;

    for(int i=0; i<tmpRecvAtoms.size(); i++) {
        tmpRecvAtoms[i].isValid = true;
        initAtoms.add(tmpRecvAtoms[i]);
    }
    tmpRecvAtoms.clear();

    //sort the atom list based on the hydrogenList value
    std::sort(initAtoms.begin(), initAtoms.end());

    //now compute the counters inside Molecule such as numFixedRigidBonds,
    //which are based on the hydrogen group info

    int numFixedRigidBonds = 0;
    if(molecule->numRigidBonds){
        int parentIsFixed = 0;
        for(int i=0; i<initAtoms.size(); i++) {
            InputAtom *one = &(initAtoms[i]);
            if(!one->isValid) continue;
            if(one->isGP) {
                parentIsFixed = one->atomFixed;
                InputAtom *a1 = &(initAtoms[i+1]);
                InputAtom *a2 = &(initAtoms[i+2]);
                if((one->rigidBondLength>0.0) &&
                   a1->atomFixed && a2->atomFixed) {
                    numFixedRigidBonds++;
                }
            }else{
                if((one->rigidBondLength>0.0) &&
                   one->atomFixed && parentIsFixed) {
                    numFixedRigidBonds++;
                }
            }
        }
    }

    int numFixedGroups = 0;
    if(molecule->numFixedAtoms){
        for(int i=0; i<initAtoms.size();) {
            InputAtom *one = &(initAtoms[i]);
            if(!one->isValid){
                i++;
                continue;
            }
            if(one->isGP) {
                int allFixed = 1;
                for(int j=0; j<one->hydrogenGroupSize; j++){
                    InputAtom *a1 = &(initAtoms[i+j]);
                    allFixed = allFixed & a1->atomFixed;
                    if(!allFixed) break;
                }
                if(allFixed) numFixedGroups++;
                i += one->hydrogenGroupSize;
            }
        }
    }

    CProxy_ParallelIOMgr pIO(thisgroup);
    HydroBasedMsg *msg = new HydroBasedMsg;
    msg->numFixedGroups = numFixedGroups;
    msg->numFixedRigidBonds = numFixedRigidBonds;
    pIO[0].recvHydroBasedCounter(msg);
}

void ParallelIOMgr::updateMolInfo()
{
#ifdef MEM_OPT_VERSION
    if(myInputRank==-1) return;

    CProxy_ParallelIOMgr pIO(thisgroup);

    MolInfoMsg *msg = new MolInfoMsg;
    msg->numBonds = msg->numCalcBonds = 0;
    msg->numAngles = msg->numCalcAngles = 0;
    msg->numDihedrals = msg->numCalcDihedrals = 0;
    msg->numImpropers = msg->numCalcImpropers = 0;
    msg->numCrossterms = msg->numCalcCrossterms = 0;
    msg->numExclusions = msg->numCalcExclusions = 0;
    int64 numFullExclusions = msg->numCalcFullExclusions = 0;
    // JLai
    msg->numLJPairs = msg->numCalcLJPairs = 0;
    // End of JLai
    msg->numRigidBonds = 0;
    msg->totalMass = 0.0;
    msg->totalCharge = 0.0;

    //calculate the tuples this input processor has
    AtomSignature *atomSigPool = molecule->atomSigPool;
    ExclusionSignature *exclSigPool = molecule->exclSigPool;
    for(int i=0; i<initAtoms.size(); i++) {
        AtomSignature *thisSig = &atomSigPool[initAtoms[i].sigId];
        msg->numBonds += thisSig->bondCnt;
        msg->numAngles += thisSig->angleCnt;
        msg->numDihedrals += thisSig->dihedralCnt;
        msg->numImpropers += thisSig->improperCnt;
        msg->numCrossterms += thisSig->crosstermCnt;
        // JLai
        msg->numLJPairs += thisSig->gromacsPairCnt;
        // End of JLai

        ExclusionSignature *exclSig = &exclSigPool[initAtoms[i].exclId];
        msg->numExclusions += (exclSig->fullExclCnt + exclSig->modExclCnt);
        numFullExclusions += exclSig->fullExclCnt;

        if(initAtoms[i].rigidBondLength > 0.0) msg->numRigidBonds++;

        msg->totalMass += initAtoms[i].mass;
        msg->totalCharge += initAtoms[i].charge;
    }

    //deal with the numCalc* counters, which are affected by fixed atoms
    if(molecule->numFixedAtoms>0 && ! simParameters->fixedAtomsForces) {
        //if there are fixed atoms, the numCalc* counters need to be computed.
        //Since it's possible that an atom inside an exclusion set lives on
        //another input processor, we have to resort to the global fixed atom
        //info inside the Molecule object. The number of such accesses should
        //be very small! --Chao Mei
        int sAId = initAtoms[0].id;
        int remoteCnt=0; //stats info
        for(int i=0; i<initAtoms.size(); i++) {
            //When all the atoms in a tuple are fixed, the elem (Bond etc.)
            //is not counted as a calc*.
            int myAId = initAtoms[i].id;
            AtomSignature *thisSig = &atomSigPool[initAtoms[i].sigId];
            ExclusionSignature *exclSig = &exclSigPool[initAtoms[i].exclId];
            if(!initAtoms[i].atomFixed) {
                msg->numCalcBonds += thisSig->bondCnt;
                msg->numCalcAngles += thisSig->angleCnt;
                msg->numCalcDihedrals += thisSig->dihedralCnt;
                msg->numCalcImpropers += thisSig->improperCnt;
                msg->numCalcCrossterms += thisSig->crosstermCnt;
                msg->numCalcExclusions += (exclSig->fullExclCnt+exclSig->modExclCnt);
                msg->numCalcFullExclusions += (exclSig->fullExclCnt);
                continue;
            }

            //1. Bonds
            for(int j=0; j<thisSig->bondCnt; j++) {
                TupleSignature *bsig = &(thisSig->bondSigs[j]);
                int a1 = myAId + bsig->offset[0];
                if(!isAtomFixed(sAId, a1)) msg->numCalcBonds++;
            }

            //2. Angles
            for(int j=0; j<thisSig->angleCnt; j++) {
                TupleSignature *bsig = &(thisSig->angleSigs[j]);
                int a1 = myAId + bsig->offset[0];
                int a2 = myAId + bsig->offset[1];
                if(!isAtomFixed(sAId, a1) || !isAtomFixed(sAId, a2))
                    msg->numCalcAngles++;
            }

            //3. Dihedrals
            for(int j=0; j<thisSig->dihedralCnt; j++) {
                TupleSignature *bsig = &(thisSig->dihedralSigs[j]);
                int a1 = myAId + bsig->offset[0];
                int a2 = myAId + bsig->offset[1];
                int a3 = myAId + bsig->offset[2];
                if(!isAtomFixed(sAId, a1) ||
                   !isAtomFixed(sAId, a2) ||
                   !isAtomFixed(sAId, a3))
                    msg->numCalcDihedrals++;
            }

            //4. Impropers
            for(int j=0; j<thisSig->improperCnt; j++) {
                TupleSignature *bsig = &(thisSig->improperSigs[j]);
                int a1 = myAId + bsig->offset[0];
                int a2 = myAId + bsig->offset[1];
                int a3 = myAId + bsig->offset[2];
                if(!isAtomFixed(sAId, a1) ||
                   !isAtomFixed(sAId, a2) ||
                   !isAtomFixed(sAId, a3))
                    msg->numCalcImpropers++;
            }

            //5. Crossterms
            for(int j=0; j<thisSig->crosstermCnt; j++) {
                TupleSignature *bsig = &(thisSig->crosstermSigs[j]);
                int a1 = myAId + bsig->offset[0];
                int a2 = myAId + bsig->offset[1];
                int a3 = myAId + bsig->offset[2];
                int a4 = myAId + bsig->offset[3];
                int a5 = myAId + bsig->offset[4];
                int a6 = myAId + bsig->offset[5];
                int a7 = myAId + bsig->offset[6];

                if(!isAtomFixed(sAId, a1) ||
                   !isAtomFixed(sAId, a2) ||
                   !isAtomFixed(sAId, a3) ||
                   !isAtomFixed(sAId, a4) ||
                   !isAtomFixed(sAId, a5) ||
                   !isAtomFixed(sAId, a6) ||
                   !isAtomFixed(sAId, a7))
                    msg->numCalcCrossterms++;
            }

            //6. Exclusions
            //this atom is fixed, check atoms in the exclusion set
            for(int j=0; j<exclSig->fullExclCnt; j++) {
                int thisAId = exclSig->fullOffset[j]+myAId;
                if(!isAtomFixed(sAId, thisAId)) { msg->numCalcExclusions++; msg->numCalcFullExclusions++; }
            }
            for(int j=0; j<exclSig->modExclCnt; j++) {
                int thisAId = exclSig->modOffset[j]+myAId;
                if(!isAtomFixed(sAId, thisAId)) msg->numCalcExclusions++;
            }

            //7. GromacsPair
            for(int j=0; j<thisSig->gromacsPairCnt; j++) {
                TupleSignature *bsig = &(thisSig->gromacsPairSigs[j]);
                int a1 = myAId + bsig->offset[0];
                int a2 = myAId + bsig->offset[1];
                if(!isAtomFixed(sAId, a1) ||
                   !isAtomFixed(sAId, a2))
                    msg->numCalcLJPairs++;
            }
        }
#if COLLECT_PERFORMANCE_DATA
        printf("Num fixedAtom lookups on proc %d is %d\n", CkMyPe(), numFixedAtomLookup);
#endif
    } else {
        //no fixed atoms, so each numCalc* equals its corresponding total
        msg->numCalcBonds = msg->numBonds;
        msg->numCalcAngles = msg->numAngles;
        msg->numCalcDihedrals = msg->numDihedrals;
        msg->numCalcImpropers = msg->numImpropers;
        msg->numCalcCrossterms = msg->numCrossterms;
        msg->numCalcExclusions = msg->numExclusions;
        msg->numCalcFullExclusions = numFullExclusions;
    }


    if(!simParameters->comMove) {
        //to remove the center of mass motion from the molecule,
        //first calculate the values on every input proc, then reduce.
        //For more info, refer to WorkDistrib::remove_com_motion
        //-Chao Mei
        (msg->totalMV).x = 0.0;
        (msg->totalMV).y = 0.0;
        (msg->totalMV).z = 0.0;
        for (int i=0; i<initAtoms.size(); i++) {
            msg->totalMV += initAtoms[i].mass * initAtoms[i].velocity;
        }
    }

    //always send to the master processor (proc 0)
    pIO[0].recvMolInfo(msg);
#endif
}

//only executed on proc 0
void ParallelIOMgr::recvMolInfo(MolInfoMsg *msg)
{
    molecule->numBonds += msg->numBonds;
    molecule->numCalcBonds += msg->numCalcBonds;
    molecule->numAngles += msg->numAngles;
    molecule->numCalcAngles += msg->numCalcAngles;
    molecule->numDihedrals += msg->numDihedrals;
    molecule->numCalcDihedrals += msg->numCalcDihedrals;
    molecule->numImpropers += msg->numImpropers;
    molecule->numCalcImpropers += msg->numCalcImpropers;
    molecule->numCrossterms += msg->numCrossterms;
    molecule->numCalcCrossterms += msg->numCalcCrossterms;
    numTotalExclusions += msg->numExclusions;
    numCalcExclusions += msg->numCalcExclusions;
    numCalcFullExclusions += msg->numCalcFullExclusions;
    molecule->numRigidBonds += msg->numRigidBonds;

    totalMass += msg->totalMass;
    totalCharge += msg->totalCharge;

    if(!simParameters->comMove) {
        totalMV += msg->totalMV;
    }

    if(++procsReceived == numInputProcs) {
        //received all the counters
        msg->numBonds = molecule->numBonds;
        msg->numCalcBonds = molecule->numCalcBonds;
        msg->numAngles = molecule->numAngles;
        msg->numCalcAngles = molecule->numCalcAngles;
        msg->numDihedrals = molecule->numDihedrals;
        msg->numCalcDihedrals = molecule->numCalcDihedrals;
        msg->numImpropers = molecule->numImpropers;
        msg->numCalcImpropers = molecule->numCalcImpropers;
        msg->numCrossterms = molecule->numCrossterms;
        msg->numCalcCrossterms = molecule->numCalcCrossterms;
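        //every exclusion was tallied once per endpoint atom on the input
        //procs, so the global totals are halved to count each pair once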
        msg->numExclusions = numTotalExclusions/2;
        msg->numCalcExclusions = numCalcExclusions/2;
        msg->numCalcFullExclusions = numCalcFullExclusions/2;
        msg->numRigidBonds = molecule->numRigidBonds;

        msg->totalMass = totalMass;
        msg->totalCharge = totalCharge;

        if(!simParameters->comMove) {
            msg->totalMV = totalMV;
        }

        CProxy_ParallelIOMgr pIO(thisgroup);
        pIO.bcastMolInfo(msg);

        //reset to 0 for the next p2p-based reduction on input procs
        procsReceived = 0;
    } else delete msg;
}

void ParallelIOMgr::bcastMolInfo(MolInfoMsg *msg)
{
#ifdef MEM_OPT_VERSION
    if(myInputRank!=-1) {
        if(!simParameters->comMove) {
            //needs to remove the center of mass motion from the molecule
            Vector val = msg->totalMV / msg->totalMass;
            for (int i=0; i<initAtoms.size(); i++) initAtoms[i].velocity -= val;
        }
    }

    //only rank 0 in the SMP node updates the Molecule object
    if(CmiMyRank()) {
        delete msg;
        return;
    }

    molecule->numBonds = msg->numBonds;
    molecule->numCalcBonds = msg->numCalcBonds;
    molecule->numAngles = msg->numAngles;
    molecule->numCalcAngles = msg->numCalcAngles;
    molecule->numDihedrals = msg->numDihedrals;
    molecule->numCalcDihedrals = msg->numCalcDihedrals;
    molecule->numImpropers = msg->numImpropers;
    molecule->numCalcImpropers = msg->numCalcImpropers;
    molecule->numCrossterms = msg->numCrossterms;
    molecule->numCalcCrossterms = msg->numCalcCrossterms;

    molecule->numTotalExclusions = msg->numExclusions;
    molecule->numCalcExclusions = msg->numCalcExclusions;
    molecule->numCalcFullExclusions = msg->numCalcFullExclusions;

    molecule->numRigidBonds = msg->numRigidBonds;


    if(!CkMyPe()) {
        iout << iINFO << "LOADED " << molecule->numTotalExclusions << " TOTAL EXCLUSIONS\n" << endi;
        if(!simParameters->comMove) {
            iout << iINFO << "REMOVING COM VELOCITY "
                 << (PDBVELFACTOR * (msg->totalMV / msg->totalMass)) << "\n" << endi;
        }
    }
    delete msg;
#endif
}

//only called on PE0
void ParallelIOMgr::recvHydroBasedCounter(HydroBasedMsg *msg){
    molecule->numFixedRigidBonds += msg->numFixedRigidBonds;
    molecule->numFixedGroups += msg->numFixedGroups;

    if(++hydroMsgRecved == numInputProcs){
        msg->numFixedRigidBonds = molecule->numFixedRigidBonds;
        msg->numFixedGroups = molecule->numFixedGroups;
        CProxy_ParallelIOMgr pIO(thisgroup);
        pIO.bcastHydroBasedCounter(msg);
        hydroMsgRecved = 0;
    }else delete msg;
}

void ParallelIOMgr::bcastHydroBasedCounter(HydroBasedMsg *msg){
#ifdef MEM_OPT_VERSION
    //only rank 0 in the SMP node updates the Molecule object
    if(CmiMyRank()) {
        delete msg;
        return;
    }
    molecule->numFixedRigidBonds = msg->numFixedRigidBonds;
    molecule->numFixedGroups = msg->numFixedGroups;
    delete msg;

    if(!CkMyPe()) {
        iout << iINFO << "****************************\n";
        iout << iINFO << "STRUCTURE SUMMARY:\n";
        iout << iINFO << molecule->numAtoms << " ATOMS\n";
        iout << iINFO << molecule->numBonds << " BONDS\n";
        iout << iINFO << molecule->numAngles << " ANGLES\n";
        iout << iINFO << molecule->numDihedrals << " DIHEDRALS\n";
        iout << iINFO << molecule->numImpropers << " IMPROPERS\n";
        iout << iINFO << molecule->numCrossterms << " CROSSTERMS\n";
        iout << iINFO << molecule->numExclusions << " EXCLUSIONS\n";

        //****** BEGIN CHARMM/XPLOR type changes
        if ((molecule->numMultipleDihedrals) && (simParameters->paraTypeXplorOn)){
            iout << iINFO << molecule->numMultipleDihedrals
                 << " DIHEDRALS WITH MULTIPLE PERIODICITY (BASED ON PSF FILE)\n";
        }
        if ((molecule->numMultipleDihedrals) && (simParameters->paraTypeCharmmOn)){
            iout << iINFO << molecule->numMultipleDihedrals
                 << " DIHEDRALS WITH MULTIPLE PERIODICITY IGNORED (BASED ON PSF FILE) \n";
            iout << iINFO
                 << " CHARMM MULTIPLICITIES BASED ON PARAMETER FILE INFO! \n";
        }
        //****** END CHARMM/XPLOR type changes

        if (molecule->numMultipleImpropers){
            iout << iINFO << molecule->numMultipleImpropers
                 << " IMPROPERS WITH MULTIPLE PERIODICITY\n";
        }

        if (simParameters->fixedAtomsOn)
            iout << iINFO << molecule->numFixedAtoms << " FIXED ATOMS\n";

        if (simParameters->rigidBonds)
            iout << iINFO << molecule->numRigidBonds << " RIGID BONDS\n";

        if (simParameters->fixedAtomsOn && simParameters->rigidBonds)
            iout << iINFO << molecule->numFixedRigidBonds <<
                " RIGID BONDS BETWEEN FIXED ATOMS\n";

        iout << iINFO << molecule->num_deg_freedom(1)
             << " DEGREES OF FREEDOM\n";

        iout << iINFO << molecule->numHydrogenGroups << " HYDROGEN GROUPS\n";
        iout << iINFO << molecule->maxHydrogenGroupSize
             << " ATOMS IN LARGEST HYDROGEN GROUP\n";
        iout << iINFO << molecule->numMigrationGroups << " MIGRATION GROUPS\n";
        iout << iINFO << molecule->maxMigrationGroupSize
             << " ATOMS IN LARGEST MIGRATION GROUP\n";
        if (simParameters->fixedAtomsOn)
        {
            iout << iINFO << molecule->numFixedGroups <<
                " HYDROGEN GROUPS WITH ALL ATOMS FIXED\n";
        }

        iout << iINFO << "TOTAL MASS = " << totalMass << " amu\n";
        iout << iINFO << "TOTAL CHARGE = " << totalCharge << " e\n";

        BigReal volume = simParameters->lattice.volume();
        if ( volume ) {
            iout << iINFO << "MASS DENSITY = "
                 << ((totalMass/volume) / 0.6022) << " g/cm^3\n";
            iout << iINFO << "ATOM DENSITY = "
                 << (molecule->numAtoms/volume) << " atoms/A^3\n";
        }

        iout << iINFO << "*****************************\n";
        iout << endi;
        fflush(stdout);
    }
#endif
}

void ParallelIOMgr::calcAtomsInEachPatch()
{
    if(myInputRank==-1) return;

    PatchMap *patchMap = PatchMap::Object();
    int numPatches = patchMap->numPatches();

    patchMap->initTmpPatchAtomsList();

    //each list contains atom indices into initAtoms
    vector<int> *eachPatchAtomList = patchMap->getTmpPatchAtomsList();

    CProxy_PatchMgr pm(CkpvAccess(BOCclass_group).patchMgr);
    PatchMgr *patchMgr = pm.ckLocalBranch();

    int pid=0;
    const Lattice lattice = simParameters->lattice;
    for(int i=0; i<initAtoms.size(); i++) {
        InputAtom *atom = &(initAtoms[i]);
        if(!atom->isValid) continue;
        if(atom->isMP) {
            pid = patchMap->assignToPatch(atom->position, lattice);
        }
        eachPatchAtomList[pid].push_back(i);
    }

    CProxy_ParallelIOMgr pIO(thisgroup);

    int patchCnt = 0;
    for(int i=0; i<numPatches; i++) {
        int cursize = eachPatchAtomList[i].size();
        if(cursize>0) patchCnt++;
    }

    AtomsCntPerPatchMsg *msg = NULL;
    if(simParameters->fixedAtomsOn) {
        msg = new (patchCnt, patchCnt, patchCnt, 0)AtomsCntPerPatchMsg;
    } else {
        msg = new (patchCnt, patchCnt, 0, 0)AtomsCntPerPatchMsg;
    }

    msg->length = patchCnt;
    patchCnt = 0;
    for(int i=0; i<numPatches; i++) {
        int cursize = eachPatchAtomList[i].size();
        if(cursize>0) {
            if ( cursize > USHRT_MAX ) {
                char errstr[512];
                sprintf(errstr, "Patch %d exceeds %d atoms.", i, USHRT_MAX);
                NAMD_die(errstr);
            }
            msg->pidList[patchCnt] = i;
            msg->atomsCntList[patchCnt] = cursize;
            patchCnt++;
        }
    }

    if(simParameters->fixedAtomsOn) {
        patchCnt = 0;
        for(int i=0; i<numPatches; i++) {
            int cursize = eachPatchAtomList[i].size();
            if(cursize>0) {
                int fixedCnt = 0;
                for(int j=0; j<cursize; j++) {
                    int aid = eachPatchAtomList[i][j];
                    //atomFixed is either 0 or 1
                    fixedCnt += initAtoms[aid].atomFixed;
                }
                msg->fixedAtomsCntList[patchCnt] = fixedCnt;
                patchCnt++;
            }
        }
    }

    pIO[0].recvAtomsCntPerPatch(msg);

}

void ParallelIOMgr::recvAtomsCntPerPatch(AtomsCntPerPatchMsg *msg)
{
#ifdef MEM_OPT_VERSION
    PatchMap *patchMap = PatchMap::Object();
    for(int i=0; i<msg->length; i++) {
        int pid = msg->pidList[i];
        int oldNum = patchMap->numAtoms(pid);
        if ( oldNum + msg->atomsCntList[i] > USHRT_MAX ) {
            char errstr[512];
            sprintf(errstr, "Patch %d exceeds %d atoms.", pid, USHRT_MAX);
            NAMD_die(errstr);
        }
        patchMap->setNumAtoms(pid, oldNum+msg->atomsCntList[i]);
        if(simParameters->fixedAtomsOn) {
            oldNum = patchMap->numFixedAtoms(pid);
            patchMap->setNumFixedAtoms(pid, oldNum+msg->fixedAtomsCntList[i]);
        }
    }
    delete msg;

    if(++procsReceived == numInputProcs) {
        //print max PATCH info
        int maxAtoms = -1;
        int maxPatch = -1;
        int totalAtoms = 0;
        for(int i=0; i<patchMap->numPatches(); i++) {
            int cnt = patchMap->numAtoms(i);
            totalAtoms += cnt;
            if(cnt>maxAtoms) {
                maxAtoms = cnt;
                maxPatch = i;
            }
        }
        procsReceived = 0;
        iout << iINFO << "LARGEST PATCH (" << maxPatch <<
            ") HAS " << maxAtoms << " ATOMS\n" << endi;
        if ( totalAtoms != Node::Object()->molecule->numAtoms ) {
            char errstr[512];
            sprintf(errstr, "Incorrect atom count in void ParallelIOMgr::recvAtomsCntPerPatch: %d vs %d", totalAtoms, Node::Object()->molecule->numAtoms);
            NAMD_die(errstr);
        }
    }
#endif
}

void call_sendAtomsToHomePatchProcs(void *arg)
{
    ((ParallelIOMgr*)arg)->sendAtomsToHomePatchProcs();
}

void ParallelIOMgr::sendAtomsToHomePatchProcs()
{
#ifdef MEM_OPT_VERSION
    if(myInputRank==-1) return;

    if ( sendAtomsThread == 0 ) {
        sendAtomsThread = CthCreate((CthVoidFn)call_sendAtomsToHomePatchProcs,this,0);
        CthAwaken(sendAtomsThread);
        return;
    }
    sendAtomsThread = 0;
    numAcksOutstanding = 0;

    PatchMap *patchMap = PatchMap::Object();
    int numPatches = patchMap->numPatches();
    vector<int> *eachPatchAtomList = patchMap->getTmpPatchAtomsList();

    //each element (proc) contains the list of ids of patches which will stay
    //on that processor
    ResizeArray<int> *procList = new ResizeArray<int>[CkNumPes()];
    ResizeArray<int> pesToSend;
    for(int i=0; i<numPatches; i++) {
        if(eachPatchAtomList[i].size()==0) continue;
        int onPE = patchMap->node(i);
        if ( procList[onPE].size() == 0 ) pesToSend.add(onPE);
        procList[onPE].add(i);
    }

    Random(CkMyPe()).reorder(pesToSend.begin(),pesToSend.size());
    //CkPrintf("Pe %d ParallelIOMgr::sendAtomsToHomePatchProcs sending to %d pes\n",CkMyPe(),pesToSend.size());

    //go over every processor to send a message if necessary
    //TODO: Optimization for local home patches to save temp memory usage??? -CHAOMEI
    CProxy_ParallelIOMgr pIO(thisgroup);
    for(int k=0; k<pesToSend.size(); k++) {
        const int i = pesToSend[k];
        int len = procList[i].size();
        if(len==0) continue;

        // Sending one message per pe can result in very large messages
        // that break Converse so send one message per patch instead.
        for(int j=0; j<len; j++) {
            int pid = procList[i][j];
            int atomCnt = eachPatchAtomList[pid].size();

            if ( numAcksOutstanding >= 10 ) {
                sendAtomsThread = CthSelf();
                CthSuspend();
            }
            ++numAcksOutstanding;

            MovePatchAtomsMsg *msg = new (1, 1, atomCnt, 0)MovePatchAtomsMsg;
            msg->from = CkMyPe();
            msg->patchCnt = 1;
            int atomIdx = 0;
            msg->pidList[0] = pid;
            msg->sizeList[0] = atomCnt;
            for(int n=0; n<atomCnt; n++, atomIdx++) {
                int aid = eachPatchAtomList[pid][n];
                FullAtom one = initAtoms[aid];
                //HACK to re-sort the atom list after receiving it on the
                //home patch processor -Chao Mei
                one.hydVal = initAtoms[aid].hydList;
                msg->allAtoms[atomIdx] = one;
            }
            pIO[i].recvAtomsToHomePatchProcs(msg);
        }

        procList[i].clear();
    }

    //clean up to free space
    delete [] procList;
    patchMap->delTmpPatchAtomsList();

    //free the space occupied by the list that contains the input atoms
    initAtoms.clear();
#endif
}
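
// Flow control above: the first call spawns a Charm++ thread and returns;
// the thread re-enters this function and suspends itself whenever 10 sends
// are unacknowledged. Each ackAtomsToHomePatchProcs() below decrements the
// counter and reawakens the thread, bounding the number of in-flight atom
// messages.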

void ParallelIOMgr::ackAtomsToHomePatchProcs()
{
    --numAcksOutstanding;
    if ( sendAtomsThread ) {
        CthAwaken(sendAtomsThread);
        sendAtomsThread = 0;
    }
}

void ParallelIOMgr::recvAtomsToHomePatchProcs(MovePatchAtomsMsg *msg)
{
    CProxy_ParallelIOMgr pIO(thisgroup);
    pIO[msg->from].ackAtomsToHomePatchProcs();

    if(!isOKToRecvHPAtoms) {
        prepareHomePatchAtomList();
        isOKToRecvHPAtoms = true;
    }

    int numRecvPatches = msg->patchCnt;
    int aid = 0;
    for(int i=0; i<numRecvPatches; i++) {
        int pid = msg->pidList[i];
        int size = msg->sizeList[i];
        int idx = binaryFindHPID(pid);
        for(int j=0; j<size; j++, aid++) {
            hpAtomsList[idx].add(msg->allAtoms[aid]);
        }
    }
    //CkPrintf("Pe %d recvAtomsToHomePatchProcs for %d patches %d atoms\n",CkMyPe(),numRecvPatches,aid);
    delete msg;
}

void ParallelIOMgr::prepareHomePatchAtomList()
{
    PatchMap *patchMap = PatchMap::Object();
    for(int i=0; i<patchMap->numPatches(); i++) {
        if(patchMap->node(i)==CkMyPe()) {
            hpIDList.add(i);
        }
    }
    if(hpIDList.size()>0)
        hpAtomsList = new FullAtomList[hpIDList.size()];
}

int ParallelIOMgr::binaryFindHPID(int pid)
{
    //hpIDList should be in increasing order!!
    //note: rIdx serves as the lower bound and lIdx as the upper bound
    int lIdx, rIdx;
    int retIdx = -1;

    rIdx=0;
    lIdx=hpIDList.size()-1;

    while(rIdx<=lIdx ) {
        int idx = (rIdx+lIdx)/2;
        int curPid = hpIDList[idx];
        if(pid>curPid) {
            //search the upper half
            rIdx = idx+1;
        } else if(pid<curPid) {
            //search the lower half
            lIdx = idx-1;
        } else {
            //found!
            retIdx = idx;
            break;
        }
    }
    CmiAssert(retIdx!=-1);
    return retIdx;
}
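
// Illustrative trace: with hpIDList = {4, 9, 23, 57}, binaryFindHPID(23)
// probes indices 1 (value 9) and then 2 (value 23) and returns 2; a pid
// absent from the list would trip the CmiAssert above.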

void ParallelIOMgr::createHomePatches()
{
#ifdef MEM_OPT_VERSION

    int assignedPids = PatchMap::Object()->numPatchesOnNode(CkMyPe());
    int numPids = hpIDList.size();
    if(numPids==0){
        //this node actually contains no home patches
        if(assignedPids == 0) return;

        //Entering the rare condition that all the home patches assigned to
        //this node are empty, so "recvAtomsToHomePatchProcs" is never
        //called. The empty home patches still need to be created!
        CmiAssert(isOKToRecvHPAtoms == false);
        PatchMap *patchMap = PatchMap::Object();
        CProxy_PatchMgr pm(CkpvAccess(BOCclass_group).patchMgr);
        PatchMgr *patchMgr = pm.ckLocalBranch();
        for(int i=0; i<patchMap->numPatches(); i++) {
            if(patchMap->node(i)==CkMyPe()) {
                FullAtomList emptyone;
                patchMgr->createHomePatch(i, emptyone);
            }
        }
        return;
    }

    CProxy_PatchMgr pm(CkpvAccess(BOCclass_group).patchMgr);
    PatchMgr *patchMgr = pm.ckLocalBranch();

    //go through the home patch list
    for(int i=0; i<numPids; i++) {
        int pid = hpIDList[i];

        //re-sort the atom list of this patch
        std::sort(hpAtomsList[i].begin(), hpAtomsList[i].end());
        Node::Object()->workDistrib->fillAtomListForOnePatch(pid, hpAtomsList[i]);
        patchMgr->createHomePatch(pid, hpAtomsList[i]);
    }

    hpIDList.clear();
    delete [] hpAtomsList;

    hpAtomsList = NULL;
#endif
}

void ParallelIOMgr::freeMolSpace()
{
#ifdef MEM_OPT_VERSION
    molecule->delAtomNames();
    molecule->delChargeSpace();

    //???TODO NOT SURE WHETHER freeEnergyOn IS SUPPORTED IN MEM_OPT_VERSION
    //-CHAOMEI
    if(!CkMyPe() && !simParameters->freeEnergyOn)
        molecule->delMassSpace();

    molecule->delFixedAtoms();
#endif
}
1745 
1746 //The initial distribution of atoms:
1747 //(avgNum+1),...,(avgNum+1),avgNum,...,avgNum where
1748 //avgNum equals to the total number of atoms in the system
1749 //divided by number of input processors --Chao Mei
1750 int ParallelIOMgr::numMyAtoms(int rank, int numProcs)
1751 {
1752  if(rank==-1) return -1;
1753  int avgNum = molecule->numAtoms/numProcs;
1754  int remainder = molecule->numAtoms%numProcs;
1755  if(rank<remainder) return avgNum+1;
1756  else return avgNum;
1757 }
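
// Editor's illustrative sketch (not part of ParallelIOMgr.C): a worked
// example of the block distribution above. With numAtoms = 10 and
// numProcs = 4, avgNum = 2 and remainder = 2, so the ranks receive
// 3, 3, 2, 2 atoms and the counts sum exactly to numAtoms.
//
// #include <cstdio>
//
// static int numAtomsForRank(int numAtoms, int numProcs, int rank)
// {
//     int avgNum = numAtoms / numProcs;
//     int remainder = numAtoms % numProcs;
//     return (rank < remainder) ? avgNum + 1 : avgNum;
// }
//
// int main()
// {
//     int total = 0;
//     for (int r = 0; r < 4; r++) {
//         int n = numAtomsForRank(10, 4, r);
//         printf("rank %d: %d atoms\n", r, n);  // prints 3,3,2,2
//         total += n;
//     }
//     printf("total = %d\n", total);  // 10: the distribution is exact
//     return 0;
// }
// --- end editor's sketch ---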
1758 
1759 int ParallelIOMgr::atomRank(int atomID, int numProcs)
1760 {
1761  int avgNum = molecule->numAtoms/numProcs;
1762  int remainder = molecule->numAtoms%numProcs;
1763  int midLimit = remainder*(avgNum+1);
1764  int idx;
1765  if(atomID<midLimit) {
1766  idx = atomID/(avgNum+1);
1767  } else {
1768  idx = remainder+(atomID-midLimit)/avgNum;
1769  }
1770  return idx;
1771 }
1772 
1773 void ParallelIOMgr::getMyAtomsRange(int &lowerIdx, int &upperIdx, int rank, int numProcs)
1774 {
1775  if(rank==-1) {
1776  //just to make sure upperIdx-lowerIdx+1 == -1
1777  lowerIdx=-1;
1778  upperIdx=-3;
1779  return;
1780  }
1781  int avgNum = molecule->numAtoms/numProcs;
1782  int remainder = molecule->numAtoms%numProcs;
1783  if(rank<remainder) {
1784  lowerIdx = rank*(avgNum+1);
1785  upperIdx = lowerIdx+avgNum;
1786  } else {
1787  int midLimit = remainder*(avgNum+1);
1788  lowerIdx = midLimit+(rank-remainder)*avgNum;
1789  upperIdx = lowerIdx+avgNum-1;
1790  }
1791 }
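
// Editor's illustrative sketch (not part of ParallelIOMgr.C): the three
// helpers above are mutually consistent -- atomRank(a) == r exactly when
// a falls in rank r's [lowerIdx, upperIdx], and the range length equals
// numMyAtoms(r). A standalone checker with free functions standing in for
// the class methods (numAtoms passed explicitly; names are assumptions):
//
// #include <cassert>
//
// static void rangeForRank(int numAtoms, int numProcs, int rank,
//                          int &lowerIdx, int &upperIdx)
// {
//     int avgNum = numAtoms / numProcs;
//     int remainder = numAtoms % numProcs;
//     if (rank < remainder) {
//         lowerIdx = rank * (avgNum + 1);
//         upperIdx = lowerIdx + avgNum;
//     } else {
//         int midLimit = remainder * (avgNum + 1);
//         lowerIdx = midLimit + (rank - remainder) * avgNum;
//         upperIdx = lowerIdx + avgNum - 1;
//     }
// }
//
// static int rankForAtom(int numAtoms, int numProcs, int atomID)
// {
//     int avgNum = numAtoms / numProcs;
//     int remainder = numAtoms % numProcs;
//     int midLimit = remainder * (avgNum + 1);
//     return (atomID < midLimit) ? atomID / (avgNum + 1)
//                                : remainder + (atomID - midLimit) / avgNum;
// }
//
// static void checkDistribution(int numAtoms, int numProcs)
// {
//     for (int r = 0; r < numProcs; r++) {
//         int lo, hi;
//         rangeForRank(numAtoms, numProcs, r, lo, hi);
//         for (int a = lo; a <= hi; a++)
//             assert(rankForAtom(numAtoms, numProcs, a) == r);
//     }
// }
// // usage: checkDistribution(10, 4);
// --- end editor's sketch ---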
1792 
1793 int ParallelIOMgr::calcMyOutputProxyClients() {
1794  PatchMap *patchMap = PatchMap::Object();
1795  int myOutputProxyClients = 0;
1796  int myset = myOutputProxyRank / numOutputProcs;
1797  for(int i=0; i<CkNumPes(); ++i) {
1798  if ( (i*numProxiesPerOutputProc)/CkNumPes() == myset &&
1799  ( patchMap->numPatchesOnNode(i) || isOutputProcessor(i) ) ) {
1800  ++myOutputProxyClients;
1801  }
1802  }
1803  return myOutputProxyClients;
1804 }
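
// Editor's illustrative sketch (not part of ParallelIOMgr.C): the test
// (i*numProxiesPerOutputProc)/CkNumPes() == myset above partitions the PEs
// 0..CkNumPes()-1 into numProxiesPerOutputProc contiguous, nearly equal
// sets, and each proxy counts only the client PEs in its own set. A
// standalone demonstration of that integer-arithmetic partition:
//
// #include <cstdio>
//
// int main()
// {
//     const int numPes = 10, numSets = 3;
//     for (int i = 0; i < numPes; i++) {
//         int set = (i * numSets) / numPes;   // same arithmetic as above
//         printf("PE %d -> set %d\n", i, set); // sets of sizes 4, 3, 3
//     }
//     return 0;
// }
// --- end editor's sketch ---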
1805 
1806 void ParallelIOMgr::receivePositions(CollectVectorVarMsg *msg)
1807 {
1808 #ifdef MEM_OPT_VERSION
1809  if ( myOutputRank != -1 ) {
1810  int ready = midCM->receivePositions(msg);
1811  if(ready) {
1812  CProxy_CollectionMaster cm(mainMaster);
1813  cm.receiveOutputPosReady(msg->seq);
1814  }
1815  delete msg;
1816  } else if ( myOutputProxyRank != -1 ) {
1817  if ( ! myOutputProxyPositions ) {
1818  myOutputProxyPositions = new CollectProxyVectorSequence(calcMyOutputProxyClients());
1819  }
1820  CollectVectorVarMsg *newmsg = myOutputProxyPositions->submitData(msg);
1821  if ( newmsg ) thisProxy[outputProcArray[myOutputProxyRank%numOutputProcs]].receivePositions(newmsg);
1822  delete msg;
1823  } else {
1824  NAMD_bug("ParallelIOMgr::receivePositions on bad pe");
1825  }
1826 #endif
1827 }
1828 
1829 void ParallelIOMgr::receiveVelocities(CollectVectorVarMsg *msg)
1830 {
1831 #ifdef MEM_OPT_VERSION
1832  if ( myOutputRank != -1 ) {
1833  int ready = midCM->receiveVelocities(msg);
1834  if(ready) {
1835  CProxy_CollectionMaster cm(mainMaster);
1836  cm.receiveOutputVelReady(msg->seq);
1837  }
1838  delete msg;
1839  } else if ( myOutputProxyRank != -1 ) {
1840  if ( ! myOutputProxyVelocities ) {
1841  myOutputProxyVelocities = new CollectProxyVectorSequence(calcMyOutputProxyClients());
1842  }
1843  CollectVectorVarMsg *newmsg = myOutputProxyVelocities->submitData(msg);
1844  if ( newmsg ) thisProxy[outputProcArray[myOutputProxyRank%numOutputProcs]].receiveVelocities(newmsg);
1845  delete msg;
1846  } else {
1847  NAMD_bug("ParallelIOMgr::receiveVelocities on bad pe");
1848  }
1849 #endif
1850 }
1851 
1852 void ParallelIOMgr::receiveForces(CollectVectorVarMsg *msg)
1853 {
1854 #ifdef MEM_OPT_VERSION
1855  if ( myOutputRank != -1 ) {
1856  int ready = midCM->receiveForces(msg);
1857  if(ready) {
1858  CProxy_CollectionMaster cm(mainMaster);
1859  cm.receiveOutputForceReady(msg->seq);
1860  }
1861  delete msg;
1862  } else if ( myOutputProxyRank != -1 ) {
1863  if ( ! myOutputProxyForces ) {
1864  myOutputProxyForces = new CollectProxyVectorSequence(calcMyOutputProxyClients());
1865  }
1866  CollectVectorVarMsg *newmsg = myOutputProxyForces->submitData(msg);
1867  if ( newmsg ) thisProxy[outputProcArray[myOutputProxyRank%numOutputProcs]].receiveForces(newmsg);
1868  delete msg;
1869  } else {
1870  NAMD_bug("ParallelIOMgr::receiveForces on bad pe");
1871  }
1872 #endif
1873 }
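
// Editor's note: receivePositions, receiveVelocities, and receiveForces
// above differ only in the CollectionMgr method invoked, the
// CollectionMaster callback fired, and the proxy sequence used; the routing
// logic is identical. An illustrative, self-contained analogue of that
// aggregate-or-forward pattern with dummy types (every name below is an
// assumption for the example, not NAMD API):
//
// #include <vector>
//
// struct DummyMsg { int seq; std::vector<int> payload; };
//
// class ProxyAggregator {
// public:
//     explicit ProxyAggregator(int numClients) : remaining(numClients) {}
//     // Buffers one client's message; returns a combined message once the
//     // last client has reported, and 0 (keep waiting) otherwise.
//     DummyMsg *submit(DummyMsg *m) {
//         buffered.insert(buffered.end(), m->payload.begin(), m->payload.end());
//         int seq = m->seq;
//         delete m;
//         if (--remaining) return 0;
//         DummyMsg *out = new DummyMsg;
//         out->seq = seq;
//         out->payload.swap(buffered);
//         return out; // caller forwards this to the real output processor
//     }
// private:
//     int remaining;
//     std::vector<int> buffered;
// };
// --- end editor's sketch ---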
1874 
1875 
1876 void ParallelIOMgr::disposePositions(int seq, double prevT)
1877 {
1878 #ifdef MEM_OPT_VERSION
1879  DebugM(3, "["<<CkMyPe()<<"]"<<"ParallelIOMgr::disposePositions"<<"\n");
1880  double iotime = CmiWallTimer();
1881  midCM->disposePositions(seq);
1882  iotime = CmiWallTimer()-iotime+prevT;
1883 
1884 #if OUTPUT_SINGLE_FILE
1885  //Token-based file output
1886  if(myOutputRank == getMyOutputGroupHighestRank()) {
1887  //notify the CollectionMaster to start the next round
1888  CProxy_CollectionMaster cm(mainMaster);
1889  cm.startNextRoundOutputPos(iotime);
1890  } else {
1891  CProxy_ParallelIOMgr io(thisgroup);
1892  io[outputProcArray[myOutputRank+1]].disposePositions(seq, iotime);
1893  }
1894 #else
1895  //notify the CollectionMaster to start the next round
1896  CProxy_CollectionMaster cm(mainMaster);
1897  cm.startNextRoundOutputPos(iotime);
1898 #endif
1899 
1900 #endif
1901 }
1902 
1903 void ParallelIOMgr::disposeVelocities(int seq, double prevT)
1904 {
1905 #ifdef MEM_OPT_VERSION
1906  double iotime = CmiWallTimer();
1907  midCM->disposeVelocities(seq);
1908  iotime = CmiWallTimer()-iotime+prevT;
1909 
1910 #if OUTPUT_SINGLE_FILE
1911  //Token-based file output
1912  if(myOutputRank==getMyOutputGroupHighestRank()) {
1913  //notify the CollectionMaster to start the next round
1914  CProxy_CollectionMaster cm(mainMaster);
1915  cm.startNextRoundOutputVel(iotime);
1916  } else {
1917  CProxy_ParallelIOMgr io(thisgroup);
1918  io[outputProcArray[myOutputRank+1]].disposeVelocities(seq, iotime);
1919  }
1920 #else
1921  //notify the CollectionMaster to start the next round
1922  CProxy_CollectionMaster cm(mainMaster);
1923  cm.startNextRoundOutputVel(iotime);
1924 #endif
1925 
1926 #endif
1927 }
1928 
1929 void ParallelIOMgr::disposeForces(int seq, double prevT)
1930 {
1931 #ifdef MEM_OPT_VERSION
1932  double iotime = CmiWallTimer();
1933  midCM->disposeForces(seq);
1934  iotime = CmiWallTimer()-iotime+prevT;
1935 
1936 #if OUTPUT_SINGLE_FILE
1937  //Token-based file output
1938  if(myOutputRank==getMyOutputGroupHighestRank()) {
1939  //notify the CollectionMaster to start the next round
1940  CProxy_CollectionMaster cm(mainMaster);
1941  cm.startNextRoundOutputForce(iotime);
1942  } else {
1943  CProxy_ParallelIOMgr io(thisgroup);
1944  io[outputProcArray[myOutputRank+1]].disposeForces(seq, iotime);
1945  }
1946 #else
1947  //notify the CollectionMaster to start the next round
1948  CProxy_CollectionMaster cm(mainMaster);
1949  cm.startNextRoundOutputForce(iotime);
1950 #endif
1951 
1952 #endif
1953 }
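
// Editor's illustrative sketch (not part of ParallelIOMgr.C): under
// OUTPUT_SINGLE_FILE the dispose* methods above serialize writes to the
// shared file with a token ring. Each output proc writes its slice, adds
// its wall-clock cost to the accumulated prevT, and forwards the token to
// the next output rank; the highest rank closes the ring by reporting the
// accumulated time to CollectionMaster. A plain sequential analogue of
// that accumulation (all values below are stand-ins):
//
// #include <cstdio>
//
// int main()
// {
//     const int numOutputProcs = 4;
//     double iotime = 0.0;             // plays the role of prevT in the ring
//     for (int rank = 0; rank < numOutputProcs; rank++) {
//         double myWriteCost = 0.25;   // stand-in for a CmiWallTimer() delta
//         iotime += myWriteCost;       // each token holder adds its own cost
//     }
//     // the last rank would call cm.startNextRoundOutputPos(iotime) here
//     printf("accumulated io time: %g s\n", iotime);
//     return 0;
// }
// --- end editor's sketch ---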
1954 
1955 
1956 void ParallelIOMgr::wrapCoor(int seq, Lattice lat)
1957 {
1958 #ifdef MEM_OPT_VERSION
1959  coorInstance = midCM->getReadyPositions(seq);
1960 
1961  coorInstance->lattice = lat; //record the lattice to use for wrapAll/Water!
1962  int fromAtomID = coorInstance->fromAtomID;
1963  int toAtomID = coorInstance->toAtomID;
1964 
1965  //only reference copies
1966  ResizeArray<Vector> &data = coorInstance->data;
1967  ResizeArray<FloatVector> &fdata = coorInstance->fdata;
1968  //If both data and fdata are non-empty, they hold the same values at
1969  //different precisions; therefore only the higher-precision
1970  //coordinate array needs to be computed. -Chao Mei
1971  int dsize = data.size();
1972  int numMyAtoms = toAtomID-fromAtomID+1;
1973  tmpCoorCon = new Vector[numMyAtoms];
1974  ClusterCoorElem one;
1975  //1. accumulate per-cluster coordinate sums locally
1976  for(int i=0; i<numMyAtoms; i++){
1977  tmpCoorCon[i] = 0.0;
1978  int cid = clusterID[i];
1979  if(cid<fromAtomID){
1980  //on output procs ahead of me
1981  one.clusterId = cid;
1982  ClusterCoorElem *ret = remoteCoors.find(one);
1983  if(ret==NULL){
1984  if(dsize==0)
1985  one.dsum = fdata[i];
1986  else
1987  one.dsum = data[i];
1988 
1989  remoteCoors.add(one);
1990  }else{
1991  if(dsize==0)
1992  ret->dsum += fdata[i];
1993  else
1994  ret->dsum += data[i];
1995  }
1996  }else{
1997  if(dsize==0)
1998  tmpCoorCon[cid-fromAtomID] += fdata[i];
1999  else
2000  tmpCoorCon[cid-fromAtomID] += data[i];
2001  }
2002  }
2003 
2004  //2. Send msgs to remote output procs to reduce the coordinate
2005  //sums of each cluster
2006  CmiAssert(numRemoteClusters == remoteCoors.size());
2007  numCSMAck = 0; //set to 0 to prepare recving the final coor update
2008  CProxy_ParallelIOMgr pIO(thisgroup);
2009  ClusterCoorSetIter iter(remoteCoors);
2010  for(iter=iter.begin(); iter!=iter.end(); iter++){
2011  ClusterCoorMsg *msg = new ClusterCoorMsg;
2012  msg->srcRank = myOutputRank;
2013  msg->clusterId = iter->clusterId;
2014  msg->dsum = iter->dsum;
2015  int dstRank = atomRankOnOutput(iter->clusterId);
2016  pIO[outputProcArray[dstRank]].recvClusterCoor(msg);
2017  }
2018 
2019  //Send a local NULL msg to indicate that the local coordinate
2020  //wrapping has finished.
2021  recvClusterCoor(NULL);
2022 #endif
2023 }
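
// Editor's illustrative sketch (not part of ParallelIOMgr.C): per cluster,
// the code above accumulates the coordinates of all member atoms; the
// cluster head's owner divides by the cluster size to get the geometric
// center, asks the lattice for the translation that wraps that center into
// the periodic cell, and every atom of the cluster is then shifted rigidly
// by the same delta (so member atoms may still protrude from the cell).
// A 1-D orthorhombic analogue of that math (all names are assumptions):
//
// #include <cstdio>
// #include <cmath>
//
// int main()
// {
//     const double cell = 10.0;                   // 1-D box length
//     const double atoms[3] = {9.0, 10.5, 11.4};  // one 3-atom cluster
//     double sum = 0.0;
//     for (int i = 0; i < 3; i++) sum += atoms[i];
//     double center = sum / 3.0;                   // 10.3, outside [0, cell)
//     double delta = -cell * floor(center / cell); // wrap delta, here -10
//     for (int i = 0; i < 3; i++)                  // -1.0, 0.5, 1.4
//         printf("atom %d: %g -> %g\n", i, atoms[i], atoms[i] + delta);
//     return 0;
// }
// --- end editor's sketch ---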
2024 
2025 //On an output proc it is possible (if rare) that recvClusterCoor executes
2026 //before wrapCoor, so we must ensure the local tmpCoorCon has already been
2027 //calculated. This is why (numRemoteReqs+1) msgs are expected: the
2028 //additional one is sent by the proc itself when it finishes wrapping
2029 //coordinates. --Chao Mei
2030 void ParallelIOMgr::recvClusterCoor(ClusterCoorMsg *msg){
2031  //only add the msg from remote procs
2032  if(msg!=NULL) ccmBuf.add(msg);
2033 
2034  //include a msg sent by itself
2035  if(++numReqRecved == (numRemoteReqs+1)){
2036  numReqRecved = 0;
2037  integrateClusterCoor();
2038  }
2039 }
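
// Editor's illustrative sketch (not part of ParallelIOMgr.C): the
// (numRemoteReqs + 1) count above is a counting barrier with a self-message.
// The +1 slot is the NULL msg the proc sends itself at the end of wrapCoor,
// which guarantees tmpCoorCon is ready before integrateClusterCoor runs,
// regardless of how the remote messages interleave. A minimal analogue:
//
// struct CountingGate {
//     int received;
//     int expected;
//     explicit CountingGate(int numRemote)
//         : received(0), expected(numRemote + 1) {}  // +1 = the self-message
//     // Returns true exactly once, on the final expected arrival.
//     bool arrive() { return ++received == expected; }
// };
// // usage: if (gate.arrive()) integrate();  // fires after self + remotes
// --- end editor's sketch ---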
2040 
2041 void ParallelIOMgr::integrateClusterCoor(){
2042 #ifdef MEM_OPT_VERSION
2043  int fromIdx = coorInstance->fromAtomID;
2044  int toIdx = coorInstance->toAtomID;
2045  for(int i=0; i<ccmBuf.size(); i++){
2046  ClusterCoorMsg *msg = ccmBuf[i];
2047  int lidx = msg->clusterId - fromIdx;
2048  tmpCoorCon[lidx] += msg->dsum;
2049  }
2050 
2051  //send back those msgs
2052  CProxy_ParallelIOMgr pIO(thisgroup);
2053  for(int i=0; i<ccmBuf.size(); i++){
2054  ClusterCoorMsg *msg = ccmBuf[i];
2055  int lidx = msg->clusterId - fromIdx;
2056  if(simParameters->wrapAll || isWater[lidx]) {
2057  Lattice *lat = &(coorInstance->lattice);
2058  Vector coni = tmpCoorCon[lidx]/clusterSize[lidx];
2059  msg->dsum = (simParameters->wrapNearest ?
2060  lat->wrap_nearest_delta(coni) : lat->wrap_delta(coni));
2061  }else{
2062  msg->dsum = 0.0;
2063  }
2064  pIO[outputProcArray[msg->srcRank]].recvFinalClusterCoor(msg);
2065  }
2066  ccmBuf.resize(0);
2067 
2068  //recvFinalClusterCoor may execute before integrateClusterCoor on this
2069  //processor (another proc may finish wrapCoor faster). To avoid this
2070  //msg race, send a message to itself to participate in the reduction.
2071  if(numRemoteClusters!=0){
2072  recvFinalClusterCoor(NULL);
2073  } else {
2074  //this output proc already has the coordinate sum of every cluster
2075  //it owns, so it is ready to do the final wrap-coor computation
2076  int numMyAtoms = toIdx-fromIdx+1;
2077  ResizeArray<Vector> &data = coorInstance->data;
2078  ResizeArray<FloatVector> &fdata = coorInstance->fdata;
2079  for(int i=0; i<numMyAtoms; i++){
2080  if(!simParameters->wrapAll && !isWater[i]) continue;
2081  int lidx = clusterID[i]-fromIdx;
2082  if(lidx==i){
2083  //the head atom of the cluster
2084  Lattice *lat = &(coorInstance->lattice);
2085  Vector coni = tmpCoorCon[lidx]/clusterSize[lidx];
2086  tmpCoorCon[lidx] = (simParameters->wrapNearest ?
2087  lat->wrap_nearest_delta(coni) : lat->wrap_delta(coni));
2088  }
2089  if(data.size()) data[i] += tmpCoorCon[lidx];
2090  //no operator += (FloatVector, Vector)
2091  if(fdata.size()) fdata[i] = fdata[i] + tmpCoorCon[lidx];
2092  }
2093 
2094  delete [] tmpCoorCon;
2095  tmpCoorCon = NULL;
2096  CProxy_CollectionMaster cm(mainMaster);
2097  cm.wrapCoorFinished();
2098  }
2099 #endif
2100 }
2101 
2102 void ParallelIOMgr::recvFinalClusterCoor(ClusterCoorMsg *msg){
2103 #ifdef MEM_OPT_VERSION
2104  if(msg!=NULL){
2105  //only process the message sent from other procs!
2106  ClusterCoorElem one(msg->clusterId);
2107  ClusterCoorElem *ret = remoteCoors.find(one);
2108  ret->dsum = msg->dsum;
2109  delete msg;
2110  }
2111 
2112  if(++numCSMAck == (numRemoteClusters+1)){
2113  //final wrap coor computation
2114  int fromIdx = coorInstance->fromAtomID;
2115  int toIdx = coorInstance->toAtomID;
2116  int numMyAtoms = toIdx-fromIdx+1;
2117  ResizeArray<Vector> &data = coorInstance->data;
2118  ResizeArray<FloatVector> &fdata = coorInstance->fdata;
2119  ClusterCoorElem tmp;
2120  for(int i=0; i<numMyAtoms; i++){
2121  if(!simParameters->wrapAll && !isWater[i]) continue;
2122  int cid = clusterID[i];
2123  int lidx = cid-fromIdx;
2124  if(lidx<0){
2125  //this cid should be inside remoteCoors
2126  tmp.clusterId = cid;
2127  ClusterCoorElem *fone = remoteCoors.find(tmp);
2128  if(data.size()) data[i] += fone->dsum;
2129  if(fdata.size()) fdata[i] = fdata[i] + fone->dsum;
2130  }else{
2131  if(lidx==i){
2132  Lattice *lat = &(coorInstance->lattice);
2133  Vector coni = tmpCoorCon[lidx]/clusterSize[lidx];
2134  tmpCoorCon[lidx] = (simParameters->wrapNearest ?
2135  lat->wrap_nearest_delta(coni) : lat->wrap_delta(coni));
2136  }
2137  if(data.size()) data[i] += tmpCoorCon[lidx];
2138  if(fdata.size()) fdata[i] = fdata[i] + tmpCoorCon[lidx];
2139  }
2140  }
2141 
2142  delete [] tmpCoorCon;
2143  tmpCoorCon = NULL;
2144  CProxy_CollectionMaster cm(mainMaster);
2145  cm.wrapCoorFinished();
2146  numCSMAck = 0;
2147  remoteCoors.clear();
2148  }
2149 #endif
2150 }
2151 #include "ParallelIOMgr.def.h"