NAMD
CollectionMaster.C
Go to the documentation of this file.
1 
7 #include "largefiles.h" // must be first!
8 
9 #include "InfoStream.h"
10 #include "CollectionMaster.h"
11 #include "ProcessorPrivate.h"
12 #include "SimParameters.h"
13 #include "packmsg.h"
14 #include "CollectionMaster.decl.h"
15 #include "Molecule.h"
16 
17 #include "memusage.h"
18 
19 // #define DEBUGM
20 #include "Debug.h"
21 
23 {
24  if (CkpvAccess(CollectionMaster_instance) == 0) {
25  CkpvAccess(CollectionMaster_instance) = this;
26  } else {
27  DebugM(1, "CollectionMaster::CollectionMaster() - another instance of CollectionMaster exists!\n");
28  }
29  dataStreamFile = 0;
30 
31 #ifdef MEM_OPT_VERSION
32  wrapCoorDoneCnt = 0;
33  posDoneCnt = 0;
34  velDoneCnt = 0;
35  parOut = new ParOutput();
36 #endif
37 
38  posTimings = 10; velTimings = forceTimings = 5;
39 }
40 
41 
43 {
44 }
45 
47 {
48 #ifndef MEM_OPT_VERSION
49  positions.submitData(msg,Node::Object()->molecule->numAtoms);
50  delete msg;
51 
53  while ( ( c = positions.removeReady() ) ) { disposePositions(c); }
54 #endif
55 }
56 
58 {
59  positions.enqueue(seq,lattice);
60 
61 #ifndef MEM_OPT_VERSION
63  while ( ( c = positions.removeReady() ) ) { disposePositions(c); }
64 #else
65  checkPosReady();
66 #endif
67 }
68 
70 {
71 #ifndef MEM_OPT_VERSION
72  DebugM(3,"Collected positions at " << c->seq << std::endl);
73  int seq = c->seq;
74  int size = c->data.size();
75  if ( ! size ) size = c->fdata.size();
76  Vector *data = c->data.begin();
77  FloatVector *fdata = c->fdata.begin();
78  double exectime = CmiWallTimer();
79  double mem = memusage_MB();
80  Node::Object()->output->coordinate(seq,size,data,fdata,c->lattice);
81  c->free();
82  exectime = CmiWallTimer()-exectime;
83  if ( posTimings ) {
84  CkPrintf("The last position output (seq=%d) takes %.3f seconds, %.3f MB of memory in use\n", seq, exectime, mem);
85  --posTimings;
86  }
87 #endif
88 }
89 
90 
92 {
93 #ifndef MEM_OPT_VERSION
94  velocities.submitData(msg,Node::Object()->molecule->numAtoms);
95  delete msg;
96 
98  while ( ( c = velocities.removeReady() ) ) { disposeVelocities(c); }
99 #endif
100 }
101 
103 {
104  Lattice dummy;
105  velocities.enqueue(seq,dummy);
106 #ifndef MEM_OPT_VERSION
108  while ( ( c = velocities.removeReady() ) ) { disposeVelocities(c); }
109 #else
110  checkVelReady();
111 #endif
112 }
113 
115 {
116 #ifndef MEM_OPT_VERSION
117  DebugM(3,"Collected velocities at " << c->seq << std::endl);
118  int seq = c->seq;
119  int size = c->data.size();
120  Vector *data = c->data.begin();
121  double exectime = CmiWallTimer();
122  double mem = memusage_MB();
123  Node::Object()->output->velocity(seq,size,data);
124  c->free();
125  exectime = CmiWallTimer()-exectime;
126  if ( velTimings ) {
127  CkPrintf("The last velocity output (seq=%d) takes %.3f seconds, %.3f MB of memory in use\n", seq, exectime, mem);
128  --velTimings;
129  }
130 #endif
131 }
132 
133 
135 {
136 #ifndef MEM_OPT_VERSION
137  forces.submitData(msg,Node::Object()->molecule->numAtoms);
138  delete msg;
139 
141  while ( ( c = forces.removeReady() ) ) { disposeForces(c); }
142 #endif
143 }
144 
146 {
147  Lattice dummy;
148  forces.enqueue(seq,dummy);
149 #ifndef MEM_OPT_VERSION
151  while ( ( c = forces.removeReady() ) ) { disposeForces(c); }
152 #else
153  checkForceReady();
154 #endif
155 }
156 
158 {
159 #ifndef MEM_OPT_VERSION
160  DebugM(3,"Collected forces at " << c->seq << std::endl);
161  int seq = c->seq;
162  int size = c->data.size();
163  Vector *data = c->data.begin();
164  double exectime = CmiWallTimer();
165  double mem = memusage_MB();
166  Node::Object()->output->force(seq,size,data);
167  c->free();
168  exectime = CmiWallTimer()-exectime;
169  if ( forceTimings ) {
170  CkPrintf("The last force output (seq=%d) takes %.3f seconds, %.3f MB of memory in use\n", seq, exectime, mem);
171  --forceTimings;
172  }
173 #endif
174 }
175 
176 
178  if ( ! dataStreamFile ) {
179  char *fname = Node::Object()->simParameters->auxFilename;
180  // iout has large file linking issues on AIX
181  // iout << iINFO << "OPENING AUXILIARY DATA STREAM FILE "
182  // << fname << "\n" << endi;
183  CkPrintf("Info: OPENING AUXILIARY DATA STREAM FILE %s\n", fname);
184  NAMD_backup_file(fname);
185  dataStreamFile = fopen(fname,"w");
186  if ( ! dataStreamFile )
187  NAMD_die("Can't open auxiliary data stream file!");
188  }
189  fprintf(dataStreamFile,"%s",msg->data.begin());
190  fflush(dataStreamFile);
191  delete msg;
192 }
193 
195  PACK_RESIZE(data);
196 )
197 
198 //The timesteps on CollectionMaster will always be in increasing order
199 //because they are enqueued on PE0 by the controller in order. -Chao Mei
200 //
201 //The computation of wrap_coor is also serialized for ease of
202 //implementation (easier management of output).
204 #ifdef MEM_OPT_VERSION
205  positions.submitData(seq);
206  checkPosReady();
207 #endif
208 }
209 
211 #ifdef MEM_OPT_VERSION
212  velocities.submitData(seq);
213  checkVelReady();
214 #endif
215 }
216 
218 #ifdef MEM_OPT_VERSION
219  forces.submitData(seq);
220  checkForceReady();
221 #endif
222 }
223 
224 
226 #ifdef MEM_OPT_VERSION
227 
228  if(totalT > posIOTime) posIOTime = totalT;
229 
230 #ifndef OUTPUT_SINGLE_FILE
231 #error OUTPUT_SINGLE_FILE not defined!
232 #endif
233 
234 #if OUTPUT_SINGLE_FILE
235  if(++posDoneCnt < Node::Object()->simParameters->numoutputwrts) return;
236 #else
237  if(++posDoneCnt < Node::Object()->simParameters->numoutputprocs) return;
238 #endif
239 
240  posDoneCnt = 0;
241 
242  //retrieve the last ready instance
243  CollectVectorInstance *c = positions.getReady();
244  int seq = c->seq;
245  CmiAssert(c->status == IN_PROCESS);
246  double mem = memusage_MB();
247  positions.removeFirstReady();
248  c->free();
249  posOutTime = CmiWallTimer()-posOutTime;
250  if ( posTimings ) {
251  CkPrintf("The last position output (seq=%d) takes %.3f seconds(file I/O: %.3f secs), %.3f MB of memory in use\n", seq, posOutTime, posIOTime, mem);
252  --posTimings;
253  }
254 
255  //Actually the c->status doesn't need to be checked because it is
256  //certain that the new ready one will not be in IN_PROCESS status
257  checkPosReady();
258 #endif
259 }
260 
262 #ifdef MEM_OPT_VERSION
263 
264  if(totalT > velIOTime) velIOTime = totalT;
265 
266 #if OUTPUT_SINGLE_FILE
267  if(++velDoneCnt < Node::Object()->simParameters->numoutputwrts) return;
268 #else
269  if(++velDoneCnt < Node::Object()->simParameters->numoutputprocs) return;
270 #endif
271 
272  velDoneCnt = 0;
273 
274  //retrieve the last ready instance
275  CollectVectorInstance *c = velocities.getReady();
276  int seq = c->seq;
277  CmiAssert(c->status == IN_PROCESS);
278  double mem = memusage_MB();
279  velocities.removeFirstReady();
280  c->free();
281  velOutTime = CmiWallTimer()-velOutTime;
282  if ( velTimings ) {
283  CkPrintf("The last velocity output (seq=%d) takes %.3f seconds(file I/O: %.3f secs), %.3f MB of memory in use\n", seq, velOutTime, velIOTime, mem);
284  --velTimings;
285  }
286 
287  //Actually the c->status doesn't need to be checked because it is
288  //certain that the new ready one will not be in IN_PROCESS status
289  checkVelReady();
290 #endif
291 }
292 
294 #ifdef MEM_OPT_VERSION
295 
296  if(totalT > forceIOTime) forceIOTime = totalT;
297 
298 #if OUTPUT_SINGLE_FILE
299  if(++forceDoneCnt < Node::Object()->simParameters->numoutputwrts) return;
300 #else
301  if(++forceDoneCnt < Node::Object()->simParameters->numoutputprocs) return;
302 #endif
303 
304  forceDoneCnt = 0;
305 
306  //retrieve the last ready instance
307  CollectVectorInstance *c = forces.getReady();
308  int seq = c->seq;
309  CmiAssert(c->status == IN_PROCESS);
310  double mem = memusage_MB();
311  forces.removeFirstReady();
312  c->free();
313  forceOutTime = CmiWallTimer()-forceOutTime;
314  if ( forceTimings ) {
315  CkPrintf("The last force output (seq=%d) takes %.3f seconds(file I/O: %.3f secs), %.3f MB of memory in use\n", seq, forceOutTime, forceIOTime, mem);
316  --forceTimings;
317  }
318 
319  //Actually the c->status doesn't need to be checked because it is
320  //certain that the new ready one will not be in IN_PROCESS status
321  checkForceReady();
322 #endif
323 }
324 
325 
327 #ifdef MEM_OPT_VERSION
328  if(++wrapCoorDoneCnt == Node::Object()->simParameters->numoutputprocs){
329  wrapCoorDoneCnt = 0;
330 
331  //count the wrapping-coor time into master writing time
332  posIOTime = CmiWallTimer()-posOutTime;
333 
334  //it's ready to output positions
335  CollectVectorInstance *c = positions.getReady();
336 
337  CProxy_ParallelIOMgr io(CkpvAccess(BOCclass_group).ioMgr);
338  ParallelIOMgr *ioMgr = io.ckLocalBranch();
339 
340 #if OUTPUT_SINGLE_FILE
341  //notify output procs to do Token based output
342  int grpsize = ioMgr->numOutputProcs / ioMgr->numOutputWrts;
343  int remains = ioMgr->numOutputProcs % ioMgr->numOutputWrts;
344  int outrank = 0;
345  int i;
346  for(i=0; i<remains; i++){
347  io[ioMgr->outputProcArray[outrank]].disposePositions(c->seq, posIOTime);
348  outrank += (grpsize+1);
349  }
350  for(; i<ioMgr->numOutputWrts; i++){
351  io[ioMgr->outputProcArray[outrank]].disposePositions(c->seq, posIOTime);
352  outrank += grpsize;
353  }
354 #else
355  //output multiple files
356  for(int i=0; i<ioMgr->numOutputProcs; i++) {
357  io[ioMgr->outputProcArray[i]].disposePositions(c->seq, posIOTime);
358  }
359 #endif
360 
361  }
362 #endif
363 }
364 
365 #ifdef MEM_OPT_VERSION
// Start output of the next ready position frame, unless one is already being
// written.
//
// Does nothing when positions.getReady() returns null, or when the ready
// instance is already marked IN_PROCESS. Otherwise the instance is marked
// IN_PROCESS, posOutTime is stamped, and one of two paths is taken:
//   - wrapping requested (wrapAll or wrapWater): wrapCoor is sent to every
//     output processor and the header is written; no disposePositions message
//     is sent from this function on that path.
//   - no wrapping: the header is written, its elapsed time is stored in
//     posIOTime, and disposePositions is sent to the output processors.
366 void CollectionMaster::checkPosReady(){
367  CollectVectorInstance *c;
368  if((c = positions.getReady())){
369  if(c->status == IN_PROCESS){
370  //indicating in the process of outputing coordinates
371  return;
372  }
// Claim the frame so that later calls return early in the check above.
373  c->status = IN_PROCESS;
374 
// Start of the timed interval; posIOTime below is measured against it.
375  posOutTime = CmiWallTimer();
// NOTE(review): simParam is used below but no declaration of it is visible in
// this function (the listing skips a line at this point) -- confirm that it is
// declared, presumably from Node::Object()->simParameters.
377  CProxy_ParallelIOMgr io(CkpvAccess(BOCclass_group).ioMgr);
378  ParallelIOMgr *ioMgr = io.ckLocalBranch();
379  if(simParam->wrapAll || simParam->wrapWater){
// Ask every output processor to wrap coordinates for this frame.
380  for(int i=0; i<ioMgr->numOutputProcs; i++){
381  io[ioMgr->outputProcArray[i]].wrapCoor(c->seq, c->lattice);
382  }
383  //write the header to overlap with the computation of
384  //wrapping coordinates
385  parOut->coordinateMaster(c->seq,Node::Object()->molecule->numAtoms,c->lattice);
386  }else{
387  //write the header
388  parOut->coordinateMaster(c->seq,Node::Object()->molecule->numAtoms,c->lattice);
// Time spent so far (header write) is passed along with each dispose message.
389  posIOTime = CmiWallTimer() - posOutTime;
390 
391  #if OUTPUT_SINGLE_FILE
// One message per writer (numOutputWrts in total). outrank walks through
// outputProcArray: the first `remains` targets are spaced grpsize+1 apart,
// the remaining ones grpsize apart.
392  int grpsize = ioMgr->numOutputProcs / ioMgr->numOutputWrts;
393  int remains = ioMgr->numOutputProcs % ioMgr->numOutputWrts;
394  int outrank = 0;
395  int i;
396  for(i=0; i<remains; i++){
397  io[ioMgr->outputProcArray[outrank]].disposePositions(c->seq, posIOTime);
398  outrank += (grpsize+1);
399  }
// i deliberately continues from where the previous loop stopped.
400  for(; i<ioMgr->numOutputWrts; i++){
401  io[ioMgr->outputProcArray[outrank]].disposePositions(c->seq, posIOTime);
402  outrank += grpsize;
403  }
404  #else
405  //output multiple files
406  for(int i=0; i<ioMgr->numOutputProcs; i++) {
407  io[ioMgr->outputProcArray[i]].disposePositions(c->seq, posIOTime);
408  }
409  #endif
410  }
411  //this instance c is freed in the next round of output invocation.
412  }
413 }
414 
415 void CollectionMaster::checkVelReady(){
416  CollectVectorInstance *c;
417  if((c = velocities.getReady())){
418  if(c->status == IN_PROCESS){
419  //indicating in the process of outputing velocities
420  return;
421  }
422 
423  c->status = IN_PROCESS;
424 
425  velOutTime = CmiWallTimer();
426  //write the header
427  parOut->velocityMaster(c->seq, Node::Object()->molecule->numAtoms);
428  velIOTime = CmiWallTimer() - velOutTime;
429 
430  //notify output procs to do Token based output
431  CProxy_ParallelIOMgr io(CkpvAccess(BOCclass_group).ioMgr);
432  ParallelIOMgr *ioMgr = io.ckLocalBranch();
433 
434  #if OUTPUT_SINGLE_FILE
435  int grpsize = ioMgr->numOutputProcs / ioMgr->numOutputWrts;
436  int remains = ioMgr->numOutputProcs % ioMgr->numOutputWrts;
437  int outrank = 0;
438  int i;
439  for(i=0; i<remains; i++){
440  io[ioMgr->outputProcArray[outrank]].disposeVelocities(c->seq, velIOTime);
441  outrank += (grpsize+1);
442  }
443  for(; i<ioMgr->numOutputWrts; i++){
444  io[ioMgr->outputProcArray[outrank]].disposeVelocities(c->seq, velIOTime);
445  outrank += grpsize;
446  }
447  #else
448  //output multiple files
449  for(int i=0; i<ioMgr->numOutputProcs; i++) {
450  io[ioMgr->outputProcArray[i]].disposeVelocities(c->seq, velIOTime);
451  }
452  #endif
453  //this instance c is freed in the next round of output invocation.
454  }
455 }
456 
457 void CollectionMaster::checkForceReady(){
458  CollectVectorInstance *c;
459  if((c = forces.getReady())){
460  if(c->status == IN_PROCESS){
461  //indicating in the process of outputing forces
462  return;
463  }
464 
465  c->status = IN_PROCESS;
466 
467  forceOutTime = CmiWallTimer();
468  //write the header
469  parOut->forceMaster(c->seq, Node::Object()->molecule->numAtoms);
470  forceIOTime = CmiWallTimer() - forceOutTime;
471 
472  //notify output procs to do Token based output
473  CProxy_ParallelIOMgr io(CkpvAccess(BOCclass_group).ioMgr);
474  ParallelIOMgr *ioMgr = io.ckLocalBranch();
475 
476  #if OUTPUT_SINGLE_FILE
477  int grpsize = ioMgr->numOutputProcs / ioMgr->numOutputWrts;
478  int remains = ioMgr->numOutputProcs % ioMgr->numOutputWrts;
479  int outrank = 0;
480  int i;
481  for(i=0; i<remains; i++){
482  io[ioMgr->outputProcArray[outrank]].disposeForces(c->seq, forceIOTime);
483  outrank += (grpsize+1);
484  }
485  for(; i<ioMgr->numOutputWrts; i++){
486  io[ioMgr->outputProcArray[outrank]].disposeForces(c->seq, forceIOTime);
487  outrank += grpsize;
488  }
489  #else
490  //output multiple files
491  for(int i=0; i<ioMgr->numOutputProcs; i++) {
492  io[ioMgr->outputProcArray[i]].disposeForces(c->seq, forceIOTime);
493  }
494  #endif
495  //this instance c is freed in the next round of output invocation.
496  }
497 }
498 
499 
500 void CollectionMidMaster::disposePositions(int seq)
501 {
502  CollectMidVectorInstance *c = positions.getReady(seq);
503  CmiAssert(c!=NULL);
504  parOut->coordinateSlave(seq,c->fromAtomID,c->toAtomID,
505  c->data.begin(),c->fdata.begin());
506  c->free();
507 }
508 
509 void CollectionMidMaster::disposeVelocities(int seq)
510 {
511  CollectMidVectorInstance *c = velocities.getReady(seq);
512  CmiAssert(c!=NULL);
513  parOut->velocitySlave(seq,c->fromAtomID,c->toAtomID,c->data.begin());
514  c->free();
515 }
516 
517 void CollectionMidMaster::disposeForces(int seq)
518 {
519  CollectMidVectorInstance *c = forces.getReady(seq);
520  CmiAssert(c!=NULL);
521  parOut->forceSlave(seq,c->fromAtomID,c->toAtomID,c->data.begin());
522  c->free();
523 }
524 #endif
525 
526 #include "CollectionMaster.def.h"
527 
static Node * Object()
Definition: Node.h:86
CollectVectorInstance * removeReady(void)
void disposeForces(CollectVectorInstance *c)
void receiveVelocities(CollectVectorMsg *msg)
Definition: Vector.h:64
Output * output
Definition: Node.h:182
SimParameters * simParameters
Definition: Node.h:178
void receivePositions(CollectVectorMsg *msg)
static __thread float4 * forces
#define DebugM(x, y)
Definition: Debug.h:59
void enqueueVelocities(int seq)
void receiveOutputPosReady(int seq)
void submitData(CollectVectorMsg *msg, int max_index)
void enqueue(int seq, Lattice &lattice)
void coordinate(int, int, Vector *, FloatVector *, Lattice &)
Definition: Output.C:277
char auxFilename[NAMD_FILENAME_BUFFER_SIZE]
void startNextRoundOutputPos(double totalT)
#define PACK_MSG(MSGTYPE, MSGDATA)
Definition: packmsg.h:35
void enqueuePositions(int seq, Lattice &lattice)
void wrapCoor(int seq, Lattice lat)
void receiveDataStream(DataStreamMsg *msg)
double memusage_MB()
Definition: memusage.h:13
void receiveOutputVelReady(int seq)
void disposePositions(CollectVectorInstance *c)
void velocity(int, int, Vector *)
Definition: Output.C:396
void startNextRoundOutputVel(double totalT)
int numAtoms
Definition: Molecule.h:557
void NAMD_die(const char *err_msg)
Definition: common.C:85
void receiveForces(CollectVectorMsg *msg)
void disposeVelocities(CollectVectorInstance *c)
void NAMD_backup_file(const char *filename, const char *extension)
Definition: common.C:167
#define PACK_RESIZE(DATA)
Definition: packmsg.h:125
ResizeArray< char > data
void dummy()
void startNextRoundOutputForce(double totalT)
void enqueueForces(int seq)
int size(void) const
Definition: ResizeArray.h:127
Molecule * molecule
Definition: Node.h:176
void force(int, int, Vector *)
Definition: Output.C:486
void receiveOutputForceReady(int seq)
iterator begin(void)
Definition: ResizeArray.h:36