NAMD
CollectionMaster.C
Go to the documentation of this file.
1 
7 #include "largefiles.h" // must be first!
8 
9 #include "InfoStream.h"
10 #include "CollectionMaster.h"
11 #include "ProcessorPrivate.h"
12 #include "SimParameters.h"
13 #include "packmsg.h"
14 #include "CollectionMaster.decl.h"
15 #include "Molecule.h"
16 
17 #include "memusage.h"
18 #define MIN_DEBUG_LEVEL 3
19 //#define DEBUGM
20 #include "Debug.h"
21 
23 {
24  if (CkpvAccess(CollectionMaster_instance) == 0) {
25  CkpvAccess(CollectionMaster_instance) = this;
26  } else {
27  DebugM(1, "CollectionMaster::CollectionMaster() - another instance of CollectionMaster exists!\n");
28  }
29  dataStreamFile = 0;
30 
31 #ifdef MEM_OPT_VERSION
32  wrapCoorDoneCnt = 0;
33  posDoneCnt = 0;
34  velDoneCnt = 0;
35  parOut = new ParOutput();
36 #endif
37 
38  posTimings = 10; velTimings = forceTimings = 5;
39 #ifdef NODEGROUP_FORCE_REGISTER
40  cmlock = CmiCreateLock();
41 #endif
42 }
43 
44 
46 {
47 #ifdef NODEGROUP_FORCE_REGISTER
48  CmiDestroyLock(cmlock);
49 #endif
50 }
51 
53 {
  // Merge one incoming position message into the `positions` collector.
  // Then pass every instance that removeReady() returns to disposePositions().
  // With MEM_OPT_VERSION defined, this handler only logs; note that `msg` is
  // not deleted on that path.
  // NOTE(review): the declaration of `c` (source line 60) is absent from
  // this listing.
54  DebugM(3, "["<<CkMyPe()<<"]"<< " receivePositions\n");
55 #ifndef MEM_OPT_VERSION
  // The total atom count is passed as submitData's max_index bound.
56  positions.submitData(msg,Node::Object()->molecule->numAtoms);
57
58  delete msg;
59
61  while ( ( c = positions.removeReady() ) ) { disposePositions(c); }
62 #endif
63 }
64 
65 
67 {
  // DCD-selection variant of receivePositions.
  // The collector is tagged with the message's selection index and bounded by
  // that selection's size instead of the total atom count. Every instance that
  // becomes ready is handed to disposePositions().
  // NOTE(review): the declaration of `c` (source line 74) is absent from
  // this listing.
68  DebugM(3, "["<<CkMyPe()<<"]"<< " receivePositionsDcdSelection index "<<msg->index<<"\n");
69 #ifndef MEM_OPT_VERSION
70  positionsDcdSelection.index=msg->index;
71  positionsDcdSelection.submitData(msg,Node::Object()->molecule->get_dcd_selection_size(msg->index));
72  delete msg;
73
75  while ( ( c = positionsDcdSelection.removeReady() ) ) { disposePositions(c); }
76 #endif
77 }
78 
79 
81 {
  // Register step `seq` (with its lattice) as expected in the positions
  // collector, then flush whatever is ready.
  // - Default build: drain ready instances through disposePositions().
  // - MEM_OPT_VERSION: defer to checkPosReady(), which starts parallel output.
  // NOTE(review): the declaration of `c` (source line 86) is absent from
  // this listing.
82  DebugM(3, "["<<CkMyPe()<<"]"<< " enqueuePositions\n");
83  positions.enqueue(seq,lattice);
84
85 #ifndef MEM_OPT_VERSION
87  while ( ( c = positions.removeReady() ) ) { disposePositions(c); }
88 #else
89  checkPosReady();
90 #endif
91 }
92 
94 {
  // DCD-selection counterpart of enqueuePositions.
  // NOTE(review): under MEM_OPT_VERSION nothing is enqueued into
  // positionsDcdSelection here; only checkPosReady() runs on the regular
  // positions collector. Confirm DCD selections are unsupported in that build.
  // NOTE(review): the declaration of `c` (source line 99) is absent from
  // this listing.
95  DebugM(3, "["<<CkMyPe()<<"]"<< " enqueuePositionsDcdSelection\n");
96 #ifndef MEM_OPT_VERSION
97  positionsDcdSelection.enqueue(seq,lattice);
98
100  while ( ( c = positionsDcdSelection.removeReady() ) ) { disposePositions(c); }
101 #else
102  checkPosReady();
103 #endif
104 }
105 
106 
108 {
109  DebugM(3, "["<<CkMyPe()<<"]"<< " disposePositions\n");
110 #ifndef MEM_OPT_VERSION
111  int seq = c->seq;
112  int size = c->data.size();
113  if ( ! size ) size = c->fdata.size();
114  Vector *data = c->data.begin();
115  FloatVector *fdata = c->fdata.begin();
116  double exectime = CmiWallTimer();
117  double mem = memusage_MB();
118 #ifdef NODEGROUP_FORCE_REGISTER
119  if (Node::Object()->simParameters->CUDASOAintegrate) {
120  CProxy_PatchData cpdata(CkpvAccess(BOCclass_group).patchData);
121  Output *pout = cpdata.ckLocalBranch()->ptrOutput;
122  pout->coordinate(seq,size,data,fdata,c->lattice);
123  }
124  else
125 #endif
126  {
127  Node::Object()->output->coordinate(seq,size,data,fdata,c->lattice);
128  }
129  c->free();
130  exectime = CmiWallTimer()-exectime;
131  if ( posTimings ) {
132  CkPrintf("The last position output (seq=%d) takes %.3f seconds, %.3f MB of memory in use\n", seq, exectime, mem);
133  --posTimings;
134  }
135 #endif
136 }
137 
138 
140 {
  // Merge one incoming velocity message into the `velocities` collector.
  // Then pass every instance that removeReady() returns to disposeVelocities().
  // NOTE(review): with MEM_OPT_VERSION defined the body is empty and `msg` is
  // not deleted. Confirm this handler is never reached in that build.
  // NOTE(review): the declaration of `c` (source line 145) is absent from
  // this listing.
141 #ifndef MEM_OPT_VERSION
142  velocities.submitData(msg,Node::Object()->molecule->numAtoms);
143  delete msg;
144
146  while ( ( c = velocities.removeReady() ) ) { disposeVelocities(c); }
147 #endif
148 }
149 
151 {
  // Register step `seq` as expected in the velocities collector. Velocities
  // carry no cell, so a default-constructed Lattice is passed. Then flush
  // ready frames:
  // - Default build: drain them through disposeVelocities().
  // - MEM_OPT_VERSION: call checkVelReady().
  // NOTE(review): the declaration of `c` (source line 155) is absent from
  // this listing.
152  Lattice dummy;
153  velocities.enqueue(seq,dummy);
154 #ifndef MEM_OPT_VERSION
156  while ( ( c = velocities.removeReady() ) ) { disposeVelocities(c); }
157 #else
158  checkVelReady();
159 #endif
160 }
161 
163 {
164 #ifndef MEM_OPT_VERSION
165  DebugM(3,"Collected velocities at " << c->seq << std::endl);
166  int seq = c->seq;
167  int size = c->data.size();
168  Vector *data = c->data.begin();
169  double exectime = CmiWallTimer();
170  double mem = memusage_MB();
171 #ifdef NODEGROUP_FORCE_REGISTER
172  if (Node::Object()->simParameters->CUDASOAintegrate) {
173  CProxy_PatchData cpdata(CkpvAccess(BOCclass_group).patchData);
174  Output *pout = cpdata.ckLocalBranch()->ptrOutput;
175  pout->velocity(seq,size,data);
176  }
177  else
178 #endif
179  {
180  Node::Object()->output->velocity(seq,size,data);
181  }
182  c->free();
183  exectime = CmiWallTimer()-exectime;
184  if ( velTimings ) {
185  CkPrintf("The last velocity output (seq=%d) takes %.3f seconds, %.3f MB of memory in use\n", seq, exectime, mem);
186  --velTimings;
187  }
188 #endif
189 }
190 
191 
193 {
  // Merge one incoming force message into the `forces` collector.
  // Then pass every instance that removeReady() returns to disposeForces().
  // NOTE(review): with MEM_OPT_VERSION defined the body is empty and `msg` is
  // not deleted. Confirm this handler is never reached in that build.
  // NOTE(review): the declaration of `c` (source line 198) is absent from
  // this listing.
194 #ifndef MEM_OPT_VERSION
195  forces.submitData(msg,Node::Object()->molecule->numAtoms);
196  delete msg;
197
199  while ( ( c = forces.removeReady() ) ) { disposeForces(c); }
200 #endif
201 }
202 
204 {
  // Register step `seq` as expected in the forces collector, using a
  // default-constructed Lattice. Then flush ready frames:
  // - Default build: drain them through disposeForces().
  // - MEM_OPT_VERSION: call checkForceReady().
  // NOTE(review): the declaration of `c` (source line 208) is absent from
  // this listing.
205  Lattice dummy;
206  forces.enqueue(seq,dummy);
207 #ifndef MEM_OPT_VERSION
209  while ( ( c = forces.removeReady() ) ) { disposeForces(c); }
210 #else
211  checkForceReady();
212 #endif
213 }
214 
216 {
217 #ifndef MEM_OPT_VERSION
218  DebugM(3,"Collected forces at " << c->seq << std::endl);
219  int seq = c->seq;
220  int size = c->data.size();
221  Vector *data = c->data.begin();
222  double exectime = CmiWallTimer();
223  double mem = memusage_MB();
224 #ifdef NODEGROUP_FORCE_REGISTER
225  if (Node::Object()->simParameters->CUDASOAintegrate) {
226  CProxy_PatchData cpdata(CkpvAccess(BOCclass_group).patchData);
227  Output *pout = cpdata.ckLocalBranch()->ptrOutput;
228  pout->force(seq,size,data);
229  }
230  else
231 #endif
232  {
233  Node::Object()->output->force(seq,size,data);
234  }
235  c->free();
236  exectime = CmiWallTimer()-exectime;
237  if ( forceTimings ) {
238  CkPrintf("The last force output (seq=%d) takes %.3f seconds, %.3f MB of memory in use\n", seq, exectime, mem);
239  --forceTimings;
240  }
241 #endif
242 }
243 
244 
  // Append one auxiliary data-stream message to simParameters->auxFilename.
  // On first use the file is opened: any existing file is backed up, and the
  // run aborts via NAMD_die if fopen fails.
