CollectionMaster.h

Go to the documentation of this file.
00001 
00007 #ifndef COLLECTIONMASTER_H
00008 #define COLLECTIONMASTER_H
00009 
00010 #include "charm++.h"
00011 #include "main.h"
00012 #include "NamdTypes.h"
00013 #include "Lattice.h"
00014 #include "ProcessorPrivate.h"
00015 #include "PatchMap.h"
00016 #include "PatchMap.inl"
00017 #include "CollectionMaster.decl.h"
00018 #include <stdio.h>
00019 
00020 #include "Node.h"
00021 #include "ParallelIOMgr.decl.h"
00022 #include "ParallelIOMgr.h"
00023 #include "Output.h"
00024 
00025 class CollectVectorMsg : public CMessage_CollectVectorMsg
00026 {
00027 public:
00028 
00029   int seq;
00030   int aid_size;
00031   int data_size;
00032   int fdata_size;
00033   AtomID *aid;
00034   Vector *data;
00035   FloatVector *fdata;
00036 };
00037 
00038 class DataStreamMsg;
00039 
00040 class CollectionMaster : public Chare
00041 {
00042 public:
00043   static CollectionMaster *Object() { 
00044     return CkpvAccess(CollectionMaster_instance); 
00045   }
00046   CollectionMaster();
00047   ~CollectionMaster(void);
00048 
00049   void receivePositions(CollectVectorMsg *msg);
00050   void receiveVelocities(CollectVectorMsg *msg);
00051   void receiveForces(CollectVectorMsg *msg);
00052 
00053   void receiveDataStream(DataStreamMsg *msg);
00054 
00055   void enqueuePositions(int seq, Lattice &lattice);
00056   void enqueueVelocities(int seq);
00057   void enqueueForces(int seq);
00058 
00059   class CollectVectorInstance;
00060   void disposePositions(CollectVectorInstance *c);
00061   void disposeVelocities(CollectVectorInstance *c);
00062   void disposeForces(CollectVectorInstance *c);
00063 
00064   void blockPositions() { positions.block(); }
00065   void unblockPositions() { positions.unblock(); }
00066 
00068   void receiveOutputPosReady(int seq);
00069   void receiveOutputVelReady(int seq);
00070   void receiveOutputForceReady(int seq);
00071   //totalT is the time taken to do file I/O for each output workflow -Chao Mei  
00072   void startNextRoundOutputPos(double totalT);
00073   void startNextRoundOutputVel(double totalT);
00074   void startNextRoundOutputForce(double totalT);
00075 
00076   void wrapCoorFinished();
00077 
00078   enum OperationStatus {NOT_PROCESSED, IN_PROCESS, HAS_PROCESSED};
00080 #ifdef MEM_OPT_VERSION
00081   class CollectVectorInstance
00082   {
00083   public:
00084     CollectVectorInstance(void) : seq(-10) { ; }
00085 
00086     CollectVectorInstance(int s) { reset(s); }
00087 
00088     void free() { seq = -10; status = HAS_PROCESSED; }
00089     int notfree() { return ( seq != -10 ); }
00090 
00091     void reset(int s) {
00092       if ( s == -10 ) NAMD_bug("seq == free in CollectionMaster");
00093       seq = s;
00094       CProxy_ParallelIOMgr io(CkpvAccess(BOCclass_group).ioMgr);      
00095       ParallelIOMgr *ioMgr = io.ckLocalBranch();
00096       remaining = ioMgr->getNumOutputProcs();
00097       status = NOT_PROCESSED;          
00098     }
00099     
00100     void append(){ --remaining; }
00101     
00102     int ready(void) { return ( ! remaining ); }
00103 
00104     int seq;
00105     Lattice lattice;
00106     //mainly used for tracking the progress of wrap_coor operation
00107     //the write to files will not happen until the wrap_coor is finished,
00108     //and the CollectVectorInstance is freed. -Chao Mei
00109     OperationStatus status; 
00110   private:
00111     int remaining;
00112 
00113   }; //end of declaration for CollectionMaster::CollectVectorInstance
00114 
00115   class CollectVectorSequence
00116   {
00117   public:
00118 
00119     void submitData(int seq){
00120       CollectVectorInstance **c = data.begin();
00121       CollectVectorInstance **c_e = data.end();
00122       for( ; c != c_e && (*c)->seq != seq; ++c );
00123       if ( c == c_e )
00124       {
00125         c = data.begin();
00126         for( ; c != c_e && (*c)->notfree(); ++c );
00127         if ( c == c_e ) {
00128           data.add(new CollectVectorInstance(seq));
00129           c = data.end() - 1;
00130         }
00131         (*c)->reset(seq);
00132       }
00133       (*c)->append();
00134     }
00135 
00136     void enqueue(int seq, Lattice &lattice) {
00137       queue.add(seq);
00138       latqueue.add(lattice);
00139     }
00140 
00141     CollectVectorInstance* removeReady(void)
00142     {
00143       //it is equal to
00144       //if(getReady()) removeFirstReady();
00145       //But it should be not used in memory
00146       //optimized version as the ready instance
00147       //is delayed to be freed at the start of
00148       //next round of output -Chao Mei
00149       return NULL;
00150     }
00151     void block() { ; }  // unimplemented
00152     void unblock() { ; }  // unimplemented
00153 
00154     //only get the ready instance, not remove their info
00155     //from timestep queue and lattice queue
00156     CollectVectorInstance* getReady(void)
00157     {
00158       CollectVectorInstance *o = 0;
00159       if ( queue.size() )
00160       {
00161         int seq = queue[0];
00162         CollectVectorInstance **c = data.begin();
00163         CollectVectorInstance **c_e = data.end();
00164         for( ; c != c_e && (*c)->seq != seq; ++c );
00165         if ( c != c_e && (*c)->ready() )
00166         {
00167           o = *c;
00168           o->lattice = latqueue[0];
00169         }
00170       }
00171       return o;
00172     }
00173 
00174     //the function is intended to be used after "getReady"
00175     //to remove the info regarding the timestep and lattice.
00176     //So, it removes the front ready one. -Chao Mei
00177     int removeFirstReady(){
00178       int seq = queue[0];
00179       queue.del(0,1);
00180       latqueue.del(0,1);
00181       return seq;
00182     }
00183 
00184     ResizeArray<CollectVectorInstance*> data;
00185     ResizeArray<int> queue;
00186     ResizeArray<Lattice> latqueue;
00187   }; //end of declaration for CollectionMaster::CollectVectorSequence
00188 #else
00189   class CollectVectorInstance
00190   {
00191   public:
00192 
00193     CollectVectorInstance(void) : seq(-10) { ; }
00194 
00195     CollectVectorInstance(int s) { reset(s); }
00196 
00197     void free() { seq = -10; }
00198     int notfree() { return ( seq != -10 ); }
00199 
00200     void reset(int s) {
00201         if ( s == -10 ) NAMD_bug("seq == free in CollectionMaster");
00202         seq = s;
00203         remaining = (PatchMap::Object())->numNodesWithPatches(); 
00204         data.resize(0);
00205         fdata.resize(0);
00206     }
00207 
00208     // true -> send it and delete it!
00209     void append(CollectVectorMsg *msg, int max_index)
00210     {
00211       AtomID *a = msg->aid;
00212       Vector *d = msg->data;
00213       FloatVector *fd = msg->fdata;
00214       int size = msg->aid_size;
00215       if ( msg->data_size ) {
00216         data.resize(max_index);
00217         Vector *ptr = data.begin();
00218         for( int i = 0; i < size; ++i ) { ptr[a[i]] = d[i]; }
00219       }
00220       if ( msg->fdata_size ) {
00221         fdata.resize(max_index);
00222         FloatVector *ptr = fdata.begin();
00223         for( int i = 0; i < size; ++i ) { ptr[a[i]] = fd[i]; }
00224       }
00225       --remaining;
00226     }
00227 
00228     int ready(void) { return ( ! remaining ); }
00229 
00230     int seq;
00231     Lattice lattice;
00232 
00233     ResizeArray<Vector> data;
00234     ResizeArray<FloatVector> fdata;
00235 
00236   private:
00237     int remaining;
00238 
00239   };
00240 
00241   class CollectVectorSequence
00242   {
00243   public:
00244 
00245     void submitData(CollectVectorMsg *msg, int max_index)
00246     {
00247       int seq = msg->seq;
00248       CollectVectorInstance **c = data.begin();
00249       CollectVectorInstance **c_e = data.end();
00250       for( ; c != c_e && (*c)->seq != seq; ++c );
00251       if ( c == c_e )
00252       {
00253         c = data.begin();
00254         for( ; c != c_e && (*c)->notfree(); ++c );
00255         if ( c == c_e ) {
00256           data.add(new CollectVectorInstance(seq));
00257           c = data.end() - 1;
00258         }
00259         (*c)->reset(seq);
00260       }
00261       (*c)->append(msg, max_index);
00262     }
00263 
00264     void enqueue(int seq, Lattice &lattice) {
00265       queue.add(seq);
00266       latqueue.add(lattice);
00267     }
00268 
00269     CollectVectorInstance* removeReady(void)
00270     {
00271       CollectVectorInstance *o = 0;
00272       if ( queue.size() && ! blocked )
00273       {
00274         int seq = queue[0];
00275         CollectVectorInstance **c = data.begin();
00276         CollectVectorInstance **c_e = data.end();
00277         for( ; c != c_e && (*c)->seq != seq; ++c );
00278         if ( c != c_e && (*c)->ready() )
00279         {
00280           o = *c;
00281           o->lattice = latqueue[0];
00282           queue.del(0,1);
00283           latqueue.del(0,1);
00284         }
00285       }
00286       return o;
00287     }
00288 
00289     void block() { blocked = 1; }
00290     void unblock() { blocked = 0; }
00291     CollectVectorSequence() : blocked(0) { ; }
00292     
00293     ResizeArray<CollectVectorInstance*> data;
00294     ResizeArray<int> queue;
00295     ResizeArray<Lattice> latqueue;
00296     int blocked;
00297 
00298   };
00299 #endif
00300 private:
00301 
00302   CollectVectorSequence positions;
00303   CollectVectorSequence velocities;
00304   CollectVectorSequence forces;
00305   int posTimings, velTimings, forceTimings;
00306   FILE *dataStreamFile;
00307 
00308 #ifdef MEM_OPT_VERSION
00309   int wrapCoorDoneCnt;
00310   ParOutput *parOut;
00311   double posOutTime; //record the output time
00312   double velOutTime; //record the output time
00313   double forceOutTime; //record the output time
00314   double posIOTime; //record the max time spent on real file IO for one output
00315   double velIOTime; //record the max time spent on real file IO for one output
00316   double forceIOTime; //record the max time spent on real file IO for one output
00317   
00318   //for the sake of simultaneous writing to the same file
00319   int posDoneCnt;
00320   int velDoneCnt;
00321   int forceDoneCnt;
00322 
00323   void checkPosReady();
00324   void checkVelReady();
00325   void checkForceReady();
00326 #endif
00327 };
00328 
00329 class DataStreamMsg : public CMessage_DataStreamMsg {
00330 public:
00331 
00332   ResizeArray<char> data;
00333 
00334   static void* pack(DataStreamMsg* msg);
00335   static DataStreamMsg* unpack(void *ptr);
00336 
00337 };
00338 
00339 //Use varsize message to be more SMP safe 
00340 class CollectVectorVarMsg : public CMessage_CollectVectorVarMsg
00341 {
00342 public:
00343   enum DataStatus {VectorValid, FloatVectorValid, BothValid};
00344 public:
00345   int seq;
00346   int size;
00347   DataStatus status;
00348   AtomID *aid;
00349   Vector *data;
00350   FloatVector *fdata;
00351 };
00352 
00353 #ifdef MEM_OPT_VERSION
00354 class CollectMidVectorInstance{
00355   public:
00356 
00357     CollectMidVectorInstance(void) : seq(-10) {
00358       CProxy_ParallelIOMgr io(CkpvAccess(BOCclass_group).ioMgr);
00359       ParallelIOMgr *ioMgr = io.ckLocalBranch();
00360       ioMgr->getMyAtomsRangeOnOutput(fromAtomID, toAtomID);    
00361     }
00362 
00363     CollectMidVectorInstance(int s) { 
00364       CProxy_ParallelIOMgr io(CkpvAccess(BOCclass_group).ioMgr);
00365       ParallelIOMgr *ioMgr = io.ckLocalBranch();
00366       ioMgr->getMyAtomsRangeOnOutput(fromAtomID, toAtomID);
00367       reset(s); 
00368     }
00369 
00370     void free() { seq = -10; }
00371     int notfree() { return ( seq != -10 ); }
00372 
00373     void reset(int s) {
00374       if ( s == -10 ) NAMD_bug("seq == free in CollectionMidMaster");
00375       seq = s;
00376       remaining = toAtomID-fromAtomID+1;
00377       data.resize(0);
00378       fdata.resize(0);
00379     }
00380 
00381     // if 1 is returned, indicates all the expected atoms for a 
00382     // timestep have been received
00383     int append(int size, AtomID *a, Vector *d, FloatVector *fd){      
00384       if (d) {
00385         if ( size ) data.resize(toAtomID-fromAtomID+1);
00386         Vector *ptr = data.begin();
00387         for(int i = 0; i < size; ++i) { ptr[a[i]-fromAtomID] = d[i]; }
00388       }
00389       if (fd) {
00390         if ( size ) fdata.resize(toAtomID-fromAtomID+1);
00391         FloatVector *ptr = fdata.begin();
00392         for(int i = 0; i < size; ++i) { ptr[a[i]-fromAtomID] = fd[i]; }
00393       }
00394       remaining -= size;
00395 
00396       return ready();
00397     }
00398 
00399     int ready(void) { return ( ! remaining ); }
00400 
00401     int seq;    
00402     Lattice lattice;
00403     ResizeArray<Vector> data;
00404     ResizeArray<FloatVector> fdata;
00405 
00406     //indicates the range of atoms this object is responsible for collecting
00407     int fromAtomID; 
00408     int toAtomID;
00409 
00410   private:
00411     int remaining;
00412 };//end of declaration for CollectMidVectorInstance
00413 
00414 
00415 //An object of this class will be a member of Parallel IO Manager
00416 //It is responsible to buffer the position/coordinates data that is
00417 //going to be written to file system.
00418 //In particular, the instance of this class will be on output procs.
00419 //It will communicate via parallel IO manager with the CollectionMaster
00420 //object on PE0 to be notified on when and what to write --Chao Mei
00421 class CollectionMidMaster{
00422 public:
00423 
00424   CollectionMidMaster(ParallelIOMgr *pIO_) : pIO(pIO_) { parOut = new ParOutput(pIO_->myOutputRank); }
00425   ~CollectionMidMaster(void) { delete parOut; }
00426 
00427   int receivePositions(CollectVectorVarMsg *msg) {return positions.submitData(msg);}
00428   int receiveVelocities(CollectVectorVarMsg *msg) {return velocities.submitData(msg);}
00429   int receiveForces(CollectVectorVarMsg *msg) {return forces.submitData(msg);}
00430 
00431   void disposePositions(int seq);
00432   void disposeVelocities(int seq);
00433   void disposeForces(int seq);
00434   
00435   CollectMidVectorInstance *getReadyPositions(int seq) { return positions.getReady(seq); }
00436 
00437   //containing an array of CollectVectorInstance and their corresponding
00438   //timestep value and lattice value
00439   class CollectVectorSequence{    
00440   public:
00441     int submitData(CollectVectorVarMsg *msg){
00442       int seq = msg->seq;
00443       CollectMidVectorInstance **c = data.begin();
00444       CollectMidVectorInstance **c_e = data.end();
00445       for( ; c != c_e && (*c)->seq != seq; ++c );
00446       if ( c == c_e ){
00447         c = data.begin();
00448         for( ; c != c_e && (*c)->notfree(); ++c );
00449         if ( c == c_e ) {
00450           data.add(new CollectMidVectorInstance(seq));
00451           c = data.end() - 1;
00452         }
00453         (*c)->reset(seq);
00454       }      
00455       AtomID *i = msg->aid;
00456       Vector *d = msg->data;
00457       FloatVector *fd = msg->fdata;
00458       if(msg->status==CollectVectorVarMsg::VectorValid) {
00459         fd = NULL;
00460       }else if(msg->status==CollectVectorVarMsg::FloatVectorValid){
00461         d = NULL;
00462       }
00463       return (*c)->append(msg->size,i,d,fd);
00464     }
00465 
00466     CollectMidVectorInstance* getReady(int seq){
00467       CollectMidVectorInstance **c = data.begin();
00468       CollectMidVectorInstance **c_e = data.end();
00469       for( ; c != c_e && (*c)->seq != seq; ++c );
00470       CmiAssert(c != c_e);
00471       return *c;      
00472     }
00473 
00474     ResizeArray<CollectMidVectorInstance*> data;
00475   };//end of declaration for CollectionMidMaster::CollectVectorSequence
00476 
00477 private:
00478   CollectVectorSequence positions;
00479   CollectVectorSequence velocities;
00480   CollectVectorSequence forces;
00481   ParallelIOMgr *pIO; 
00482   ParOutput *parOut; 
00483 }; //end of declaration for CollectionMidMaster
00484 #endif
00485 
00486 #endif
00487 

Generated on Tue Sep 19 01:17:10 2017 for NAMD by  doxygen 1.4.7