NAMD
CollectionMaster.h
Go to the documentation of this file.
1 
7 #ifndef COLLECTIONMASTER_H
8 #define COLLECTIONMASTER_H
9 
10 #include "charm++.h"
11 #include "main.h"
12 #include "NamdTypes.h"
13 #include "Lattice.h"
14 #include "ProcessorPrivate.h"
15 #include "PatchMap.h"
16 #include "PatchMap.inl"
17 #include "CollectionMaster.decl.h"
18 #include <stdio.h>
19 
20 #include "Node.h"
21 #include "ParallelIOMgr.decl.h"
22 #include "ParallelIOMgr.h"
23 #include "Output.h"
24 
// Message subclass (base: CMessage_CollectVectorMsg) that carries one batch of
// per-atom vectors for output step `seq` to CollectVectorSequence::submitData.
// NOTE(review): this listing elides source lines 32-35. Code later in this file
// reads msg->aid, msg->data, msg->fdata and msg->fdata_size, so those members
// are presumably declared on the missing lines -- confirm in the real header.
25 class CollectVectorMsg : public CMessage_CollectVectorMsg
26 {
27 public:
28 
29  int seq; // step identifier; matched against CollectVectorInstance::seq in submitData
30  int aid_size; // loop bound used when CollectVectorInstance::append copies entries
31  int data_size; // when non-zero, CollectVectorInstance::append copies msg->data
36 };
37 
38 class DataStreamMsg;
39 
40 class CollectionMaster : public Chare
41 {
42 public:
// Returns the CollectionMaster pointer held in the CollectionMaster_instance
// variable, read through CkpvAccess.
43  static CollectionMaster *Object() {
44  return CkpvAccess(CollectionMaster_instance);
45  }
47  ~CollectionMaster(void);
48 
52 
54 
55  void enqueuePositions(int seq, Lattice &lattice);
56  void enqueueVelocities(int seq);
57  void enqueueForces(int seq);
58 
59  class CollectVectorInstance;
60  void disposePositions(CollectVectorInstance *c);
61  void disposeVelocities(CollectVectorInstance *c);
62  void disposeForces(CollectVectorInstance *c);
63 
// Forwards to positions.block(). Under MEM_OPT_VERSION that method is an empty
// stub; otherwise it sets the sequence's blocked flag so removeReady() yields nothing.
64  void blockPositions() { positions.block(); }
// Forwards to positions.unblock(). Under MEM_OPT_VERSION that method is an empty
// stub; otherwise it clears the sequence's blocked flag.
65  void unblockPositions() { positions.unblock(); }
66 
68  void receiveOutputPosReady(int seq);
69  void receiveOutputVelReady(int seq);
70  void receiveOutputForceReady(int seq);
71  //totalT is the time taken to do file I/O for each output workflow -Chao Mei
72  void startNextRoundOutputPos(double totalT);
73  void startNextRoundOutputVel(double totalT);
74  void startNextRoundOutputForce(double totalT);
75 
76  void wrapCoorFinished();
77 
80 #ifdef MEM_OPT_VERSION
// MEM_OPT_VERSION bookkeeping record for one output step. It holds no atom
// arrays itself: it counts down outstanding contributions (remaining) and
// tracks a processing status. A seq value of -10 marks the slot as free.
// NOTE(review): source line 105 is elided in this listing; getReady() assigns
// o->lattice, so a lattice member is presumably declared there -- confirm.
81  class CollectVectorInstance
82  {
83  public:
     // Creates a free slot (seq == -10).
     // NOTE(review): status and remaining are not initialized by this constructor.
84  CollectVectorInstance(void) : seq(-10) { ; }
85 
     // Creates a slot already bound to step s by delegating to reset(s).
86  CollectVectorInstance(int s) { reset(s); }
87 
     // Marks the slot reusable and records the status HAS_PROCESSED.
88  void free() { seq = -10; status = HAS_PROCESSED; }
     // Non-zero while the slot is bound to a step (seq != -10).
89  int notfree() { return ( seq != -10 ); }
90 
     // Binds the slot to step s. Because -10 is the "free" marker, passing it is
     // reported through NAMD_bug. The countdown starts at the value returned by
     // getNumOutputProcs() on the local ParallelIOMgr branch, and the status is
     // set back to NOT_PROCESSED.
91  void reset(int s) {
92  if ( s == -10 ) NAMD_bug("seq == free in CollectionMaster");
93  seq = s;
94  CProxy_ParallelIOMgr io(CkpvAccess(BOCclass_group).ioMgr);
95  ParallelIOMgr *ioMgr = io.ckLocalBranch();
96  remaining = ioMgr->getNumOutputProcs();
97  status = NOT_PROCESSED;
98  }
99 
     // Accounts for one received contribution.
100  void append(){ --remaining; }
101 
     // Non-zero once remaining has reached zero.
102  int ready(void) { return ( ! remaining ); }
103 
     // Step this slot is bound to, or -10 when free.
104  int seq;
106  //Mainly used for tracking the progress of the wrap_coor operation.
107  //The write to files will not happen until the wrap_coor is finished
108  //and the CollectVectorInstance is freed. -Chao Mei
109  OperationStatus status;
110  private:
     // Contributions still expected before ready() reports true.
111  int remaining;
112 
113  }; //end of declaration for CollectionMaster::CollectVectorInstance
114 
// MEM_OPT_VERSION bookkeeping for one kind of output data (CollectionMaster
// keeps one each for positions, velocities and forces). It holds the
// CollectVectorInstance slots (data) plus the queue of requested steps (queue)
// and their lattices (latqueue); entries are added with add() and consumed
// from index 0.
// NOTE(review): data, queue and latqueue are used below but declared on source
// lines 184-186, which this listing elides -- confirm their types in the header.
115  class CollectVectorSequence
116  {
117  public:
118 
     // Records one contribution for step seq: finds the instance already bound
     // to seq, or else recycles the first free slot, or else appends a newly
     // allocated instance; then decrements that instance's outstanding count.
119  void submitData(int seq){
120  CollectVectorInstance **c = data.begin();
121  CollectVectorInstance **c_e = data.end();
     // pass 1: look for the instance already collecting this step
122  for( ; c != c_e && (*c)->seq != seq; ++c );
123  if ( c == c_e )
124  {
     // pass 2: no match, so look for a free (seq == -10) slot to recycle
125  c = data.begin();
126  for( ; c != c_e && (*c)->notfree(); ++c );
127  if ( c == c_e ) {
     // nothing free: grow the array and point c at the new last element
128  data.add(new CollectVectorInstance(seq));
129  c = data.end() - 1;
130  }
     // for a freshly allocated instance the int constructor has already run
     // reset(seq), so this call repeats it in that case
131  (*c)->reset(seq);
132  }
133  (*c)->append();
134  }
135 
     // Queues step seq together with its lattice; the two arrays stay index-aligned.
136  void enqueue(int seq, Lattice &lattice) {
137  queue.add(seq);
138  latqueue.add(lattice);
139  }
140 
     // Always returns NULL in this build variant (see the remark inside).
141  CollectVectorInstance* removeReady(void)
142  {
143  //Conceptually this is equal to
144  //if(getReady()) removeFirstReady();
145  //but it should not be used in the memory
146  //optimized version, because freeing the ready
147  //instance is delayed until the start of the
148  //next round of output -Chao Mei
149  return NULL;
150  }
151  void block() { ; } // unimplemented
152  void unblock() { ; } // unimplemented
153 
154  //Only returns the ready instance; it does not remove its info
155  //from the timestep queue or the lattice queue.
     // Returns 0 when the queue is empty, when no instance is bound to the
     // front step, or when that instance is not ready yet.
156  CollectVectorInstance* getReady(void)
157  {
158  CollectVectorInstance *o = 0;
159  if ( queue.size() )
160  {
     // only the step at the front of the queue is ever considered
161  int seq = queue[0];
162  CollectVectorInstance **c = data.begin();
163  CollectVectorInstance **c_e = data.end();
164  for( ; c != c_e && (*c)->seq != seq; ++c );
165  if ( c != c_e && (*c)->ready() )
166  {
167  o = *c;
     // copy the lattice queued for this step into the instance
168  o->lattice = latqueue[0];
169  }
170  }
171  return o;
172  }
173 
174  //This function is intended to be used after "getReady"
175  //to remove the info regarding the timestep and lattice.
176  //So, it removes the front ready one. -Chao Mei
     // Returns the step number that was at the front of the queue.
     // NOTE(review): queue[0] is read without checking that the queue is
     // non-empty; callers are presumably expected to call getReady() first.
177  int removeFirstReady(){
178  int seq = queue[0];
179  queue.del(0,1);
180  latqueue.del(0,1);
181  return seq;
182  }
183 
187  }; //end of declaration for CollectionMaster::CollectVectorSequence
188 #else
190  {
191  public:
192 
// Creates a free slot: seq is set to the "free" marker -10.
// NOTE(review): remaining is not initialized by this constructor.
193  CollectVectorInstance(void) : seq(-10) { ; }
194 
196 
// Marks the slot reusable by setting seq back to -10; the collected arrays are not touched here.
197  void free() { seq = -10; }
// Non-zero while the slot is bound to a step (seq != -10).
198  int notfree() { return ( seq != -10 ); }
199 
// Binds this instance to step s and empties both collected arrays.
// Because -10 is the "free" marker, passing it is reported through NAMD_bug.
// The countdown starts at PatchMap's numNodesWithPatches(), and append()
// decrements it once per message.
200  void reset(int s) {
201  if ( s == -10 ) NAMD_bug("seq == free in CollectionMaster");
202  seq = s;
203  remaining = (PatchMap::Object())->numNodesWithPatches();
204  data.resize(0);
205  fdata.resize(0);
206  }
207 
208  // true -> send it and delete it!
     // NOTE(review): the legacy remark above mentions a "true" result, but this
     // method returns void; completion is queried through ready() instead.
     //
     // Scatters one message into the collected arrays by atom ID: for each of
     // the first msg->aid_size entries, slot a[i] receives the i-th value.
     // Vector data is copied only when msg->data_size is non-zero, FloatVector
     // data only when msg->fdata_size is non-zero. Each call counts as one
     // contribution toward ready().
     // NOTE(review): a[i] is used as an index after resizing to max_index with
     // no bounds check -- assumes every atom ID is < max_index; confirm.
209  void append(CollectVectorMsg *msg, int max_index)
210  {
211  AtomID *a = msg->aid;
212  Vector *d = msg->data;
213  FloatVector *fd = msg->fdata;
214  int size = msg->aid_size;
215  if ( msg->data_size ) {
216  data.resize(max_index);
217  Vector *ptr = data.begin();
218  for( int i = 0; i < size; ++i ) { ptr[a[i]] = d[i]; }
219  }
220  if ( msg->fdata_size ) {
221  fdata.resize(max_index);
222  FloatVector *ptr = fdata.begin();
223  for( int i = 0; i < size; ++i ) { ptr[a[i]] = fd[i]; }
224  }
225  --remaining;
226  }
227 
// Non-zero once remaining has been counted down to zero by append().
228  int ready(void) { return ( ! remaining ); }
229 
230  int seq;
232 
235 
236  private:
237  int remaining;
238 
239  };
240 
242  {
243  public:
244 
// Routes msg to the instance collecting step msg->seq. If no instance is bound
// to that step, the first free slot is recycled, or a new instance is allocated
// and appended; the chosen slot is then reset for the step. Finally the
// message payload is appended to that instance.
// NOTE(review): the msg pointer is not released here; ownership presumably
// stays with the caller -- confirm.
245  void submitData(CollectVectorMsg *msg, int max_index)
246  {
247  int seq = msg->seq;
248  CollectVectorInstance **c = data.begin();
249  CollectVectorInstance **c_e = data.end();
     // pass 1: look for the instance already collecting this step
250  for( ; c != c_e && (*c)->seq != seq; ++c );
251  if ( c == c_e )
252  {
     // pass 2: no match, so look for a free (seq == -10) slot to recycle
253  c = data.begin();
254  for( ; c != c_e && (*c)->notfree(); ++c );
255  if ( c == c_e ) {
     // nothing free: grow the array and point c at the new last element
256  data.add(new CollectVectorInstance(seq));
257  c = data.end() - 1;
258  }
259  (*c)->reset(seq);
260  }
261  (*c)->append(msg, max_index);
262  }
263 
// Queues step seq together with its lattice; the two arrays stay index-aligned
// and are consumed from index 0 by removeReady().
264  void enqueue(int seq, Lattice &lattice) {
265  queue.add(seq);
266  latqueue.add(lattice);
267  }
268 
270  {
271  CollectVectorInstance *o = 0;
272  if ( queue.size() && ! blocked )
273  {
274  int seq = queue[0];
275  CollectVectorInstance **c = data.begin();
276  CollectVectorInstance **c_e = data.end();
277  for( ; c != c_e && (*c)->seq != seq; ++c );
278  if ( c != c_e && (*c)->ready() )
279  {
280  o = *c;
281  o->lattice = latqueue[0];
282  queue.del(0,1);
283  latqueue.del(0,1);
284  }
285  }
286  return o;
287  }
288 
// Sets the blocked flag; while it is set, removeReady() returns 0 even if the front step is ready.
289  void block() { blocked = 1; }
// Clears the blocked flag so removeReady() may hand out ready instances again.
290  void unblock() { blocked = 0; }
292 
296  int blocked;
297 
298  };
299 #endif
300 private:
301 
302  CollectVectorSequence positions;
303  CollectVectorSequence velocities;
304  CollectVectorSequence forces;
305  int posTimings, velTimings, forceTimings;
306  FILE *dataStreamFile;
307 
308 #ifdef MEM_OPT_VERSION
309  int wrapCoorDoneCnt;
310  ParOutput *parOut;
311  double posOutTime; //record the output time
312  double velOutTime; //record the output time
313  double forceOutTime; //record the output time
314  double posIOTime; //record the max time spent on real file IO for one output
315  double velIOTime; //record the max time spent on real file IO for one output
316  double forceIOTime; //record the max time spent on real file IO for one output
317 
318  //for the sake of simultaneous writing to the same file
319  int posDoneCnt;
320  int velDoneCnt;
321  int forceDoneCnt;
322 
323  void checkPosReady();
324  void checkVelReady();
325  void checkForceReady();
326 #endif
327 };
328 
// Message subclass (base: CMessage_DataStreamMsg) that declares its own static
// pack/unpack routines; their definitions are not in this header.
// NOTE(review): source line 332 is elided in this listing; the payload member
// is presumably declared there -- confirm in the real header.
329 class DataStreamMsg : public CMessage_DataStreamMsg {
330 public:
331 
333 
     // Converts msg into its packed form and returns a pointer to it.
334  static void* pack(DataStreamMsg* msg);
     // Rebuilds a DataStreamMsg from the packed buffer ptr.
335  static DataStreamMsg* unpack(void *ptr);
336 
337 };
338 
339 //Uses a varsize message to be more SMP-safe
// Message subclass (base: CMessage_CollectVectorVarMsg) consumed by
// CollectionMidMaster::CollectVectorSequence::submitData.
// NOTE(review): source lines 343 and 347-350 are elided in this listing.
// submitData reads msg->aid, msg->data and msg->fdata, so those members are
// presumably declared on the missing lines -- confirm in the real header.
340 class CollectVectorVarMsg : public CMessage_CollectVectorVarMsg
341 {
342 public:
344 public:
345  int seq; // step identifier; matched against CollectMidVectorInstance::seq
346  int size; // entry count passed to CollectMidVectorInstance::append
351 };
352 
353 #ifdef MEM_OPT_VERSION
// Collects the vectors for one output step over the atom range
// [fromAtomID, toAtomID]. A seq value of -10 marks the instance as free.
// NOTE(review): source line 404 is elided in this listing; reset() and append()
// use an fdata member that is presumably declared there -- confirm.
354 class CollectMidVectorInstance{
355  public:
356 
     // Creates a free instance and asks the local ParallelIOMgr branch for the
     // atom range via getMyAtomsRangeOnOutput (fromAtomID/toAtomID are
     // presumably filled in by reference).
     // NOTE(review): remaining is not initialized by this constructor.
357  CollectMidVectorInstance(void) : seq(-10) {
358  CProxy_ParallelIOMgr io(CkpvAccess(BOCclass_group).ioMgr);
359  ParallelIOMgr *ioMgr = io.ckLocalBranch();
360  ioMgr->getMyAtomsRangeOnOutput(fromAtomID, toAtomID);
361  }
362 
     // Same range lookup as above, then binds the instance to step s via reset().
     // The range must be known first because reset() derives remaining from it.
363  CollectMidVectorInstance(int s) {
364  CProxy_ParallelIOMgr io(CkpvAccess(BOCclass_group).ioMgr);
365  ParallelIOMgr *ioMgr = io.ckLocalBranch();
366  ioMgr->getMyAtomsRangeOnOutput(fromAtomID, toAtomID);
367  reset(s);
368  }
369 
     // Marks the instance reusable; the collected arrays are not touched here.
370  void free() { seq = -10; }
     // Non-zero while the instance is bound to a step (seq != -10).
371  int notfree() { return ( seq != -10 ); }
372 
     // Binds the instance to step s, sets the countdown to the number of atoms
     // in the inclusive range, and empties both arrays. Because -10 is the
     // "free" marker, passing it is reported through NAMD_bug.
373  void reset(int s) {
374  if ( s == -10 ) NAMD_bug("seq == free in CollectionMidMaster");
375  seq = s;
376  remaining = toAtomID-fromAtomID+1;
377  data.resize(0);
378  fdata.resize(0);
379  }
380 
381  // If 1 is returned, all the expected atoms for a
382  // timestep have been received.
     // Stores size entries: the value for atom a[i] lands at index
     // a[i]-fromAtomID. d and fd may each be null to skip that array; an array
     // is only resized (to the full range) when size is non-zero.
     // NOTE(review): assumes every a[i] lies in [fromAtomID, toAtomID]; there
     // is no bounds check here.
383  int append(int size, AtomID *a, Vector *d, FloatVector *fd){
384  if (d) {
385  if ( size ) data.resize(toAtomID-fromAtomID+1);
386  Vector *ptr = data.begin();
387  for(int i = 0; i < size; ++i) { ptr[a[i]-fromAtomID] = d[i]; }
388  }
389  if (fd) {
390  if ( size ) fdata.resize(toAtomID-fromAtomID+1);
391  FloatVector *ptr = fdata.begin();
392  for(int i = 0; i < size; ++i) { ptr[a[i]-fromAtomID] = fd[i]; }
393  }
     // the countdown is per atom, not per message
394  remaining -= size;
395 
396  return ready();
397  }
398 
     // Non-zero once remaining has reached zero.
399  int ready(void) { return ( ! remaining ); }
400 
     // Step this instance is bound to, or -10 when free.
401  int seq;
402  Lattice lattice;
     // Collected Vector values, indexed by atomID - fromAtomID.
403  ResizeArray<Vector> data;
405 
406  //indicates the range of atoms this object is responsible for collecting
407  int fromAtomID;
408  int toAtomID;
409 
410  private:
     // Atoms still expected before ready() reports true.
411  int remaining;
412 };//end of declaration for CollectMidVectorInstance
413 
414 
415 //An object of this class will be a member of the Parallel IO Manager.
416 //It is responsible for buffering the position/coordinate data that is
417 //going to be written to the file system.
418 //In particular, the instances of this class will be on the output procs.
419 //Each one communicates via the parallel IO manager with the CollectionMaster
420 //object on PE0 to be notified of when and what to write --Chao Mei
// Buffers per-step position, velocity and force data in three
// CollectVectorSequence members and owns a ParOutput object.
// NOTE(review): parOut is a raw owning pointer and no copy constructor or
// assignment operator is visible in this listing; copying an instance would
// presumably delete parOut twice -- confirm the class is never copied.
421 class CollectionMidMaster{
422 public:
423 
     // Remembers the ParallelIOMgr and allocates a ParOutput from its myOutputRank.
424  CollectionMidMaster(ParallelIOMgr *pIO_) : pIO(pIO_) { parOut = new ParOutput(pIO_->myOutputRank); }
     // Releases the ParOutput allocated by the constructor.
425  ~CollectionMidMaster(void) { delete parOut; }
426 
     // Each receive* forwards msg to the matching sequence and returns its
     // result: non-zero once every expected atom for msg->seq has arrived.
427  int receivePositions(CollectVectorVarMsg *msg) {return positions.submitData(msg);}
428  int receiveVelocities(CollectVectorVarMsg *msg) {return velocities.submitData(msg);}
429  int receiveForces(CollectVectorVarMsg *msg) {return forces.submitData(msg);}
430 
     // Defined outside this header.
431  void disposePositions(int seq);
432  void disposeVelocities(int seq);
433  void disposeForces(int seq);
434 
     // Returns the position instance bound to step seq (see getReady below).
435  CollectMidVectorInstance *getReadyPositions(int seq) { return positions.getReady(seq); }
436 
437  //Contains an array of CollectMidVectorInstance objects, each carrying
438  //its own timestep value and lattice value
439  class CollectVectorSequence{
440  public:
     // Routes msg to the instance collecting step msg->seq. If no instance is
     // bound to that step, the first free slot is recycled, or a new instance
     // is allocated and appended. Returns what append() returns: non-zero when
     // the step is complete.
441  int submitData(CollectVectorVarMsg *msg){
442  int seq = msg->seq;
443  CollectMidVectorInstance **c = data.begin();
444  CollectMidVectorInstance **c_e = data.end();
     // pass 1: look for the instance already collecting this step
445  for( ; c != c_e && (*c)->seq != seq; ++c );
446  if ( c == c_e ){
     // pass 2: no match, so look for a free (seq == -10) slot to recycle
447  c = data.begin();
448  for( ; c != c_e && (*c)->notfree(); ++c );
449  if ( c == c_e ) {
450  data.add(new CollectMidVectorInstance(seq));
451  c = data.end() - 1;
452  }
     // for a freshly allocated instance the int constructor has already run
     // reset(seq), so this call repeats it in that case
453  (*c)->reset(seq);
454  }
455  AtomID *i = msg->aid;
456  Vector *d = msg->data;
457  FloatVector *fd = msg->fdata;
     // NOTE(review): source lines 458 and 460 are elided in this listing; they
     // presumably hold the condition that chooses which of fd / d is nulled
     // before the append -- confirm in the real header.
459  fd = NULL;
461  d = NULL;
462  }
463  return (*c)->append(msg->size,i,d,fd);
464  }
465 
     // Returns the instance bound to step seq.
     // NOTE(review): a missing seq is guarded only by CmiAssert; if that
     // assertion is compiled out, *c would read data.end() -- confirm callers
     // only ask for steps that exist.
466  CollectMidVectorInstance* getReady(int seq){
467  CollectMidVectorInstance **c = data.begin();
468  CollectMidVectorInstance **c_e = data.end();
469  for( ; c != c_e && (*c)->seq != seq; ++c );
470  CmiAssert(c != c_e);
471  return *c;
472  }
473 
475  };//end of declaration for CollectionMidMaster::CollectVectorSequence
476 
477 private:
478  CollectVectorSequence positions;
479  CollectVectorSequence velocities;
480  CollectVectorSequence forces;
     // ParallelIOMgr passed to the constructor; not deleted by this class.