246  if ( ! dataStreamFile ) {
247  char *fname = Node::Object()->simParameters->auxFilename;
248  // iout has large file linking issues on AIX
249  // iout << iINFO << "OPENING AUXILIARY DATA STREAM FILE "
250  // << fname << "\n" << endi;
251  CkPrintf("Info: OPENING AUXILIARY DATA STREAM FILE %s\n", fname);
252  NAMD_backup_file(fname);
253  dataStreamFile = fopen(fname,"w");
254  if ( ! dataStreamFile )
255  NAMD_die("Can't open auxiliary data stream file!");
256  }
  // NOTE(review): printing with "%s" assumes msg->data is NUL-terminated;
  // confirm with the sender.
257  fprintf(dataStreamFile,"%s",msg->data.begin());
  // Flush after every message so the file is current even though it is kept
  // open between calls.
258  fflush(dataStreamFile);
259  delete msg;
260 }
261 
  // Pack/unpack rule for DataStreamMsg: its resizable `data` buffer is
  // serialized with PACK_RESIZE. The opening PACK_MSG(DataStreamMsg, line is
  // absent from this listing.
263  PACK_RESIZE(data);
264 )
265 
266 //The timesteps on CollectionMaster will always be in increasing order
267 //because they are enqueued on PE0 by the controller in order. -Chao Mei
268 //
269 //The computation of wrap_coor is also serialized for the sake of easy
270 //implementation (easier management of output)
  // MEM_OPT_VERSION only: mark step `seq` as submitted in the positions
  // collector, then try to start its output via checkPosReady().
  // No-op in other builds.
