Main Page | Namespace List | Class Hierarchy | Alphabetical List | Class List | File List | Namespace Members | Class Members | File Members

CollectionMaster.h

Go to the documentation of this file.
00001 
00007 #ifndef COLLECTIONMASTER_H
00008 #define COLLECTIONMASTER_H
00009 
00010 #include "charm++.h"
00011 #include "main.h"
00012 #include "NamdTypes.h"
00013 #include "Lattice.h"
00014 #include "ProcessorPrivate.h"
00015 #include "PatchMap.h"
00016 #include "PatchMap.inl"
00017 #include "CollectionMaster.decl.h"
00018 #include <stdio.h>
00019 
00020 #include "Node.h"
00021 #include "ParallelIOMgr.decl.h"
00022 #include "ParallelIOMgr.h"
00023 #include "Output.h"
00024 
// CollectVectorMsg: message class derived from CMessage_CollectVectorMsg.
// It carries one sequence value, three element counts and three pointers
// (atom IDs, Vector data, FloatVector data).
// NOTE(review): the pointers are presumably Charm++ varsize arrays sized by
// the matching *_size members -- confirm against the .ci declaration.
00025 class CollectVectorMsg : public CMessage_CollectVectorMsg
00026 {
00027 public:
00028 
        // Sequence value; the non-MEM_OPT CollectVectorSequence::submitData()
        // matches it against CollectVectorInstance::seq.
00029   int seq;
        // Number of entries that the non-MEM_OPT CollectVectorInstance::append()
        // iterates over (it indexes aid, data and fdata with the same counter).
00030   int aid_size;
        // Non-zero makes append() copy the Vector payload from `data`.
00031   int data_size;
        // Non-zero makes append() copy the FloatVector payload from `fdata`.
00032   int fdata_size;
        // Atom IDs; append() uses each one as the destination array index.
00033   AtomID *aid;
00034   Vector *data;
00035   FloatVector *fdata;
00036 };
00037 
00038 class DataStreamMsg;
00039 
// CollectionMaster: Chare holding one CollectVectorSequence each for
// positions, velocities and forces (declared in the private section at the
// end of the class). Apart from Object(), the member functions are only
// declared in this header; their bodies are defined elsewhere.
00040 class CollectionMaster : public Chare
00041 {
00042 public:
        // Returns the pointer held in the CollectionMaster_instance Ckpv variable.
00043   static CollectionMaster *Object() { 
00044     return CkpvAccess(CollectionMaster_instance); 
00045   }
00046   CollectionMaster();
00047   ~CollectionMaster(void);
00048 
        // Take a CollectVectorMsg, one function per data kind.
        // NOTE(review): presumably Charm++ entry methods that feed the matching
        // CollectVectorSequence -- confirm in the implementation / .ci file.
00049   void receivePositions(CollectVectorMsg *msg);
00050   void receiveVelocities(CollectVectorMsg *msg);
00051   void receiveForces(CollectVectorMsg *msg);
00052 
        // Takes a DataStreamMsg (that class is defined later in this header).
00053   void receiveDataStream(DataStreamMsg *msg);
00054 
        // NOTE(review): presumably forward to CollectVectorSequence::enqueue();
        // only the positions variant takes a Lattice -- confirm in the
        // implementation file.
00055   void enqueuePositions(int seq, Lattice &lattice);
00056   void enqueueVelocities(int seq);
00057   void enqueueForces(int seq);
00058 
        // Forward declaration; the definition follows below and differs
        // depending on MEM_OPT_VERSION.
00059   class CollectVectorInstance;
00060   void disposePositions(CollectVectorInstance *c);
00061   void disposeVelocities(CollectVectorInstance *c);
00062   void disposeForces(CollectVectorInstance *c);
00063 
        // Declared unconditionally in this header, each taking a sequence value.
        // NOTE(review): presumably only meaningful for the parallel-output
        // (MEM_OPT_VERSION) workflow -- TODO confirm.
00065   void receiveOutputPosReady(int seq);
00066   void receiveOutputVelReady(int seq);
00067   void receiveOutputForceReady(int seq);
00068   //totalT is the time taken to do file I/O for each output workflow -Chao Mei  
00069   void startNextRoundOutputPos(double totalT);
00070   void startNextRoundOutputVel(double totalT);
00071   void startNextRoundOutputForce(double totalT);
00072 
00073   void wrapCoorFinished();
00074 
        // Values stored in the MEM_OPT_VERSION CollectVectorInstance::status:
        // reset() stores NOT_PROCESSED and free() stores HAS_PROCESSED.
        // IN_PROCESS is not assigned anywhere in this header.
00075   enum OperationStatus {NOT_PROCESSED, IN_PROCESS, HAS_PROCESSED};
00077 #ifdef MEM_OPT_VERSION
// CollectVectorInstance (MEM_OPT_VERSION variant): bookkeeping for one
// sequence value. It stores no atom data, only the sequence value, a
// lattice, a status flag and a countdown (`remaining`).
// A seq of -10 marks the slot as free.
00078   class CollectVectorInstance
00079   {
00080   public:
          // Creates a free slot (seq == -10).
          // NOTE(review): `remaining` and `status` are not initialized here, so
          // ready() and status must not be read before reset() has run.
00081     CollectVectorInstance(void) : seq(-10) { ; }
00082 
          // Creates a slot that is immediately bound to sequence value s.
00083     CollectVectorInstance(int s) { reset(s); }
00084 
          // Marks the slot free again and records HAS_PROCESSED.
00085     void free() { seq = -10; status = HAS_PROCESSED; }
          // Non-zero while the slot is bound to a sequence value.
00086     int notfree() { return ( seq != -10 ); }
00087 
          // Binds the slot to sequence value s. Calls NAMD_bug() if s equals the
          // free marker (-10). `remaining` is loaded from getNumOutputProcs() of
          // the local ParallelIOMgr branch, and status becomes NOT_PROCESSED.
00088     void reset(int s) {
00089       if ( s == -10 ) NAMD_bug("seq == free in CollectionMaster");
00090       seq = s;
00091       CProxy_ParallelIOMgr io(CkpvAccess(BOCclass_group).ioMgr);      
00092       ParallelIOMgr *ioMgr = io.ckLocalBranch();
00093       remaining = ioMgr->getNumOutputProcs();
00094       status = NOT_PROCESSED;          
00095     }
00096     
          // Counts one arrival; CollectVectorSequence::submitData() calls this
          // once per invocation.
00097     void append(){ --remaining; }
00098     
          // Non-zero once `remaining` has reached exactly zero.
00099     int ready(void) { return ( ! remaining ); }
00100 
00101     int seq;
          // Overwritten by CollectVectorSequence::getReady() with the front
          // entry of its lattice queue.
00102     Lattice lattice;
00103     //mainly used for tracking the progress of wrap_coor operation
00104     //the write to files will not happen until the wrap_coor is finished,
00105     //and the CollectVectorInstance is freed. -Chao Mei
00106     OperationStatus status; 
00107   private:
          // Arrivals still expected before ready() becomes non-zero.
00108     int remaining;
00109 
00110   }; //end of declaration for CollectionMaster::CollectVectorInstance
00111 
// CollectVectorSequence (MEM_OPT_VERSION variant): a pool of
// CollectVectorInstance slots (`data`) plus two parallel FIFO arrays
// (`queue`, `latqueue`) that enqueue() appends to.
00112   class CollectVectorSequence
00113   {
00114   public:
00115 
          // Records one arrival for `seq`: finds the instance bound to that
          // value, or claims a free slot (adding a new instance when none is
          // free), then decrements its counter through append().
00116     void submitData(int seq){
00117       CollectVectorInstance **c = data.begin();
00118       CollectVectorInstance **c_e = data.end();
00119       for( ; c != c_e && (*c)->seq != seq; ++c );
00120       if ( c == c_e )
00121       {
              // No instance is bound to seq yet: look for a slot whose
              // notfree() is zero.
00122         c = data.begin();
00123         for( ; c != c_e && (*c)->notfree(); ++c );
00124         if ( c == c_e ) {
00125           data.add(new CollectVectorInstance(seq));
                // Take the pointer from data.end() again after add() rather
                // than reusing the iterators obtained before it.
00126           c = data.end() - 1;
00127         }
              // Also executed for a freshly added instance, whose constructor
              // has already called reset(seq).
00128         (*c)->reset(seq);
00129       }
00130       (*c)->append();
00131     }
00132 
          // Appends seq and lattice to the two queues, so equal indices in
          // `queue` and `latqueue` belong together.
00133     void enqueue(int seq, Lattice &lattice) {
00134       queue.add(seq);
00135       latqueue.add(lattice);
00136     }
00137 
          // Always returns NULL in this variant (see the note inside).
00138     CollectVectorInstance* removeReady(void)
00139     {
00140       //Conceptually this is equal to
00141       //  if(getReady()) removeFirstReady();
00142       //but it should not be used in the memory
00143       //optimized version, because freeing the ready
00144       //instance is delayed until the start of the
00145       //next round of output -Chao Mei
00146       return NULL;
00147     }
00148 
00149     //only get the ready instance, not remove their info
00150     //from timestep queue and lattice queue
          // Returns 0 when the queue is empty, when no instance is bound to
          // queue[0], or when that instance is not ready() yet. On success the
          // instance's lattice is overwritten with latqueue[0].
00151     CollectVectorInstance* getReady(void)
00152     {
00153       CollectVectorInstance *o = 0;
00154       if ( queue.size() )
00155       {
00156         int seq = queue[0];
00157         CollectVectorInstance **c = data.begin();
00158         CollectVectorInstance **c_e = data.end();
00159         for( ; c != c_e && (*c)->seq != seq; ++c );
00160         if ( c != c_e && (*c)->ready() )
00161         {
00162           o = *c;
00163           o->lattice = latqueue[0];
00164         }
00165       }
00166       return o;
00167     }
00168 
00169     //the function is intended to be used after "getReady"
00170     //to remove the info regarding the timestep and lattice.
00171     //So, it removes the front ready one. -Chao Mei
          // Returns the sequence value that was at the front of `queue`.
          // NOTE(review): queue[0] is read without a size check, so this relies
          // on the caller having seen a non-null getReady() first.
00172     int removeFirstReady(){
00173       int seq = queue[0];
00174       queue.del(0,1);
00175       latqueue.del(0,1);
00176       return seq;
00177     }
00178 
          // Instances are allocated with `new` in submitData().
          // NOTE(review): no matching delete is visible in this header.
00179     ResizeArray<CollectVectorInstance*> data;
          // Sequence values in enqueue() order.
00180     ResizeArray<int> queue;
          // Lattices in enqueue() order, parallel to `queue`.
00181     ResizeArray<Lattice> latqueue;
00182   }; //end of declaration for CollectionMaster::CollectVectorSequence
00183 #else
// CollectVectorInstance (variant used when MEM_OPT_VERSION is not defined):
// accumulates the Vector / FloatVector data of one sequence value, indexed
// by atom ID, together with a countdown of messages still expected.
// A seq of -10 marks the slot as free.
00184   class CollectVectorInstance
00185   {
00186   public:
00187 
          // Creates a free slot (seq == -10).
          // NOTE(review): `remaining` is not initialized here, so ready() must
          // not be called before reset() has run.
00188     CollectVectorInstance(void) : seq(-10) { ; }
00189 
          // Creates a slot that is immediately bound to sequence value s.
00190     CollectVectorInstance(int s) { reset(s); }
00191 
          // Marks the slot free; the data arrays are left untouched until the
          // next reset().
00192     void free() { seq = -10; }
          // Non-zero while the slot is bound to a sequence value.
00193     int notfree() { return ( seq != -10 ); }
00194 
          // Binds the slot to sequence value s. Calls NAMD_bug() if s equals the
          // free marker (-10). `remaining` is loaded from
          // PatchMap::numNodesWithPatches() and both arrays are resized to 0.
00195     void reset(int s) {
00196         if ( s == -10 ) NAMD_bug("seq == free in CollectionMaster");
00197         seq = s;
00198         remaining = (PatchMap::Object())->numNodesWithPatches(); 
00199         data.resize(0);
00200         fdata.resize(0);
00201     }
00202 
00203     // original note: "true -> send it and delete it!"
          // NOTE(review): append() returns void, so the note above appears to
          // be stale; callers test ready() instead.
          // Copies the payload of msg into the arrays, using each atom ID in
          // msg->aid as the destination index, then counts the message.
          //   msg       - source message; aid_size entries are processed.
          //   max_index - size the destination arrays are resized to.
          // NOTE(review): the atom IDs are not range-checked against max_index.
00204     void append(CollectVectorMsg *msg, int max_index)
00205     {
00206       AtomID *a = msg->aid;
00207       Vector *d = msg->data;
00208       FloatVector *fd = msg->fdata;
00209       int size = msg->aid_size;
            // A non-zero data_size / fdata_size selects which payloads to copy.
00210       if ( msg->data_size ) {
00211         data.resize(max_index);
00212         Vector *ptr = data.begin();
00213         for( int i = 0; i < size; ++i ) { ptr[a[i]] = d[i]; }
00214       }
00215       if ( msg->fdata_size ) {
00216         fdata.resize(max_index);
00217         FloatVector *ptr = fdata.begin();
00218         for( int i = 0; i < size; ++i ) { ptr[a[i]] = fd[i]; }
00219       }
00220       --remaining;
00221     }
00222 
          // Non-zero once `remaining` has reached exactly zero.
00223     int ready(void) { return ( ! remaining ); }
00224 
00225     int seq;
          // Overwritten by CollectVectorSequence::removeReady() with the front
          // entry of its lattice queue.
00226     Lattice lattice;
00227 
00228     ResizeArray<Vector> data;
00229     ResizeArray<FloatVector> fdata;
00230 
00231   private:
          // Messages still expected before ready() becomes non-zero.
00232     int remaining;
00233 
00234   };
00235 
// CollectVectorSequence (variant used when MEM_OPT_VERSION is not defined):
// a pool of CollectVectorInstance slots (`data`) plus two parallel FIFO
// arrays (`queue`, `latqueue`) that enqueue() appends to.
00236   class CollectVectorSequence
00237   {
00238   public:
00239 
          // Routes msg to the instance bound to msg->seq, claiming a free slot
          // (or adding a new instance when none is free) if no instance is
          // bound to it yet, and hands the message to its append().
          //   max_index - passed through unchanged to append().
00240     void submitData(CollectVectorMsg *msg, int max_index)
00241     {
00242       int seq = msg->seq;
00243       CollectVectorInstance **c = data.begin();
00244       CollectVectorInstance **c_e = data.end();
00245       for( ; c != c_e && (*c)->seq != seq; ++c );
00246       if ( c == c_e )
00247       {
              // No instance is bound to seq yet: look for a slot whose
              // notfree() is zero.
00248         c = data.begin();
00249         for( ; c != c_e && (*c)->notfree(); ++c );
00250         if ( c == c_e ) {
00251           data.add(new CollectVectorInstance(seq));
                // Take the pointer from data.end() again after add() rather
                // than reusing the iterators obtained before it.
00252           c = data.end() - 1;
00253         }
              // Also executed for a freshly added instance, whose constructor
              // has already called reset(seq).
00254         (*c)->reset(seq);
00255       }
00256       (*c)->append(msg, max_index);
00257     }
00258 
          // Appends seq and lattice to the two queues, so equal indices in
          // `queue` and `latqueue` belong together.
00259     void enqueue(int seq, Lattice &lattice) {
00260       queue.add(seq);
00261       latqueue.add(lattice);
00262     }
00263 
          // If the instance bound to the front sequence value is ready(), copies
          // the front lattice into it, removes the front entry of both queues
          // and returns the instance; otherwise returns 0 and changes nothing.
          // The returned instance stays in `data`; this function does not free it.
00264     CollectVectorInstance* removeReady(void)
00265     {
00266       CollectVectorInstance *o = 0;
00267       if ( queue.size() )
00268       {
00269         int seq = queue[0];
00270         CollectVectorInstance **c = data.begin();
00271         CollectVectorInstance **c_e = data.end();
00272         for( ; c != c_e && (*c)->seq != seq; ++c );
00273         if ( c != c_e && (*c)->ready() )
00274         {
00275           o = *c;
00276           o->lattice = latqueue[0];
00277           queue.del(0,1);
00278           latqueue.del(0,1);
00279         }
00280       }
00281       return o;
00282     }
00283 
          // Instances are allocated with `new` in submitData().
          // NOTE(review): no matching delete is visible in this header.
00284     ResizeArray<CollectVectorInstance*> data;
          // Sequence values in enqueue() order.
00285     ResizeArray<int> queue;
          // Lattices in enqueue() order, parallel to `queue`.
00286     ResizeArray<Lattice> latqueue;
00287 
00288   };
00289 #endif
// Private state of CollectionMaster. None of these members is read or
// written inside this header; their use is in the implementation file.
00290 private:
00291 
        // One sequence tracker per data kind.
00292   CollectVectorSequence positions;
00293   CollectVectorSequence velocities;
00294   CollectVectorSequence forces;
        // NOTE(review): presumably counters tied to output timing reports --
        // TODO confirm in the implementation file.
00295   int posTimings, velTimings, forceTimings;
        // NOTE(review): presumably the destination used by receiveDataStream()
        // -- TODO confirm.
00296   FILE *dataStreamFile;
00297 
00298 #ifdef MEM_OPT_VERSION
        // NOTE(review): presumably counts wrapCoorFinished() notifications --
        // TODO confirm.
00299   int wrapCoorDoneCnt;
        // NOTE(review): ownership of this pointer is not visible in this header.
00300   ParOutput *parOut;
00301   double posOutTime; //record the output time
00302   double velOutTime; //record the output time
00303   double forceOutTime; //record the output time
00304   double posIOTime; //record the max time spent on real file IO for one output
00305   double velIOTime; //record the max time spent on real file IO for one output
00306   double forceIOTime; //record the max time spent on real file IO for one output
00307   
00308   //for the sake of simultaneous writing to the same file
00309   int posDoneCnt;
00310   int velDoneCnt;
00311   int forceDoneCnt;
00312 
        // Declared only; bodies are defined outside this header.
00313   void checkPosReady();
00314   void checkVelReady();
00315   void checkForceReady();
00316 #endif
00317 };
00318 
// DataStreamMsg: message class derived from CMessage_DataStreamMsg that
// carries a resizable byte buffer. pack() and unpack() are declared here
// and defined elsewhere.
// NOTE(review): presumably the Charm++ packed-message hooks needed to
// serialize the ResizeArray member -- confirm against the .ci declaration.
00319 class DataStreamMsg : public CMessage_DataStreamMsg {
00320 public:
00321 
00322   ResizeArray<char> data;
00323 
00324   static void* pack(DataStreamMsg* msg);
00325   static DataStreamMsg* unpack(void *ptr);
00326 
00327 };
00328 
00329 //Use varsize message to be more SMP safe 
// CollectVectorVarMsg: message class derived from CMessage_CollectVectorVarMsg.
// It is consumed by CollectionMidMaster::CollectVectorSequence::submitData().
00330 class CollectVectorVarMsg : public CMessage_CollectVectorVarMsg
00331 {
00332 public:
        // Tells submitData() which payload pointers to honour:
        //   VectorValid      - fdata is ignored (treated as NULL)
        //   FloatVectorValid - data is ignored (treated as NULL)
        //   BothValid        - both pointers are passed on
00333   enum DataStatus {VectorValid, FloatVectorValid, BothValid};
00334 public:
        // Sequence value used to select the CollectMidVectorInstance.
00335   int seq;
        // Element count passed to CollectMidVectorInstance::append().
00336   int size;
00337   DataStatus status;
        // Atom IDs; append() subtracts fromAtomID to form the array index.
00338   AtomID *aid;
00339   Vector *data;
00340   FloatVector *fdata;
00341 };
00342 
00343 #ifdef MEM_OPT_VERSION
// CollectMidVectorInstance: accumulates the Vector / FloatVector data of one
// sequence value for the atom ID range [fromAtomID, toAtomID]. The arrays
// are indexed by (atom ID - fromAtomID). A seq of -10 marks the slot free.
00344 class CollectMidVectorInstance{
00345   public:
00346 
          // Creates a free slot and asks the local ParallelIOMgr branch for the
          // atom range through getMyAtomsRangeOnOutput(fromAtomID, toAtomID).
          // NOTE(review): that call presumably fills both arguments by
          // reference -- confirm in ParallelIOMgr.h.
          // NOTE(review): `remaining` is not initialized here, so ready() must
          // not be called before reset() has run.
00347     CollectMidVectorInstance(void) : seq(-10) {
00348       CProxy_ParallelIOMgr io(CkpvAccess(BOCclass_group).ioMgr);
00349       ParallelIOMgr *ioMgr = io.ckLocalBranch();
00350       ioMgr->getMyAtomsRangeOnOutput(fromAtomID, toAtomID);    
00351     }
00352 
          // Same range lookup as above, then binds the slot to sequence value s.
          // The lookup runs first because reset() reads the range.
00353     CollectMidVectorInstance(int s) { 
00354       CProxy_ParallelIOMgr io(CkpvAccess(BOCclass_group).ioMgr);
00355       ParallelIOMgr *ioMgr = io.ckLocalBranch();
00356       ioMgr->getMyAtomsRangeOnOutput(fromAtomID, toAtomID);
00357       reset(s); 
00358     }
00359 
          // Marks the slot free; the data arrays are left untouched until the
          // next reset().
00360     void free() { seq = -10; }
          // Non-zero while the slot is bound to a sequence value.
00361     int notfree() { return ( seq != -10 ); }
00362 
          // Binds the slot to sequence value s. Calls NAMD_bug() if s equals the
          // free marker (-10). `remaining` becomes the number of atoms in the
          // inclusive range and both arrays are resized to 0.
00363     void reset(int s) {
00364       if ( s == -10 ) NAMD_bug("seq == free in CollectionMidMaster");
00365       seq = s;
00366       remaining = toAtomID-fromAtomID+1;
00367       data.resize(0);
00368       fdata.resize(0);
00369     }
00370 
00371     // if 1 is returned, indicates all the expected atoms for a 
00372     // timestep have been received
          //   size - number of entries in a (and in d / fd when non-null)
          //   a    - atom IDs; (a[i] - fromAtomID) is the destination index
          //   d    - Vector payload, or null to skip it
          //   fd   - FloatVector payload, or null to skip it
          // NOTE(review): atom IDs are not checked against the range, and
          // `remaining` is reduced by size whether or not an atom was already
          // received, so this relies on senders delivering each atom once.
00373     int append(int size, AtomID *a, Vector *d, FloatVector *fd){      
00374       if (d) {
              // The array is only grown when there is something to store.
00375         if ( size ) data.resize(toAtomID-fromAtomID+1);
00376         Vector *ptr = data.begin();
00377         for(int i = 0; i < size; ++i) { ptr[a[i]-fromAtomID] = d[i]; }
00378       }
00379       if (fd) {
00380         if ( size ) fdata.resize(toAtomID-fromAtomID+1);
00381         FloatVector *ptr = fdata.begin();
00382         for(int i = 0; i < size; ++i) { ptr[a[i]-fromAtomID] = fd[i]; }
00383       }
00384       remaining -= size;
00385 
00386       return ready();
00387     }
00388 
          // Non-zero once `remaining` has reached exactly zero.
00389     int ready(void) { return ( ! remaining ); }
00390 
00391     int seq;    
00392     Lattice lattice;
00393     ResizeArray<Vector> data;
00394     ResizeArray<FloatVector> fdata;
00395 
00396     //indicates the range of atoms this object is responsible for collecting
          // Both bounds are inclusive (reset() counts toAtomID-fromAtomID+1 atoms).
00397     int fromAtomID; 
00398     int toAtomID;
00399 
00400   private:
          // Atoms still expected before ready() becomes non-zero.
00401     int remaining;
00402 };//end of declaration for CollectMidVectorInstance
00403 
00404 
00405 //An object of this class will be a member of Parallel IO Manager
00406 //It is responsible to buffer the position/coordinates data that is
00407 //going to be written to file system.
00408 //In particular, the instance of this class will be on output procs.
00409 //It will communicate via parallel IO manager with the CollectionMaster
00410 //object on PE0 to be notified on when and what to write --Chao Mei
// NOTE(review): the class owns `parOut` through a raw pointer (allocated in
// the constructor, deleted in the destructor) and declares no copy
// constructor or assignment operator, so copying an object would delete
// parOut twice.
00411 class CollectionMidMaster{
00412 public:
00413 
        // Stores pIO_ and allocates a ParOutput built from pIO_->myOutputRank.
00414   CollectionMidMaster(ParallelIOMgr *pIO_) : pIO(pIO_) { parOut = new ParOutput(pIO_->myOutputRank); }
00415   ~CollectionMidMaster(void) { delete parOut; }
00416 
        // Forward msg to the matching sequence tracker and return its result,
        // i.e. the ready() value of the CollectMidVectorInstance after appending.
00417   int receivePositions(CollectVectorVarMsg *msg) {return positions.submitData(msg);}
00418   int receiveVelocities(CollectVectorVarMsg *msg) {return velocities.submitData(msg);}
00419   int receiveForces(CollectVectorVarMsg *msg) {return forces.submitData(msg);}
00420 
        // Declared only; bodies are defined outside this header.
00421   void disposePositions(int seq);
00422   void disposeVelocities(int seq);
00423   void disposeForces(int seq);
00424   
        // Returns the positions instance bound to seq (see getReady() below for
        // the behaviour when no such instance exists).
00425   CollectMidVectorInstance *getReadyPositions(int seq) { return positions.getReady(seq); }
00426 
00427   //containing an array of CollectVectorInstance and their corresponding
00428   //timestep value and lattice value
00429   class CollectVectorSequence{    
00430   public:
          // Routes msg to the instance bound to msg->seq, claiming a free slot
          // (or adding a new instance when none is free) if no instance is
          // bound to it yet. Returns the value of that instance's append().
00431     int submitData(CollectVectorVarMsg *msg){
00432       int seq = msg->seq;
00433       CollectMidVectorInstance **c = data.begin();
00434       CollectMidVectorInstance **c_e = data.end();
00435       for( ; c != c_e && (*c)->seq != seq; ++c );
00436       if ( c == c_e ){
              // No instance is bound to seq yet: look for a slot whose
              // notfree() is zero.
00437         c = data.begin();
00438         for( ; c != c_e && (*c)->notfree(); ++c );
00439         if ( c == c_e ) {
00440           data.add(new CollectMidVectorInstance(seq));
                // Take the pointer from data.end() again after add() rather
                // than reusing the iterators obtained before it.
00441           c = data.end() - 1;
00442         }
              // Also executed for a freshly added instance, whose constructor
              // has already called reset(seq).
00443         (*c)->reset(seq);
00444       }      
00445       AtomID *i = msg->aid;
00446       Vector *d = msg->data;
00447       FloatVector *fd = msg->fdata;
            // Null out the payload that msg->status declares invalid, so that
            // append() skips it; with BothValid both pointers are kept.
00448       if(msg->status==CollectVectorVarMsg::VectorValid) {
00449         fd = NULL;
00450       }else if(msg->status==CollectVectorVarMsg::FloatVectorValid){
00451         d = NULL;
00452       }
00453       return (*c)->append(msg->size,i,d,fd);
00454     }
00455 
          // Returns the instance bound to seq, without checking ready().
          // NOTE(review): a missing instance is caught only by CmiAssert; if
          // that assert does not stop execution (e.g. it is compiled out --
          // TODO confirm), *c dereferences data.end().
00456     CollectMidVectorInstance* getReady(int seq){
00457       CollectMidVectorInstance **c = data.begin();
00458       CollectMidVectorInstance **c_e = data.end();
00459       for( ; c != c_e && (*c)->seq != seq; ++c );
00460       CmiAssert(c != c_e);
00461       return *c;      
00462     }
00463 
          // Instances are allocated with `new` in submitData().
          // NOTE(review): no matching delete is visible in this header.
00464     ResizeArray<CollectMidVectorInstance*> data;
00465   };//end of declaration for CollectionMidMaster::CollectVectorSequence
00466 
00467 private:
00468   CollectVectorSequence positions;
00469   CollectVectorSequence velocities;
00470   CollectVectorSequence forces;
        // Not owned: set from the constructor argument and never deleted here.
00471   ParallelIOMgr *pIO; 
        // Owned: created in the constructor, deleted in the destructor.
00472   ParOutput *parOut; 
00473 }; //end of declaration for CollectionMidMaster
00474 #endif
00475 
00476 #endif
00477 

Generated on Thu May 23 04:07:14 2013 for NAMD by  doxygen 1.3.9.1