481  ParallelIOMgr *pIO;
     // Allocated in the constructor, deleted in the destructor.
482  ParOutput *parOut;
483 }; //end of declaration for CollectionMidMaster
484 #endif
485 
486 #endif
487 
CollectVectorInstance * removeReady(void)
int AtomID
Definition: NamdTypes.h:29
void del(int index, int num=1)
Definition: ResizeArray.h:104
int getNumOutputProcs()
static PatchMap * Object()
Definition: PatchMap.h:27
void disposeForces(CollectVectorInstance *c)
void receiveVelocities(CollectVectorMsg *msg)
Definition: Vector.h:64
void receivePositions(CollectVectorMsg *msg)
static __thread float4 * forces
static DataStreamMsg * unpack(void *ptr)
void enqueueVelocities(int seq)
void receiveOutputPosReady(int seq)
void submitData(CollectVectorMsg *msg, int max_index)
void enqueue(int seq, Lattice &lattice)
void startNextRoundOutputPos(double totalT)
void append(CollectVectorMsg *msg, int max_index)
void enqueuePositions(int seq, Lattice &lattice)
void receiveDataStream(DataStreamMsg *msg)
static void * pack(DataStreamMsg *msg)
void receiveOutputVelReady(int seq)
void NAMD_bug(const char *err_msg)
Definition: common.C:129
void disposePositions(CollectVectorInstance *c)
void startNextRoundOutputVel(double totalT)
void receiveForces(CollectVectorMsg *msg)
void disposeVelocities(CollectVectorInstance *c)
static CollectionMaster * Object()
int add(const Elem &elem)
Definition: ResizeArray.h:97
ResizeArray< char > data
void resize(int i)
Definition: ResizeArray.h:84
void startNextRoundOutputForce(double totalT)
void enqueueForces(int seq)
ResizeArray< CollectVectorInstance * > data
int size(void) const
Definition: ResizeArray.h:127
FloatVector * fdata
void receiveOutputForceReady(int seq)
iterator begin(void)
Definition: ResizeArray.h:36