NAMD
CollectionMaster.C
Go to the documentation of this file.
1 
7 #include "largefiles.h" // must be first!
8 
9 #include "InfoStream.h"
10 #include "CollectionMaster.h"
11 #include "ProcessorPrivate.h"
12 #include "SimParameters.h"
13 #include "packmsg.h"
14 #include "CollectionMaster.decl.h"
15 #include "Molecule.h"
16 
17 #include "memusage.h"
18 #define MIN_DEBUG_LEVEL 3
19 //#define DEBUGM
20 #include "Debug.h"
21 
23 {
24  if (CkpvAccess(CollectionMaster_instance) == 0) {
25  CkpvAccess(CollectionMaster_instance) = this;
26  } else {
27  DebugM(1, "CollectionMaster::CollectionMaster() - another instance of CollectionMaster exists!\n");
28  }
29  dataStreamFile = 0;
30 
31 #ifdef MEM_OPT_VERSION
32  wrapCoorDoneCnt = 0;
33  posDoneCnt = 0;
34  velDoneCnt = 0;
35  parOut = new ParOutput();
36 #endif
37 
38  posTimings = 10; velTimings = forceTimings = 5;
39 #ifdef NODEGROUP_FORCE_REGISTER
40  cmlock = CmiCreateLock();
41 #endif
42 }
43 
44 
46 {
47 #ifdef NODEGROUP_FORCE_REGISTER
48  CmiDestroyLock(cmlock);
49 #endif
50 }
51 
53 {
54  DebugM(3, "["<<CkMyPe()<<"]"<< " receivePositions\n");
55 #ifndef MEM_OPT_VERSION
56  positions.submitData(msg,Node::Object()->molecule->numAtoms);
57 
58  delete msg;
59 
61  while ( ( c = positions.removeReady() ) ) { disposePositions(c); }
62 #endif
63 }
64 
65 
67 {
68  DebugM(3, "["<<CkMyPe()<<"]"<< " receivePositionsDcdSelection index "<<msg->index<<"\n");
69 #ifndef MEM_OPT_VERSION
70  positionsDcdSelection.index=msg->index;
71  positionsDcdSelection.submitData(msg,Node::Object()->molecule->get_dcd_selection_size(msg->index));
72  delete msg;
73 
75  while ( ( c = positionsDcdSelection.removeReady() ) ) { disposePositions(c); }
76 #endif
77 }
78 
79 
81 {
82  DebugM(3, "["<<CkMyPe()<<"]"<< " enqueuePositions\n");
83  positions.enqueue(seq,lattice);
84 #ifndef MEM_OPT_VERSION
85 
87  while ( ( c = positions.removeReady() ) ) { disposePositions(c); }
88 #else
89  checkPosReady();
90 #endif
91 }
92 
94 {
95  DebugM(3, "["<<CkMyPe()<<"]"<< " enqueuePositionsDcdSelection\n");
96 #ifndef MEM_OPT_VERSION
97  positionsDcdSelection.enqueue(seq,lattice);
98 
100  while ( ( c = positionsDcdSelection.removeReady() ) ) { disposePositions(c); }
101 #else
102  checkPosReady();
103 #endif
104 }
105 
106 
108 {
109  DebugM(3, "["<<CkMyPe()<<"]"<< " disposePositions\n");
110 #ifndef MEM_OPT_VERSION
111  int seq = c->seq;
112  int size = c->data.size();
113  if ( ! size ) size = c->fdata.size();
114  Vector *data = c->data.begin();
115  FloatVector *fdata = c->fdata.begin();
116  double exectime = CmiWallTimer();
117  double mem = memusage_MB();
118 #ifdef NODEGROUP_FORCE_REGISTER
120  if (simParams->CUDASOAintegrate && simParams->GPUresidentSingleProcessMode) {
121  CProxy_PatchData cpdata(CkpvAccess(BOCclass_group).patchData);
122  Output *pout = cpdata.ckLocalBranch()->ptrOutput;
123  pout->coordinate(seq,size,data,fdata,c->lattice);
124  }
125  else
126 #endif
127  {
128  Node::Object()->output->coordinate(seq,size,data,fdata,c->lattice);
129  }
130  c->free();
131  exectime = CmiWallTimer()-exectime;
132  if ( posTimings ) {
133  CkPrintf("The last position output (seq=%d) takes %.3f seconds, %.3f MB of memory in use\n", seq, exectime, mem);
134  --posTimings;
135  }
136 #endif
137 }
138 
139 
141 {
142 #ifndef MEM_OPT_VERSION
143  velocities.submitData(msg,Node::Object()->molecule->numAtoms);
144  delete msg;
145 
147  while ( ( c = velocities.removeReady() ) ) { disposeVelocities(c); }
148 #endif
149 }
150 
152 {
153  Lattice dummy;
154  velocities.enqueue(seq,dummy);
155 #ifndef MEM_OPT_VERSION
157  while ( ( c = velocities.removeReady() ) ) { disposeVelocities(c); }
158 #else
159  checkVelReady();
160 #endif
161 }
162 
164 {
165 #ifndef MEM_OPT_VERSION
166  DebugM(3,"Collected velocities at " << c->seq << std::endl);
167  int seq = c->seq;
168  int size = c->data.size();
169  if ( ! size ) size = c->fdata.size();
170  Vector *data = c->data.begin();
171  FloatVector *fdata = c->fdata.begin();
172  double exectime = CmiWallTimer();
173  double mem = memusage_MB();
174 #ifdef NODEGROUP_FORCE_REGISTER
175  if (Node::Object()->simParameters->CUDASOAintegrate) {
176  CProxy_PatchData cpdata(CkpvAccess(BOCclass_group).patchData);
177  Output *pout = cpdata.ckLocalBranch()->ptrOutput;
178  pout->velocity(seq,size,data,fdata);
179  }
180  else
181 #endif
182  {
183  Node::Object()->output->velocity(seq,size,data,fdata);
184  }
185  c->free();
186  exectime = CmiWallTimer()-exectime;
187  if ( velTimings ) {
188  CkPrintf("The last velocity output (seq=%d) takes %.3f seconds, %.3f MB of memory in use\n", seq, exectime, mem);
189  --velTimings;
190  }
191 #endif
192 }
193 
194 
196 {
197 #ifndef MEM_OPT_VERSION
198  forces.submitData(msg,Node::Object()->molecule->numAtoms);
199  delete msg;
200 
202  while ( ( c = forces.removeReady() ) ) { disposeForces(c); }
203 #endif
204 }
205 
207 {
208  Lattice dummy;
209  forces.enqueue(seq,dummy);
210 #ifndef MEM_OPT_VERSION
212  while ( ( c = forces.removeReady() ) ) { disposeForces(c); }
213 #else
214  checkForceReady();
215 #endif
216 }
217 
219 {
220 #ifndef MEM_OPT_VERSION
221  DebugM(3,"Collected forces at " << c->seq << std::endl);
222  int seq = c->seq;
223  int size = c->data.size();
224  if ( ! size ) size = c->fdata.size();
225  Vector *data = c->data.begin();
226  FloatVector *fdata = c->fdata.begin();
227  double exectime = CmiWallTimer();
228  double mem = memusage_MB();
229 #ifdef NODEGROUP_FORCE_REGISTER
230  if (Node::Object()->simParameters->CUDASOAintegrate) {
231  CProxy_PatchData cpdata(CkpvAccess(BOCclass_group).patchData);
232  Output *pout = cpdata.ckLocalBranch()->ptrOutput;
233  pout->force(seq,size,data,fdata);
234  }
235  else
236 #endif
237  {
238  Node::Object()->output->force(seq,size,data,fdata);
239  }
240  c->free();
241  exectime = CmiWallTimer()-exectime;
242  if ( forceTimings ) {
243  CkPrintf("The last force output (seq=%d) takes %.3f seconds, %.3f MB of memory in use\n", seq, exectime, mem);
244  --forceTimings;
245  }
246 #endif
247 }
248 
249 
251  if ( ! dataStreamFile ) {
252  char *fname = Node::Object()->simParameters->auxFilename;
253  // iout has large file linking issues on AIX
254  // iout << iINFO << "OPENING AUXILIARY DATA STREAM FILE "
255  // << fname << "\n" << endi;
256  CkPrintf("Info: OPENING AUXILIARY DATA STREAM FILE %s\n", fname);
257  NAMD_backup_file(fname);
258  dataStreamFile = fopen(fname,"w");
259  if ( ! dataStreamFile )
260  NAMD_die("Can't open auxiliary data stream file!");
261  }
262  fprintf(dataStreamFile,"%s",msg->data.begin());
263  fflush(dataStreamFile);
264  delete msg;
265 }
266 
268  PACK_RESIZE(data);
269 )
270 
271 //The timesteps on CollectionMaster will always be in increasing order
272 //because they are enqueued on PE0 by the controller in order. -Chao Mei
273 //
274 //The computation of wrap_coor is also serialized for the sake of easy
275 //implementation (easier management of output)
277 #ifdef MEM_OPT_VERSION
278  DebugM(3, "["<<CkMyPe()<<"]"<< " receiveOutputPosReady\n");
279  positions.submitData(seq);
280  checkPosReady();
281 #endif
282 }
283 
285 #ifdef MEM_OPT_VERSION
286  velocities.submitData(seq);
287  checkVelReady();
288 #endif
289 }
290 
292 #ifdef MEM_OPT_VERSION
293  forces.submitData(seq);
294  checkForceReady();
295 #endif
296 }
297 
298 
300 #ifdef MEM_OPT_VERSION
301  DebugM(3, "["<<CkMyPe()<<"]"<< " startNextRoundOutputPos\n");
302  if(totalT > posIOTime) posIOTime = totalT;
303 
304 #ifndef OUTPUT_SINGLE_FILE
305 #error OUTPUT_SINGLE_FILE not defined!
306 #endif
307 
308 #if OUTPUT_SINGLE_FILE
309  if(++posDoneCnt < Node::Object()->simParameters->numoutputwrts) return;
310 #else
311  if(++posDoneCnt < Node::Object()->simParameters->numoutputprocs) return;
312 #endif
313 
314  posDoneCnt = 0;
315 
316  //retrieve the last ready instance
317  CollectVectorInstance *c = positions.getReady();
318  int seq = c->seq;
319  CmiAssert(c->status == IN_PROCESS);
320  double mem = memusage_MB();
321  positions.removeFirstReady();
322  c->free();
323  posOutTime = CmiWallTimer()-posOutTime;
324  if ( posTimings ) {
325  CkPrintf("The last position output (seq=%d) takes %.3f seconds(file I/O: %.3f secs), %.3f MB of memory in use\n", seq, posOutTime, posIOTime, mem);
326  --posTimings;
327  }
328 
329  //Actually the c->status doesn't need to be checked because it is
330  //certain that the new ready one will not be in IN_PROCESS status
331  checkPosReady();
332 #endif
333 }
334 
336 #ifdef MEM_OPT_VERSION
337 
338  if(totalT > velIOTime) velIOTime = totalT;
339 
340 #if OUTPUT_SINGLE_FILE
341  if(++velDoneCnt < Node::Object()->simParameters->numoutputwrts) return;
342 #else
343  if(++velDoneCnt < Node::Object()->simParameters->numoutputprocs) return;
344 #endif
345 
346  velDoneCnt = 0;
347 
348  //retrieve the last ready instance
349  CollectVectorInstance *c = velocities.getReady();
350  int seq = c->seq;
351  CmiAssert(c->status == IN_PROCESS);
352  double mem = memusage_MB();
353  velocities.removeFirstReady();
354  c->free();
355  velOutTime = CmiWallTimer()-velOutTime;
356  if ( velTimings ) {
357  CkPrintf("The last velocity output (seq=%d) takes %.3f seconds(file I/O: %.3f secs), %.3f MB of memory in use\n", seq, velOutTime, velIOTime, mem);
358  --velTimings;
359  }
360 
361  //Actually the c->status doesn't need to be checked because it is
362  //certain that the new ready one will not be in IN_PROCESS status
363  checkVelReady();
364 #endif
365 }
366 
368 #ifdef MEM_OPT_VERSION
369  DebugM(3, "["<<CkMyPe()<<"]"<< " startNextRoundOutputForce\n");
370  if(totalT > forceIOTime) forceIOTime = totalT;
371 
372 #if OUTPUT_SINGLE_FILE
373  if(++forceDoneCnt < Node::Object()->simParameters->numoutputwrts) return;
374 #else
375  if(++forceDoneCnt < Node::Object()->simParameters->numoutputprocs) return;
376 #endif
377 
378  forceDoneCnt = 0;
379 
380  //retrieve the last ready instance
381  CollectVectorInstance *c = forces.getReady();
382  int seq = c->seq;
383  CmiAssert(c->status == IN_PROCESS);
384  double mem = memusage_MB();
385  forces.removeFirstReady();
386  c->free();
387  forceOutTime = CmiWallTimer()-forceOutTime;
388  if ( forceTimings ) {
389  CkPrintf("The last force output (seq=%d) takes %.3f seconds(file I/O: %.3f secs), %.3f MB of memory in use\n", seq, forceOutTime, forceIOTime, mem);
390  --forceTimings;
391  }
392 
393  //Actually the c->status doesn't need to be checked because it is
394  //certain that the new ready one will not be in IN_PROCESS status
395  checkForceReady();
396 #endif
397 }
398 
399 
401 #ifdef MEM_OPT_VERSION
402  DebugM(3, "["<<CkMyPe()<<"]"<< " wrapCoorFinished\n");
403  if(++wrapCoorDoneCnt == Node::Object()->simParameters->numoutputprocs){
404  wrapCoorDoneCnt = 0;
405 
406  //count the wrapping-coor time into master writing time
407  posIOTime = CmiWallTimer()-posOutTime;
408 
409  //it's ready to output positions
410  CollectVectorInstance *c = positions.getReady();
411 
412  CProxy_ParallelIOMgr io(CkpvAccess(BOCclass_group).ioMgr);
413  ParallelIOMgr *ioMgr = io.ckLocalBranch();
414 
415 #if OUTPUT_SINGLE_FILE
416  //notify output procs to do Token based output
417  int grpsize = ioMgr->numOutputProcs / ioMgr->numOutputWrts;
418  int remains = ioMgr->numOutputProcs % ioMgr->numOutputWrts;
419  int outrank = 0;
420  int i;
421  for(i=0; i<remains; i++){
422  io[ioMgr->outputProcArray[outrank]].disposePositions(c->seq, posIOTime);
423  outrank += (grpsize+1);
424  }
425  for(; i<ioMgr->numOutputWrts; i++){
426  io[ioMgr->outputProcArray[outrank]].disposePositions(c->seq, posIOTime);
427  outrank += grpsize;
428  }
429 #else
430  //output multiple files
431  for(int i=0; i<ioMgr->numOutputProcs; i++) {
432  io[ioMgr->outputProcArray[i]].disposePositions(c->seq, posIOTime);
433  }
434 #endif
435 
436  }
437 #endif
438 }
439 
440 #ifdef MEM_OPT_VERSION
441 void CollectionMaster::checkPosReady(){
442  CollectVectorInstance *c;
443  DebugM(3, "["<<CkMyPe()<<"]"<< " checkPosReady\n");
444  if((c = positions.getReady())){
445  if(c->status == IN_PROCESS){
446  //indicating in the process of outputing coordinates
447  return;
448  }
449  c->status = IN_PROCESS;
450 
451  posOutTime = CmiWallTimer();
453  CProxy_ParallelIOMgr io(CkpvAccess(BOCclass_group).ioMgr);
454  ParallelIOMgr *ioMgr = io.ckLocalBranch();
455  if(simParam->wrapAll || simParam->wrapWater){
456  for(int i=0; i<ioMgr->numOutputProcs; i++){
457  io[ioMgr->outputProcArray[i]].wrapCoor(c->seq, c->lattice);
458  }
459  //write the header to overlap with the computation of
460  //wrapping coordinates
461  parOut->coordinateMaster(c->seq,Node::Object()->molecule->numAtoms,c->lattice);
462  }else{
463  //write the header
464  parOut->coordinateMaster(c->seq,Node::Object()->molecule->numAtoms,c->lattice);
465  posIOTime = CmiWallTimer() - posOutTime;
466 
467  #if OUTPUT_SINGLE_FILE
468  int grpsize = ioMgr->numOutputProcs / ioMgr->numOutputWrts;
469  int remains = ioMgr->numOutputProcs % ioMgr->numOutputWrts;
470  int outrank = 0;
471  int i;
472  for(i=0; i<remains; i++){
473  io[ioMgr->outputProcArray[outrank]].disposePositions(c->seq, posIOTime);
474  outrank += (grpsize+1);
475  }
476  for(; i<ioMgr->numOutputWrts; i++){
477  io[ioMgr->outputProcArray[outrank]].disposePositions(c->seq, posIOTime);
478  outrank += grpsize;
479  }
480  #else
481  //output multiple files
482  for(int i=0; i<ioMgr->numOutputProcs; i++) {
483  io[ioMgr->outputProcArray[i]].disposePositions(c->seq, posIOTime);
484  }
485  #endif
486  }
487  //this instance c is freed in the next round of output invocation.
488  }
489 }
490 
491 void CollectionMaster::checkVelReady(){
492  CollectVectorInstance *c;
493  if((c = velocities.getReady())){
494  if(c->status == IN_PROCESS){
495  //indicating in the process of outputing velocities
496  return;
497  }
498 
499  c->status = IN_PROCESS;
500 
501  velOutTime = CmiWallTimer();
502  //write the header
503  parOut->velocityMaster(c->seq, Node::Object()->molecule->numAtoms);
504  velIOTime = CmiWallTimer() - velOutTime;
505 
506  //notify output procs to do Token based output
507  CProxy_ParallelIOMgr io(CkpvAccess(BOCclass_group).ioMgr);
508  ParallelIOMgr *ioMgr = io.ckLocalBranch();
509 
510  #if OUTPUT_SINGLE_FILE
511  int grpsize = ioMgr->numOutputProcs / ioMgr->numOutputWrts;
512  int remains = ioMgr->numOutputProcs % ioMgr->numOutputWrts;
513  int outrank = 0;
514  int i;
515  for(i=0; i<remains; i++){
516  io[ioMgr->outputProcArray[outrank]].disposeVelocities(c->seq, velIOTime);
517  outrank += (grpsize+1);
518  }
519  for(; i<ioMgr->numOutputWrts; i++){
520  io[ioMgr->outputProcArray[outrank]].disposeVelocities(c->seq, velIOTime);
521  outrank += grpsize;
522  }
523  #else
524  //output multiple files
525  for(int i=0; i<ioMgr->numOutputProcs; i++) {
526  io[ioMgr->outputProcArray[i]].disposeVelocities(c->seq, velIOTime);
527  }
528  #endif
529  //this instance c is freed in the next round of output invocation.
530  }
531 }
532 
533 void CollectionMaster::checkForceReady(){
534  CollectVectorInstance *c;
535  if((c = forces.getReady())){
536  if(c->status == IN_PROCESS){
537  //indicating in the process of outputing forces
538  return;
539  }
540 
541  c->status = IN_PROCESS;
542 
543  forceOutTime = CmiWallTimer();
544  //write the header
545  parOut->forceMaster(c->seq, Node::Object()->molecule->numAtoms);
546  forceIOTime = CmiWallTimer() - forceOutTime;
547 
548  //notify output procs to do Token based output
549  CProxy_ParallelIOMgr io(CkpvAccess(BOCclass_group).ioMgr);
550  ParallelIOMgr *ioMgr = io.ckLocalBranch();
551 
552  #if OUTPUT_SINGLE_FILE
553  int grpsize = ioMgr->numOutputProcs / ioMgr->numOutputWrts;
554  int remains = ioMgr->numOutputProcs % ioMgr->numOutputWrts;
555  int outrank = 0;
556  int i;
557  for(i=0; i<remains; i++){
558  io[ioMgr->outputProcArray[outrank]].disposeForces(c->seq, forceIOTime);
559  outrank += (grpsize+1);
560  }
561  for(; i<ioMgr->numOutputWrts; i++){
562  io[ioMgr->outputProcArray[outrank]].disposeForces(c->seq, forceIOTime);
563  outrank += grpsize;
564  }
565  #else
566  //output multiple files
567  for(int i=0; i<ioMgr->numOutputProcs; i++) {
568  io[ioMgr->outputProcArray[i]].disposeForces(c->seq, forceIOTime);
569  }
570  #endif
571  //this instance c is freed in the next round of output invocation.
572  }
573 }
574 
575 
576 void CollectionMidMaster::disposePositions(int seq)
577 {
578  DebugM(3, "["<<CkMyPe()<<"]"<< " disposePositions\n");
579  CollectMidVectorInstance *c = positions.getReady(seq);
580  CmiAssert(c!=NULL);
581  parOut->coordinateSlave(seq,c->fromAtomID,c->toAtomID,
582  c->data.begin(),c->fdata.begin());
583  c->free();
584 }
585 
586 void CollectionMidMaster::disposeVelocities(int seq)
587 {
588  CollectMidVectorInstance *c = velocities.getReady(seq);
589  CmiAssert(c!=NULL);
590  parOut->velocitySlave(seq,c->fromAtomID,c->toAtomID,c->data.begin());
591  c->free();
592 }
593 
594 void CollectionMidMaster::disposeForces(int seq)
595 {
596  CollectMidVectorInstance *c = forces.getReady(seq);
597  CmiAssert(c!=NULL);
598  parOut->forceSlave(seq,c->fromAtomID,c->toAtomID,c->data.begin());
599  c->free();
600 }
601 #endif
602 
603 #include "CollectionMaster.def.h"
604 
static Node * Object()
Definition: Node.h:86
CollectVectorInstance * removeReady(void)
int size(void) const
Definition: ResizeArray.h:131
void force(int timestep, int n, Vector *frc, FloatVector *ffrc)
Produce appropriate force for the current timestep.
Definition: Output.C:646
void disposeForces(CollectVectorInstance *c)
void receiveVelocities(CollectVectorMsg *msg)
Definition: Vector.h:72
Output * output
Definition: Node.h:185
SimParameters * simParameters
Definition: Node.h:181
void receivePositions(CollectVectorMsg *msg)
void receivePositionsDcdSelection(CollectVectorMsg *msg)
#define DebugM(x, y)
Definition: Debug.h:75
void enqueueVelocities(int seq)
void receiveOutputPosReady(int seq)
void submitData(CollectVectorMsg *msg, int max_index)
void enqueue(int seq, Lattice &lattice)
char auxFilename[NAMD_FILENAME_BUFFER_SIZE]
void startNextRoundOutputPos(double totalT)
void enqueuePositions(int seq, Lattice &lattice)
void wrapCoor(int seq, Lattice lat)
void enqueuePositionsDcdSelection(int seq, Lattice &lattice)
void receiveDataStream(DataStreamMsg *msg)
double memusage_MB()
Definition: memusage.h:13
Definition: Output.h:35
void receiveOutputVelReady(int seq)
PACK_MSG(DataStreamMsg, PACK_RESIZE(data);)
void velocity(int timestep, int n, Vector *vel, FloatVector *fvel)
Produce appropriate velocity for the current timestep.
Definition: Output.C:541
void coordinate(int timestep, int n, Vector *coor, FloatVector *fcoor, Lattice &lattice)
Produce appropriate coordinate output for the current timestep.
Definition: Output.C:334
void disposePositions(CollectVectorInstance *c)
void submitData(CollectVectorMsg *msg, int max_index)
void startNextRoundOutputVel(double totalT)
int numAtoms
Definition: Molecule.h:586
void NAMD_die(const char *err_msg)
Definition: common.C:147
void receiveForces(CollectVectorMsg *msg)
void disposeVelocities(CollectVectorInstance *c)
void NAMD_backup_file(const char *filename, const char *extension)
Definition: common.C:235
CollectVectorInstanceDcdSelection * removeReady(void)
#define PACK_RESIZE(DATA)
Definition: packmsg.h:125
ResizeArray< char > data
#define simParams
Definition: Output.C:131
iterator begin(void)
Definition: ResizeArray.h:36
void startNextRoundOutputForce(double totalT)
void enqueueForces(int seq)
Molecule * molecule
Definition: Node.h:179
void receiveOutputForceReady(int seq)