272 #ifdef MEM_OPT_VERSION
273  positions.submitData(seq);
274  checkPosReady();
275 #endif
276 }
277 
  // MEM_OPT_VERSION only: mark step `seq` as submitted in the velocities
  // collector, then try to start its output via checkVelReady().
  // No-op in other builds.
279 #ifdef MEM_OPT_VERSION
280  velocities.submitData(seq);
281  checkVelReady();
282 #endif
283 }
284 
  // MEM_OPT_VERSION only: mark step `seq` as submitted in the forces
  // collector, then try to start its output via checkForceReady().
  // No-op in other builds.
286 #ifdef MEM_OPT_VERSION
287  forces.submitData(seq);
288  checkForceReady();
289 #endif
290 }
291 
292 
  // MEM_OPT_VERSION only: completion callback for one participant in a
  // parallel position-output round (presumably one call per output
  // writer/proc).
  // - totalT is that participant's I/O time; the maximum seen is kept in
  //   posIOTime.
  // - Once posDoneCnt reaches numoutputwrts (single-file output) or
  //   numoutputprocs (multi-file output), the in-process instance is removed
  //   and freed, and timing is printed for the first posTimings rounds.
  // - The next ready frame is then started via checkPosReady().
294 #ifdef MEM_OPT_VERSION
295
296  if(totalT > posIOTime) posIOTime = totalT;
297
  // Fail the build if the output mode was never configured.
