NAMD
CollectionMaster.h
Go to the documentation of this file.
1 
7 #ifndef COLLECTIONMASTER_H
8 #define COLLECTIONMASTER_H
9 
10 #include "charm++.h"
11 #include "main.h"
12 #include "NamdTypes.h"
13 #include "Lattice.h"
14 #include "ProcessorPrivate.h"
15 #include "PatchMap.h"
16 #include "PatchMap.inl"
17 #include "CollectionMaster.decl.h"
18 #include <stdio.h>
19 
20 #include "Node.h"
21 #include "ParallelIOMgr.decl.h"
22 #include "ParallelIOMgr.h"
23 #include "Output.h"
24 
25 #include "PatchData.h"
26 
27 class CollectVectorMsg : public CMessage_CollectVectorMsg
28 {
29 public:
30 
31  int seq;
32  int aid_size;
33  int data_size;
35  int index;
39 };
40 
41 class DataStreamMsg;
42 
43 class CollectionMaster : public Chare
44 {
45 public:
46  static CollectionMaster *Object() {
47  return CkpvAccess(CollectionMaster_instance);
48  }
50  ~CollectionMaster(void);
51 
56 
58 
59  void enqueuePositions(int seq, Lattice &lattice);
60  void enqueuePositionsDcdSelection(int seq, Lattice &lattice);
61  void enqueueVelocities(int seq);
62  void enqueueForces(int seq);
63 
64  class CollectVectorInstance;
65  void disposePositions(CollectVectorInstance *c);
66  void disposeVelocities(CollectVectorInstance *c);
67  void disposeForces(CollectVectorInstance *c);
68 
69  void blockPositions() { positions.block(); }
70  void unblockPositions() { positions.unblock(); }
71 
73  void receiveOutputPosReady(int seq);
74  void receiveOutputVelReady(int seq);
75  void receiveOutputForceReady(int seq);
76  //totalT is the time taken to do file I/O for each output workflow -Chao Mei
77  void startNextRoundOutputPos(double totalT);
78  void startNextRoundOutputVel(double totalT);
79  void startNextRoundOutputForce(double totalT);
80 
81  void wrapCoorFinished();
82 
85 #ifdef MEM_OPT_VERSION
86  class CollectVectorInstance
87  {
88  public:
89  CollectVectorInstance(void) : seq(-10) { ; }
90 
91  CollectVectorInstance(int s) { reset(s); }
92 
93  void free() { seq = -10; status = HAS_PROCESSED; }
94  int notfree() { return ( seq != -10 ); }
95 
96  void reset(int s) {
97  if ( s == -10 ) NAMD_bug("seq == free in CollectionMaster");
98  seq = s;
99  CProxy_ParallelIOMgr io(CkpvAccess(BOCclass_group).ioMgr);
100  ParallelIOMgr *ioMgr = io.ckLocalBranch();
101  remaining = ioMgr->getNumOutputProcs();
102  status = NOT_PROCESSED;
103  }
104 
105  void append(){ --remaining; }
106 
107  int ready(void) { return ( ! remaining ); }
108 
109  int seq;
111  //mainly used for tracking the progress of wrap_coor operation
112  //the write to files will not happen until the wrap_coor is finished,
113  //and the CollectVectorInstance is freed. -Chao Mei
114  OperationStatus status;
115  private:
116  int remaining;
117 
118  }; //end of declaration for CollectionMaster::CollectVectorInstance
119 
120  class CollectVectorSequence
121  {
122  public:
123 
124  void submitData(int seq){
125  CollectVectorInstance **c = data.begin();
126  CollectVectorInstance **c_e = data.end();
127  for( ; c != c_e && (*c)->seq != seq; ++c );
128  if ( c == c_e )
129  {
130  c = data.begin();
131  for( ; c != c_e && (*c)->notfree(); ++c );
132  if ( c == c_e ) {
133  data.add(new CollectVectorInstance(seq));
134  c = data.end() - 1;
135  }
136  (*c)->reset(seq);
137  }
138  (*c)->append();
139  }
140 
141  void enqueue(int seq, Lattice &lattice) {
142  queue.add(seq);
143  latqueue.add(lattice);
144  }
145 
146  CollectVectorInstance* removeReady(void)
147  {
148  //it is equal to
149  //if(getReady()) removeFirstReady();
150  //But it should be not used in memory
151  //optimized version as the ready instance
152  //is delayed to be freed at the start of
153  //next round of output -Chao Mei
154  return NULL;
155  }
156  void block() { ; } // unimplemented
157  void unblock() { ; } // unimplemented
158 
159  //only get the ready instance, not remove their info
160  //from timestep queue and lattice queue
161  CollectVectorInstance* getReady(void)
162  {
163  CollectVectorInstance *o = 0;
164  if ( queue.size() )
165  {
166  int seq = queue[0];
167  CollectVectorInstance **c = data.begin();
168  CollectVectorInstance **c_e = data.end();
169  for( ; c != c_e && (*c)->seq != seq; ++c );
170  if ( c != c_e && (*c)->ready() )
171  {
172  o = *c;
173  o->lattice = latqueue[0];
174  }
175  }
176  return o;
177  }
178 
179  //the function is intended to be used after "getReady"
180  //to remove the info regarding the timestep and lattice.
181  //So, it removes the front ready one. -Chao Mei
182  int removeFirstReady(){
183  int seq = queue[0];
184  queue.del(0,1);
185  latqueue.del(0,1);
186  return seq;
187  }
188 
192  }; //end of declaration for CollectionMaster::CollectVectorSequence
193 #else
195  {
196  public:
197 
198  CollectVectorInstance(void) : seq(-10) { ; }
199 
201 
202  void free() { seq = -10; }
203  int notfree() { return ( seq != -10 ); }
204 
205  void reset(int s) {
206  if ( s == -10 ) NAMD_bug("seq == free in CollectionMaster");
207  seq = s;
208  remaining = (PatchMap::Object())->numNodesWithPatches();
209  data.resize(0);
210  fdata.resize(0);
211  }
212 
213  // true -> send it and delete it!
214  void append(CollectVectorMsg *msg, int max_index)
215  {
216  AtomID *a = msg->aid;
217  Vector *d = msg->data;
218  FloatVector *fd = msg->fdata;
219  int size = msg->aid_size;
220  if ( msg->data_size ) {
221  data.resize(max_index);
222  Vector *ptr = data.begin();
223  for( int i = 0; i < size; ++i ) { ptr[a[i]] = d[i]; }
224  }
225  if ( msg->fdata_size ) {
226  fdata.resize(max_index);
227  FloatVector *ptr = fdata.begin();
228  for( int i = 0; i < size; ++i ) { ptr[a[i]] = fd[i]; }
229  }
230  --remaining;
231  }
232 
233  // true -> send it and delete it!
234  int ready(void) { return ( ! remaining ); }
235 
236  int seq;
238 
241 
243 
244  };
245 
247  {
248  public:
251  void append(CollectVectorMsg *msg, int max_index)
252  {
253  AtomID *a = msg->aid;
254  Vector *d = msg->data;
255  FloatVector *fd = msg->fdata;
256  int size = msg->aid_size;
257  if ( msg->data_size ) {
258  data.resize(max_index);
259  Vector *ptr = data.begin();
260  for( int i = 0; i < size; ++i ) { ptr[Node::Object()->molecule->get_dcd_selection_index_from_atom_id(msg->index,a[i])] = d[i]; }
261  }
262  if ( msg->fdata_size ) {
263  fdata.resize(max_index);
264  FloatVector *ptr = fdata.begin();
265  for( int i = 0; i < size; ++i ) { ptr[Node::Object()->molecule->get_dcd_selection_index_from_atom_id(msg->index,a[i])] = fd[i]; }
266  }
267  --remaining;
268  }
269 
270  };
271 
273  {
274  public:
275 
276  void submitData(CollectVectorMsg *msg, int max_index)
277  {
278  int seq = msg->seq;
279  CollectVectorInstance **c = data.begin();
280  CollectVectorInstance **c_e = data.end();
281  for( ; c != c_e && (*c)->seq != seq; ++c );
282  if ( c == c_e )
283  {
284  c = data.begin();
285  for( ; c != c_e && (*c)->notfree(); ++c );
286  if ( c == c_e ) {
287  data.add(new CollectVectorInstance(seq));
288  c = data.end() - 1;
289  }
290  (*c)->reset(seq);
291  }
292  (*c)->append(msg, max_index);
293  }
294 
295  void enqueue(int seq, Lattice &lattice) {
296  queue.add(seq);
297  latqueue.add(lattice);
298  }
299 
301  {
302  CollectVectorInstance *o = 0;
303  if ( queue.size() && ! blocked )
304  {
305  int seq = queue[0];
306  CollectVectorInstance **c = data.begin();
307  CollectVectorInstance **c_e = data.end();
308  for( ; c != c_e && (*c)->seq != seq; ++c );
309  if ( c != c_e && (*c)->ready() )
310  {
311  o = *c;
312  o->lattice = latqueue[0];
313  queue.del(0,1);
314  latqueue.del(0,1);
315  }
316  }
317  return o;
318  }
319 
320  void block() { blocked = 1; }
321  void unblock() { blocked = 0; }
323 
324 
327  int blocked;
328  private:
330  };
331 
333  {
334  public:
335  void submitData(CollectVectorMsg *msg, int max_index)
336  {
337  int seq = msg->seq;
340  for( ; c != c_e && (*c)->seq != seq; ++c );
341  if ( c == c_e )
342  {
343  c = data.begin();
344  for( ; c != c_e && (*c)->notfree(); ++c );
345  if ( c == c_e ) {
347  c = data.end() - 1;
348  }
349  (*c)->reset(seq);
350  }
351  (*c)->append(msg, max_index);
352  }
354  {
356  if ( queue.size() && ! blocked )
357  {
358  int seq = queue[0];
361  for( ; c != c_e && (*c)->seq != seq; ++c );
362  if ( c != c_e && (*c)->ready() )
363  {
364  o = *c;
365  o->lattice = latqueue[0];
366  queue.del(0,1);
367  latqueue.del(0,1);
368  }
369  }
370  return o;
371  }
372 
374  CollectVectorSequenceDcdSelection(int _index) : index(_index), blocked(0) { ; }
376  int index;
377  int blocked;
378  };
379 #endif
380 
381 #ifdef NODEGROUP_FORCE_REGISTER
382  void lock() { CmiLock(cmlock); }
383  void unlock() { CmiUnlock(cmlock); }
384 #endif
385 
386 private:
387 #ifdef NODEGROUP_FORCE_REGISTER
388  CmiNodeLock cmlock; // mutex to prevent output collisions for single gpu code
389 #endif
390 
391  CollectVectorSequence positions;
392  CollectVectorSequence velocities;
393  CollectVectorSequence forces;
394  int posTimings, velTimings, forceTimings;
395  FILE *dataStreamFile;
396 
397 #ifdef MEM_OPT_VERSION
398  int wrapCoorDoneCnt;
399  ParOutput *parOut;
400  double posOutTime; //record the output time
401  double velOutTime; //record the output time
402  double forceOutTime; //record the output time
403  double posIOTime; //record the max time spent on real file IO for one output
404  double velIOTime; //record the max time spent on real file IO for one output
405  double forceIOTime; //record the max time spent on real file IO for one output
406 
407  //for the sake of simultaneous writing to the same file
408  int posDoneCnt;
409  int velDoneCnt;
410  int forceDoneCnt;
411 
412  void checkPosReady();
413  void checkVelReady();
414  void checkForceReady();
415 #else
416  CollectVectorSequenceDcdSelection positionsDcdSelection;
417 #endif
418 };
419 
420 class DataStreamMsg : public CMessage_DataStreamMsg {
421 public:
422 
424 
425  static void* pack(DataStreamMsg* msg);
426  static DataStreamMsg* unpack(void *ptr);
427 
428 };
429 
430 //Use varsize message to be more SMP safe
431 class CollectVectorVarMsg : public CMessage_CollectVectorVarMsg
432 {
433 public:
435 public:
436  int seq;
437  int size;
442 };
443 
444 #ifdef MEM_OPT_VERSION
445 class CollectMidVectorInstance{
446  public:
447 
448  CollectMidVectorInstance(void) : seq(-10) {
449  CProxy_ParallelIOMgr io(CkpvAccess(BOCclass_group).ioMgr);
450  ParallelIOMgr *ioMgr = io.ckLocalBranch();
451  ioMgr->getMyAtomsRangeOnOutput(fromAtomID, toAtomID);
452  }
453 
454  CollectMidVectorInstance(int s) {
455  CProxy_ParallelIOMgr io(CkpvAccess(BOCclass_group).ioMgr);
456  ParallelIOMgr *ioMgr = io.ckLocalBranch();
457  ioMgr->getMyAtomsRangeOnOutput(fromAtomID, toAtomID);
458  reset(s);
459  }
460 
461  void free() { seq = -10; }
462  int notfree() { return ( seq != -10 ); }
463 
464  void reset(int s) {
465  if ( s == -10 ) NAMD_bug("seq == free in CollectionMidMaster");
466  seq = s;
467  remaining = toAtomID-fromAtomID+1;
468  data.resize(0);
469  fdata.resize(0);
470  }
471 
472  // if 1 is returned, indicates all the expected atoms for a
473  // timestep have been received
474  int append(int size, AtomID *a, Vector *d, FloatVector *fd){
475  if (d) {
476  if ( size ) data.resize(toAtomID-fromAtomID+1);
477  Vector *ptr = data.begin();
478  for(int i = 0; i < size; ++i) { ptr[a[i]-fromAtomID] = d[i]; }
479  }
480  if (fd) {
481  if ( size ) fdata.resize(toAtomID-fromAtomID+1);
482  FloatVector *ptr = fdata.begin();
483  for(int i = 0; i < size; ++i) { ptr[a[i]-fromAtomID] = fd[i]; }
484  }
485  remaining -= size;
486 
487  return ready();
488  }
489 
490  int ready(void) { return ( ! remaining ); }
491 
492  int seq;
493  Lattice lattice;
494  ResizeArray<Vector> data;
496 
497  //indicates the range of atoms this object is responsible for collecting
498  int fromAtomID;
499  int toAtomID;
500 
501  private:
502  int remaining;
503 };//end of declaration for CollectMidVectorInstance
504 
505 
506 //An object of this class will be a member of Parallel IO Manager
507 //It is responsible to buffer the position/coordinates data that is
508 //going to be written to file system.
509 //In particular, the instance of this class will be on output procs.
510 //It will communicate via parallel IO manager with the CollectionMaster
511 //object on PE0 to be notified on when and what to write --Chao Mei
512 class CollectionMidMaster{
513 public:
514 
515  CollectionMidMaster(ParallelIOMgr *pIO_) : pIO(pIO_) { parOut = new ParOutput(pIO_->myOutputRank); }
516  ~CollectionMidMaster(void) { delete parOut; }
517 
518  int receivePositions(CollectVectorVarMsg *msg) {return positions.submitData(msg);}
519  int receiveVelocities(CollectVectorVarMsg *msg) {return velocities.submitData(msg);}
520  int receiveForces(CollectVectorVarMsg *msg) {return forces.submitData(msg);}
521 
522  void disposePositions(int seq);
523  void disposeVelocities(int seq);
524  void disposeForces(int seq);
525 
526  CollectMidVectorInstance *getReadyPositions(int seq) { return positions.getReady(seq); }
527 
528  //containing an array of CollectVectorInstance and their corresponding
529  //timestep value and lattice value
530  class CollectVectorSequence{
531  public:
532  int submitData(CollectVectorVarMsg *msg){
533  int seq = msg->seq;
534  CollectMidVectorInstance **c = data.begin();
535  CollectMidVectorInstance **c_e = data.end();
536  for( ; c != c_e && (*c)->seq != seq; ++c );
537  if ( c == c_e ){
538  c = data.begin();
539  for( ; c != c_e && (*c)->notfree(); ++c );
540  if ( c == c_e ) {
541  data.add(new CollectMidVectorInstance(seq));
542  c = data.end() - 1;
543  }
544  (*c)->reset(seq);
545  }
546  AtomID *i = msg->aid;
547  Vector *d = msg->data;
548  FloatVector *fd = msg->fdata;
550  fd = NULL;
552  d = NULL;
553  }
554  return (*c)->append(msg->size,i,d,fd);
555  }
556 
557  CollectMidVectorInstance* getReady(int seq){
558  CollectMidVectorInstance **c = data.begin();
559  CollectMidVectorInstance **c_e = data.end();
560  for( ; c != c_e && (*c)->seq != seq; ++c );
561  CmiAssert(c != c_e);
562  return *c;
563  }
564 
566  };//end of declaration for CollectionMidMaster::CollectVectorSequence
567 
568 private:
569  CollectVectorSequence positions;
570  CollectVectorSequence velocities;
571  CollectVectorSequence forces;
572  ParallelIOMgr *pIO;
573  ParOutput *parOut;
574 }; //end of declaration for CollectionMidMaster
575 #endif
576 
577 #endif
578 
static Node * Object()
Definition: Node.h:86
CollectVectorInstance * removeReady(void)
int size(void) const
Definition: ResizeArray.h:131
ResizeArray< CollectVectorInstanceDcdSelection * > data
int getNumOutputProcs()
static PatchMap * Object()
Definition: PatchMap.h:27
void disposeForces(CollectVectorInstance *c)
void receiveVelocities(CollectVectorMsg *msg)
Definition: Vector.h:72
void receivePositions(CollectVectorMsg *msg)
void receivePositionsDcdSelection(CollectVectorMsg *msg)
static DataStreamMsg * unpack(void *ptr)
void enqueueVelocities(int seq)
void receiveOutputPosReady(int seq)
void submitData(CollectVectorMsg *msg, int max_index)
void enqueue(int seq, Lattice &lattice)
void startNextRoundOutputPos(double totalT)
void append(CollectVectorMsg *msg, int max_index)
int add(const Elem &elem)
Definition: ResizeArray.h:101
void enqueuePositions(int seq, Lattice &lattice)
void enqueuePositionsDcdSelection(int seq, Lattice &lattice)
void receiveDataStream(DataStreamMsg *msg)
void resize(int i)
Definition: ResizeArray.h:84
static void * pack(DataStreamMsg *msg)
void receiveOutputVelReady(int seq)
void NAMD_bug(const char *err_msg)
Definition: common.C:195
void append(CollectVectorMsg *msg, int max_index)
void disposePositions(CollectVectorInstance *c)
int get_dcd_selection_index_from_atom_id(const int index, const int atomIndex)
Definition: Molecule.h:881
void submitData(CollectVectorMsg *msg, int max_index)
void startNextRoundOutputVel(double totalT)
void receiveForces(CollectVectorMsg *msg)
void disposeVelocities(CollectVectorInstance *c)
static CollectionMaster * Object()
CollectVectorInstanceDcdSelection * removeReady(void)
ResizeArray< char > data
iterator begin(void)
Definition: ResizeArray.h:36
void startNextRoundOutputForce(double totalT)
int32 AtomID
Definition: NamdTypes.h:35
void del(int index, int num=1)
Definition: ResizeArray.h:108
void enqueueForces(int seq)
Molecule * molecule
Definition: Node.h:179
FloatVector * fdata
void receiveOutputForceReady(int seq)