298 #ifndef OUTPUT_SINGLE_FILE
299 #error OUTPUT_SINGLE_FILE not defined!
300 #endif
301
302 #if OUTPUT_SINGLE_FILE
303  if(++posDoneCnt < Node::Object()->simParameters->numoutputwrts) return;
304 #else
305  if(++posDoneCnt < Node::Object()->simParameters->numoutputprocs) return;
306 #endif
307
308  posDoneCnt = 0;
309
310  //retrieve the last ready instance
311  CollectVectorInstance *c = positions.getReady();
312  int seq = c->seq;
313  CmiAssert(c->status == IN_PROCESS);
314  double mem = memusage_MB();
315  positions.removeFirstReady();
316  c->free();
  // posOutTime held this round's start timestamp (set in checkPosReady);
  // it is overwritten here with the elapsed time.
317  posOutTime = CmiWallTimer()-posOutTime;
318  if ( posTimings ) {
319  CkPrintf("The last position output (seq=%d) takes %.3f seconds(file I/O: %.3f secs), %.3f MB of memory in use\n", seq, posOutTime, posIOTime, mem);
320  --posTimings;
321  }
322
323  //Actually the c->status doesn't need to be checked because it is
324  //certain that the new ready one will not be in IN_PROCESS status
325  checkPosReady();
326 #endif
327 }
328 
  // MEM_OPT_VERSION only: completion callback for one participant in a
  // parallel velocity-output round.
  // - totalT is that participant's I/O time; the maximum seen is kept in
  //   velIOTime.
  // - Once velDoneCnt reaches numoutputwrts (single-file output) or
  //   numoutputprocs (multi-file output), the in-process instance is removed
  //   and freed, and timing is printed for the first velTimings rounds.
  // - The next ready frame is then started via checkVelReady().
330 #ifdef MEM_OPT_VERSION
331
332  if(totalT > velIOTime) velIOTime = totalT;
333
334 #if OUTPUT_SINGLE_FILE
335  if(++velDoneCnt < Node::Object()->simParameters->numoutputwrts) return;
336 #else
337  if(++velDoneCnt < Node::Object()->simParameters->numoutputprocs) return;
338 #endif
339
340  velDoneCnt = 0;
341
342  //retrieve the last ready instance
343  CollectVectorInstance *c = velocities.getReady();
344  int seq = c->seq;
345  CmiAssert(c->status == IN_PROCESS);
346  double mem = memusage_MB();
347  velocities.removeFirstReady();
348  c->free();
  // velOutTime held this round's start timestamp (set in checkVelReady);
  // it is overwritten here with the elapsed time.
349  velOutTime = CmiWallTimer()-velOutTime;
350  if ( velTimings ) {
351  CkPrintf("The last velocity output (seq=%d) takes %.3f seconds(file I/O: %.3f secs), %.3f MB of memory in use\n", seq, velOutTime, velIOTime, mem);
352  --velTimings;
353  }
354
355  //Actually the c->status doesn't need to be checked because it is
356  //certain that the new ready one will not be in IN_PROCESS status
357  checkVelReady();
358 #endif
359 }
360 
  // MEM_OPT_VERSION only: completion callback for one participant in a
  // parallel force-output round.
  // - totalT is that participant's I/O time; the maximum seen is kept in
  //   forceIOTime.
  // - Once forceDoneCnt reaches numoutputwrts (single-file output) or
  //   numoutputprocs (multi-file output), the in-process instance is removed
  //   and freed, and timing is printed for the first forceTimings rounds.
  // - The next ready frame is then started via checkForceReady().
  // NOTE(review): forceDoneCnt must be zero-initialized before the first
  // round; the constructor shown in this file does not set it.
362 #ifdef MEM_OPT_VERSION
363
364  if(totalT > forceIOTime) forceIOTime = totalT;
365
366 #if OUTPUT_SINGLE_FILE
367  if(++forceDoneCnt < Node::Object()->simParameters->numoutputwrts) return;
368 #else
369  if(++forceDoneCnt < Node::Object()->simParameters->numoutputprocs) return;
370 #endif
371
372  forceDoneCnt = 0;
373
374  //retrieve the last ready instance
375  CollectVectorInstance *c = forces.getReady();
376  int seq = c->seq;
377  CmiAssert(c->status == IN_PROCESS);
378  double mem = memusage_MB();
379  forces.removeFirstReady();
380  c->free();
  // forceOutTime held this round's start timestamp (set in checkForceReady);
  // it is overwritten here with the elapsed time.
381  forceOutTime = CmiWallTimer()-forceOutTime;
382  if ( forceTimings ) {
383  CkPrintf("The last force output (seq=%d) takes %.3f seconds(file I/O: %.3f secs), %.3f MB of memory in use\n", seq, forceOutTime, forceIOTime, mem);
384  --forceTimings;
385  }
386
387  //Actually the c->status doesn't need to be checked because it is
388  //certain that the new ready one will not be in IN_PROCESS status
389  checkForceReady();
390 #endif
391 }
392 
393 
  // MEM_OPT_VERSION only: called as output procs finish wrapping coordinates.
  // The function name is not visible in this listing; presumably it is
  // wrapCoorFinished.
  // - When all numoutputprocs have reported, the elapsed time since
  //   posOutTime is stored in posIOTime.
  // - The designated output procs are then told to write the ready position
  //   frame.
395 #ifdef MEM_OPT_VERSION
396  if(++wrapCoorDoneCnt == Node::Object()->simParameters->numoutputprocs){
397  wrapCoorDoneCnt = 0;
398
399  //count the wrapping-coor time into master writing time
400  posIOTime = CmiWallTimer()-posOutTime;
401
402  //it's ready to output positions
403  CollectVectorInstance *c = positions.getReady();
404
405  CProxy_ParallelIOMgr io(CkpvAccess(BOCclass_group).ioMgr);
406  ParallelIOMgr *ioMgr = io.ckLocalBranch();
407
408 #if OUTPUT_SINGLE_FILE
  // Output procs form numOutputWrts consecutive groups. The first `remains`
  // groups have grpsize+1 members and the rest have grpsize; only the first
  // proc of each group is notified.
409  //notify output procs to do Token based output
410  int grpsize = ioMgr->numOutputProcs / ioMgr->numOutputWrts;
411  int remains = ioMgr->numOutputProcs % ioMgr->numOutputWrts;
412  int outrank = 0;
413  int i;
414  for(i=0; i<remains; i++){
415  io[ioMgr->outputProcArray[outrank]].disposePositions(c->seq, posIOTime);
416  outrank += (grpsize+1);
417  }
418  for(; i<ioMgr->numOutputWrts; i++){
419  io[ioMgr->outputProcArray[outrank]].disposePositions(c->seq, posIOTime);
420  outrank += grpsize;
421  }
422 #else
423  //output multiple files
424  for(int i=0; i<ioMgr->numOutputProcs; i++) {
425  io[ioMgr->outputProcArray[i]].disposePositions(c->seq, posIOTime);
426  }
427 #endif
428
429  }
430 #endif
431 }
432 
433 #ifdef MEM_OPT_VERSION
434 void CollectionMaster::checkPosReady(){
  // MEM_OPT_VERSION only: start output of the first ready position frame
  // unless one is already IN_PROCESS.
  // - Wrapping enabled (wrapAll/wrapWater): every output proc is asked to
  //   wrap coordinates, and the header is written meanwhile. Output is
  //   triggered once wrapping completes.
  // - Otherwise: the header is written and the designated output procs are
  //   told to write immediately.
  // NOTE(review): the declaration of `simParam` (source line 444) is absent
  // from this listing.
435  CollectVectorInstance *c;
436  if((c = positions.getReady())){
437  if(c->status == IN_PROCESS){
438  //indicating in the process of outputing coordinates
439  return;
440  }
441  c->status = IN_PROCESS;
442
  // Start timestamp for this round; converted to elapsed time when the round
  // completes.
443  posOutTime = CmiWallTimer();
445  CProxy_ParallelIOMgr io(CkpvAccess(BOCclass_group).ioMgr);
446  ParallelIOMgr *ioMgr = io.ckLocalBranch();
447  if(simParam->wrapAll || simParam->wrapWater){
448  for(int i=0; i<ioMgr->numOutputProcs; i++){
449  io[ioMgr->outputProcArray[i]].wrapCoor(c->seq, c->lattice);
450  }
451  //write the header to overlap with the computation of
452  //wrapping coordinates
453  parOut->coordinateMaster(c->seq,Node::Object()->molecule->numAtoms,c->lattice);
454  }else{
455  //write the header
456  parOut->coordinateMaster(c->seq,Node::Object()->molecule->numAtoms,c->lattice);
457  posIOTime = CmiWallTimer() - posOutTime;
458
  // Single-file mode: notify only the first proc of each of the
  // numOutputWrts groups. The first `remains` groups are one proc larger.
459  #if OUTPUT_SINGLE_FILE
460  int grpsize = ioMgr->numOutputProcs / ioMgr->numOutputWrts;
461  int remains = ioMgr->numOutputProcs % ioMgr->numOutputWrts;
462  int outrank = 0;
463  int i;
464  for(i=0; i<remains; i++){
465  io[ioMgr->outputProcArray[outrank]].disposePositions(c->seq, posIOTime);
466  outrank += (grpsize+1);
467  }
468  for(; i<ioMgr->numOutputWrts; i++){
469  io[ioMgr->outputProcArray[outrank]].disposePositions(c->seq, posIOTime);
470  outrank += grpsize;
471  }
472  #else
473  //output multiple files
474  for(int i=0; i<ioMgr->numOutputProcs; i++) {
475  io[ioMgr->outputProcArray[i]].disposePositions(c->seq, posIOTime);
476  }
477  #endif
478  }
479  //this instance c is freed in the next round of output invocation.
480  }
481 }
482 
483 void CollectionMaster::checkVelReady(){
484  CollectVectorInstance *c;
485  if((c = velocities.getReady())){
486  if(c->status == IN_PROCESS){
487  //indicating in the process of outputing velocities
488  return;
489  }
490 
491  c->status = IN_PROCESS;
492 
493  velOutTime = CmiWallTimer();
494  //write the header
495  parOut->velocityMaster(c->seq, Node::Object()->molecule->numAtoms);
496  velIOTime = CmiWallTimer() - velOutTime;
497 
498  //notify output procs to do Token based output
499  CProxy_ParallelIOMgr io(CkpvAccess(BOCclass_group).ioMgr);
500  ParallelIOMgr *ioMgr = io.ckLocalBranch();
501 
502  #if OUTPUT_SINGLE_FILE
503  int grpsize = ioMgr->numOutputProcs / ioMgr->numOutputWrts;
504  int remains = ioMgr->numOutputProcs % ioMgr->numOutputWrts;
505  int outrank = 0;
506  int i;
507  for(i=0; i<remains; i++){
508  io[ioMgr->outputProcArray[outrank]].disposeVelocities(c->seq, velIOTime);
509  outrank += (grpsize+1);
510  }
511  for(; i<ioMgr->numOutputWrts; i++){
512  io[ioMgr->outputProcArray[outrank]].disposeVelocities(c->seq, velIOTime);
513  outrank += grpsize;
514  }
515  #else
516  //output multiple files
517  for(int i=0; i<ioMgr->numOutputProcs; i++) {
518  io[ioMgr->outputProcArray[i]].disposeVelocities(c->seq, velIOTime);
519  }
520  #endif
521  //this instance c is freed in the next round of output invocation.
522  }
523 }
524 
525 void CollectionMaster::checkForceReady(){
526  CollectVectorInstance *c;
527  if((c = forces.getReady())){
528  if(c->status == IN_PROCESS){
529  //indicating in the process of outputing forces
530  return;
531  }
532 
533  c->status = IN_PROCESS;
534 
535  forceOutTime = CmiWallTimer();
536  //write the header
537  parOut->forceMaster(c->seq, Node::Object()->molecule->numAtoms);
538  forceIOTime = CmiWallTimer() - forceOutTime;
539 
540  //notify output procs to do Token based output
541  CProxy_ParallelIOMgr io(CkpvAccess(BOCclass_group).ioMgr);
542  ParallelIOMgr *ioMgr = io.ckLocalBranch();
543 
544  #if OUTPUT_SINGLE_FILE
545  int grpsize = ioMgr->numOutputProcs / ioMgr->numOutputWrts;
546  int remains = ioMgr->numOutputProcs % ioMgr->numOutputWrts;
547  int outrank = 0;
548  int i;
549  for(i=0; i<remains; i++){
550  io[ioMgr->outputProcArray[outrank]].disposeForces(c->seq, forceIOTime);
551  outrank += (grpsize+1);
552  }
553  for(; i<ioMgr->numOutputWrts; i++){
554  io[ioMgr->outputProcArray[outrank]].disposeForces(c->seq, forceIOTime);
555  outrank += grpsize;
556  }
557  #else
558  //output multiple files
559  for(int i=0; i<ioMgr->numOutputProcs; i++) {
560  io[ioMgr->outputProcArray[i]].disposeForces(c->seq, forceIOTime);
561  }
562  #endif
563  //this instance c is freed in the next round of output invocation.
564  }
565 }
566 
567 
568 void CollectionMidMaster::disposePositions(int seq)
569 {
570  DebugM(3, "["<<CkMyPe()<<"]"<< " disposePositions\n");
571  CollectMidVectorInstance *c = positions.getReady(seq);
572  CmiAssert(c!=NULL);
573  parOut->coordinateSlave(seq,c->fromAtomID,c->toAtomID,
574  c->data.begin(),c->fdata.begin());
575  c->free();
576 }
577 
578 void CollectionMidMaster::disposeVelocities(int seq)
579 {
580  CollectMidVectorInstance *c = velocities.getReady(seq);
581  CmiAssert(c!=NULL);
582  parOut->velocitySlave(seq,c->fromAtomID,c->toAtomID,c->data.begin());
583  c->free();
584 }
585 
586 void CollectionMidMaster::disposeForces(int seq)
587 {
588  CollectMidVectorInstance *c = forces.getReady(seq);
589  CmiAssert(c!=NULL);
590  parOut->forceSlave(seq,c->fromAtomID,c->toAtomID,c->data.begin());
591  c->free();
592 }
593 #endif
594 
595 #include "CollectionMaster.def.h"
596 
static Node * Object()
Definition: Node.h:86
CollectVectorInstance * removeReady(void)
int size(void) const
Definition: ResizeArray.h:131
void disposeForces(CollectVectorInstance *c)
void receiveVelocities(CollectVectorMsg *msg)
Definition: Vector.h:72
Output * output
Definition: Node.h:185
SimParameters * simParameters
Definition: Node.h:181
void receivePositions(CollectVectorMsg *msg)
void receivePositionsDcdSelection(CollectVectorMsg *msg)
#define DebugM(x, y)
Definition: Debug.h:75
void enqueueVelocities(int seq)
void receiveOutputPosReady(int seq)
void submitData(CollectVectorMsg *msg, int max_index)
void enqueue(int seq, Lattice &lattice)
void coordinate(int, int, Vector *, FloatVector *, Lattice &)
Definition: Output.C:345
char auxFilename[NAMD_FILENAME_BUFFER_SIZE]
void startNextRoundOutputPos(double totalT)
void enqueuePositions(int seq, Lattice &lattice)
void wrapCoor(int seq, Lattice lat)
void enqueuePositionsDcdSelection(int seq, Lattice &lattice)
void receiveDataStream(DataStreamMsg *msg)
double memusage_MB()
Definition: memusage.h:13
Definition: Output.h:35
void receiveOutputVelReady(int seq)
PACK_MSG(DataStreamMsg, PACK_RESIZE(data);)
void disposePositions(CollectVectorInstance *c)
void velocity(int, int, Vector *)
Definition: Output.C:531
void submitData(CollectVectorMsg *msg, int max_index)
void startNextRoundOutputVel(double totalT)
int numAtoms
Definition: Molecule.h:585
void NAMD_die(const char *err_msg)
Definition: common.C:147
void receiveForces(CollectVectorMsg *msg)
void disposeVelocities(CollectVectorInstance *c)
void NAMD_backup_file(const char *filename, const char *extension)
Definition: common.C:235
CollectVectorInstanceDcdSelection * removeReady(void)
#define PACK_RESIZE(DATA)
Definition: packmsg.h:125
ResizeArray< char > data
iterator begin(void)
Definition: ResizeArray.h:36
void startNextRoundOutputForce(double totalT)
void enqueueForces(int seq)
Molecule * molecule
Definition: Node.h:179
void force(int, int, Vector *)
Definition: Output.C:621
void receiveOutputForceReady(int seq)