Main Page | Namespace List | Class Hierarchy | Alphabetical List | Class List | File List | Class Members | File Members

CollectionMaster.C

Go to the documentation of this file.
00001 
00007 #include "largefiles.h"  // must be first!
00008 
00009 #include "InfoStream.h"
00010 #include "CollectionMaster.h"
00011 #include "ProcessorPrivate.h"
00012 #include "SimParameters.h"
00013 #include "packmsg.h"
00014 #include "CollectionMaster.decl.h"
00015 #include "Molecule.h"
00016 
00017 #include "memusage.h"
00018 
00019 // #define DEBUGM
00020 #include "Debug.h"
00021 
00022 CollectionMaster::CollectionMaster()
00023 {
00024   if (CkpvAccess(CollectionMaster_instance) == 0) {
00025     CkpvAccess(CollectionMaster_instance) = this;
00026   } else {
00027     DebugM(1, "CollectionMaster::CollectionMaster() - another instance of CollectionMaster exists!\n");
00028   }
00029   dataStreamFile = 0;
00030 
00031 #ifdef MEM_OPT_VERSION
00032   wrapCoorDoneCnt = 0;
00033   posDoneCnt = 0;
00034   velDoneCnt = 0;
00035   parOut = new ParOutput();
00036 #endif
00037 
00038   posTimings = 10;  velTimings = forceTimings = 5;
00039 }
00040 
00041 
00042 CollectionMaster::~CollectionMaster(void)
00043 {
00044 }
00045 
00046 void CollectionMaster::receivePositions(CollectVectorMsg *msg)
00047 {
00048 #ifndef MEM_OPT_VERSION
00049   positions.submitData(msg,Node::Object()->molecule->numAtoms);
00050   delete msg;
00051   
00052   CollectVectorInstance *c;
00053   while ( ( c = positions.removeReady() ) ) { disposePositions(c); }
00054 #endif
00055 }
00056 
00057 void CollectionMaster::enqueuePositions(int seq, Lattice &lattice)
00058 {
00059   positions.enqueue(seq,lattice);
00060 
00061 #ifndef MEM_OPT_VERSION
00062   CollectVectorInstance *c;
00063   while ( ( c = positions.removeReady() ) ) { disposePositions(c); }
00064 #else
00065   checkPosReady();
00066 #endif
00067 }
00068 
00069 void CollectionMaster::disposePositions(CollectVectorInstance *c)
00070 {
00071 #ifndef MEM_OPT_VERSION
00072     DebugM(3,"Collected positions at " << c->seq << std::endl);
00073     int seq = c->seq;
00074     int size = c->data.size();
00075     if ( ! size ) size = c->fdata.size();
00076     Vector *data = c->data.begin();
00077     FloatVector *fdata = c->fdata.begin();
00078     double exectime = CmiWallTimer();
00079     double mem = memusage_MB();
00080     Node::Object()->output->coordinate(seq,size,data,fdata,c->lattice);
00081     c->free();
00082     exectime = CmiWallTimer()-exectime;
00083     if ( posTimings ) {
00084       CkPrintf("The last position output (seq=%d) takes %.3f seconds, %.3f MB of memory in use\n", seq, exectime, mem);
00085       --posTimings;
00086     }
00087 #endif
00088 }
00089 
00090 
00091 void CollectionMaster::receiveVelocities(CollectVectorMsg *msg)
00092 {
00093 #ifndef MEM_OPT_VERSION
00094   velocities.submitData(msg,Node::Object()->molecule->numAtoms);
00095   delete msg;
00096 
00097   CollectVectorInstance *c;
00098   while ( ( c = velocities.removeReady() ) ) { disposeVelocities(c); }
00099 #endif
00100 }
00101 
00102 void CollectionMaster::enqueueVelocities(int seq)
00103 {
00104   Lattice dummy;
00105   velocities.enqueue(seq,dummy);
00106 #ifndef MEM_OPT_VERSION
00107   CollectVectorInstance *c;
00108   while ( ( c = velocities.removeReady() ) ) { disposeVelocities(c); }
00109 #else
00110   checkVelReady();
00111 #endif
00112 }
00113 
00114 void CollectionMaster::disposeVelocities(CollectVectorInstance *c)
00115 {
00116 #ifndef MEM_OPT_VERSION
00117     DebugM(3,"Collected velocities at " << c->seq << std::endl);
00118     int seq = c->seq;
00119     int size = c->data.size();
00120     Vector *data = c->data.begin();
00121     double exectime = CmiWallTimer();
00122     double mem = memusage_MB();
00123     Node::Object()->output->velocity(seq,size,data);
00124     c->free();
00125     exectime = CmiWallTimer()-exectime;
00126     if ( velTimings ) {
00127       CkPrintf("The last velocity output (seq=%d) takes %.3f seconds, %.3f MB of memory in use\n", seq, exectime, mem);
00128       --velTimings;
00129     }
00130 #endif
00131 }
00132 
00133 
00134 void CollectionMaster::receiveForces(CollectVectorMsg *msg)
00135 {
00136 #ifndef MEM_OPT_VERSION
00137   forces.submitData(msg,Node::Object()->molecule->numAtoms);
00138   delete msg;
00139 
00140   CollectVectorInstance *c;
00141   while ( ( c = forces.removeReady() ) ) { disposeForces(c); }
00142 #endif
00143 }
00144 
00145 void CollectionMaster::enqueueForces(int seq)
00146 {
00147   Lattice dummy;
00148   forces.enqueue(seq,dummy);
00149 #ifndef MEM_OPT_VERSION
00150   CollectVectorInstance *c;
00151   while ( ( c = forces.removeReady() ) ) { disposeForces(c); }
00152 #else
00153   checkForceReady();
00154 #endif
00155 }
00156 
00157 void CollectionMaster::disposeForces(CollectVectorInstance *c)
00158 {
00159 #ifndef MEM_OPT_VERSION
00160     DebugM(3,"Collected forces at " << c->seq << std::endl);
00161     int seq = c->seq;
00162     int size = c->data.size();
00163     Vector *data = c->data.begin();
00164     double exectime = CmiWallTimer();
00165     double mem = memusage_MB();
00166     Node::Object()->output->force(seq,size,data);
00167     c->free();
00168     exectime = CmiWallTimer()-exectime;
00169     if ( forceTimings ) {
00170       CkPrintf("The last force output (seq=%d) takes %.3f seconds, %.3f MB of memory in use\n", seq, exectime, mem);
00171       --forceTimings;
00172     }
00173 #endif
00174 }
00175 
00176 
00177 void CollectionMaster::receiveDataStream(DataStreamMsg *msg) {
00178     if ( ! dataStreamFile ) {
00179       char *fname = Node::Object()->simParameters->auxFilename;
00180       // iout has large file linking issues on AIX
00181       // iout << iINFO << "OPENING AUXILIARY DATA STREAM FILE "
00182       //                                << fname << "\n" << endi;
00183       CkPrintf("Info: OPENING AUXILIARY DATA STREAM FILE %s\n", fname);
00184       NAMD_backup_file(fname);
00185       dataStreamFile = fopen(fname,"w");
00186       if ( ! dataStreamFile )
00187                 NAMD_die("Can't open auxiliary data stream file!");
00188     }
00189     fprintf(dataStreamFile,"%s",msg->data.begin());
00190     fflush(dataStreamFile);
00191     delete msg;
00192 }
00193 
// Packing description for DataStreamMsg: `data` is the only field listed and
// it is handled with PACK_RESIZE.
// NOTE(review): PACK_MSG / PACK_RESIZE are presumably the macros provided by
// packmsg.h (included at the top of this file) - confirm.
00194 PACK_MSG(DataStreamMsg,
00195   PACK_RESIZE(data);
00196 )
00197 
00198 //The timesteps on CollectionMaster will always be in increasing order
00199 //because they are enqueued on PE0 by the controller in order. -Chao Mei
00200 //
00201 //The computation of wrap_coor is also serialized for the sake of easy
00202 //implementation (easier management of output)
00203 void CollectionMaster::receiveOutputPosReady(int seq){
00204 #ifdef MEM_OPT_VERSION
00205     positions.submitData(seq);
00206     checkPosReady();
00207 #endif
00208 }
00209 
00210 void CollectionMaster::receiveOutputVelReady(int seq){
00211 #ifdef MEM_OPT_VERSION
00212     velocities.submitData(seq);
00213     checkVelReady();
00214 #endif
00215 }
00216 
00217 void CollectionMaster::receiveOutputForceReady(int seq){
00218 #ifdef MEM_OPT_VERSION
00219     forces.submitData(seq);
00220     checkForceReady();
00221 #endif
00222 }
00223 
00224 
00225 void CollectionMaster::startNextRoundOutputPos(double totalT){
00226 #ifdef MEM_OPT_VERSION
00227 
00228         if(totalT > posIOTime) posIOTime = totalT;
00229 
00230 #if OUTPUT_SINGLE_FILE
00231     if(++posDoneCnt < Node::Object()->simParameters->numoutputwrts)  return;
00232 #else
00233         if(++posDoneCnt < Node::Object()->simParameters->numoutputprocs)  return;
00234 #endif
00235 
00236         posDoneCnt = 0;
00237 
00238     //retrieve the last ready instance
00239     CollectVectorInstance *c = positions.getReady();
00240     int seq = c->seq;
00241     CmiAssert(c->status == IN_PROCESS);
00242     double mem = memusage_MB();
00243     positions.removeFirstReady();
00244     c->free();
00245     posOutTime = CmiWallTimer()-posOutTime;
00246     if ( posTimings ) {
00247       CkPrintf("The last position output (seq=%d) takes %.3f seconds(file I/O: %.3f secs), %.3f MB of memory in use\n", seq, posOutTime, posIOTime, mem);
00248       --posTimings;
00249     }
00250 
00251     //Actually the c->status doesn't need to be checked because it is
00252     //certain that the new ready one will not be in  IN_PROCESS status 
00253     checkPosReady();
00254 #endif
00255 }
00256 
00257 void CollectionMaster::startNextRoundOutputVel(double totalT){
00258 #ifdef MEM_OPT_VERSION
00259         
00260         if(totalT > velIOTime) velIOTime = totalT;
00261 
00262 #if OUTPUT_SINGLE_FILE
00263     if(++velDoneCnt < Node::Object()->simParameters->numoutputwrts)  return;
00264 #else
00265         if(++velDoneCnt < Node::Object()->simParameters->numoutputprocs)  return;
00266 #endif
00267 
00268     velDoneCnt = 0;
00269 
00270     //retrieve the last ready instance
00271     CollectVectorInstance *c = velocities.getReady();
00272     int seq = c->seq;
00273     CmiAssert(c->status == IN_PROCESS);
00274     double mem = memusage_MB();
00275     velocities.removeFirstReady();
00276     c->free();
00277     velOutTime = CmiWallTimer()-velOutTime;
00278     if ( velTimings ) {
00279       CkPrintf("The last velocity output (seq=%d) takes %.3f seconds(file I/O: %.3f secs), %.3f MB of memory in use\n", seq, velOutTime, velIOTime, mem);
00280       --velTimings;
00281     }
00282 
00283     //Actually the c->status doesn't need to be checked because it is
00284     //certain that the new ready one will not be in  IN_PROCESS status 
00285     checkVelReady();
00286 #endif
00287 }
00288 
00289 void CollectionMaster::startNextRoundOutputForce(double totalT){
00290 #ifdef MEM_OPT_VERSION
00291         
00292         if(totalT > forceIOTime) forceIOTime = totalT;
00293 
00294 #if OUTPUT_SINGLE_FILE
00295     if(++forceDoneCnt < Node::Object()->simParameters->numoutputwrts)  return;
00296 #else
00297         if(++forceDoneCnt < Node::Object()->simParameters->numoutputprocs)  return;
00298 #endif
00299 
00300     forceDoneCnt = 0;
00301 
00302     //retrieve the last ready instance
00303     CollectVectorInstance *c = forces.getReady();
00304     int seq = c->seq;
00305     CmiAssert(c->status == IN_PROCESS);
00306     double mem = memusage_MB();
00307     forces.removeFirstReady();
00308     c->free();
00309     forceOutTime = CmiWallTimer()-forceOutTime;
00310     if ( forceTimings ) {
00311       CkPrintf("The last force output (seq=%d) takes %.3f seconds(file I/O: %.3f secs), %.3f MB of memory in use\n", seq, forceOutTime, forceIOTime, mem);
00312       --forceTimings;
00313     }
00314 
00315     //Actually the c->status doesn't need to be checked because it is
00316     //certain that the new ready one will not be in  IN_PROCESS status 
00317     checkForceReady();
00318 #endif
00319 }
00320 
00321 
00322 void CollectionMaster::wrapCoorFinished(){
00323 #ifdef MEM_OPT_VERSION
00324     if(++wrapCoorDoneCnt == Node::Object()->simParameters->numoutputprocs){
00325         wrapCoorDoneCnt = 0;
00326 
00327                 //count the wrapping-coor time into master writing time
00328                 posIOTime = CmiWallTimer()-posOutTime; 
00329 
00330         //it's ready to output positions
00331         CollectVectorInstance *c = positions.getReady();
00332 
00333                 CProxy_ParallelIOMgr io(CkpvAccess(BOCclass_group).ioMgr);
00334                 ParallelIOMgr *ioMgr = io.ckLocalBranch();
00335 
00336 #if OUTPUT_SINGLE_FILE
00337         //notify output procs to do Token based output
00338         int grpsize = ioMgr->numOutputProcs / ioMgr->numOutputWrts;
00339         int remains = ioMgr->numOutputProcs % ioMgr->numOutputWrts;
00340         int outrank = 0;
00341         int i;
00342         for(i=0; i<remains; i++){
00343             io[ioMgr->outputProcArray[outrank]].disposePositions(c->seq, posIOTime);
00344             outrank += (grpsize+1);
00345         }
00346         for(; i<ioMgr->numOutputWrts; i++){
00347             io[ioMgr->outputProcArray[outrank]].disposePositions(c->seq, posIOTime);
00348             outrank += grpsize;
00349         }
00350 #else
00351                 //output multiple files
00352                 for(int i=0; i<ioMgr->numOutputProcs; i++) {
00353                         io[ioMgr->outputProcArray[i]].disposePositions(c->seq, posIOTime);
00354                 }
00355 #endif
00356 
00357     }
00358 #endif
00359 }
00360 
00361 #ifdef MEM_OPT_VERSION
00362 void CollectionMaster::checkPosReady(){
00363     CollectVectorInstance *c;
00364     if((c = positions.getReady())){
00365         if(c->status == IN_PROCESS){
00366             //indicating in the process of outputing coordinates
00367             return;
00368         }        
00369         c->status = IN_PROCESS;
00370 
00371         posOutTime = CmiWallTimer();
00372         SimParameters *simParam = Node::Object()->simParameters;
00373         CProxy_ParallelIOMgr io(CkpvAccess(BOCclass_group).ioMgr);
00374         ParallelIOMgr *ioMgr = io.ckLocalBranch();
00375         if(simParam->wrapAll || simParam->wrapWater){
00376             for(int i=0; i<ioMgr->numOutputProcs; i++){
00377                 io[ioMgr->outputProcArray[i]].wrapCoor(c->seq, c->lattice);
00378             }
00379             //write the header to overlap with the computation of 
00380             //wrapping coordinates
00381             parOut->coordinateMaster(c->seq,Node::Object()->molecule->numAtoms,c->lattice);
00382         }else{
00383             //write the header 
00384             parOut->coordinateMaster(c->seq,Node::Object()->molecule->numAtoms,c->lattice);
00385                         posIOTime = CmiWallTimer() - posOutTime;            
00386 
00387                 #if OUTPUT_SINGLE_FILE
00388             int grpsize = ioMgr->numOutputProcs / ioMgr->numOutputWrts;
00389             int remains = ioMgr->numOutputProcs % ioMgr->numOutputWrts;
00390             int outrank = 0;
00391             int i;
00392             for(i=0; i<remains; i++){
00393                 io[ioMgr->outputProcArray[outrank]].disposePositions(c->seq, posIOTime);
00394                 outrank += (grpsize+1);
00395             }
00396             for(; i<ioMgr->numOutputWrts; i++){
00397                 io[ioMgr->outputProcArray[outrank]].disposePositions(c->seq, posIOTime);
00398                 outrank += grpsize;
00399             }
00400                 #else
00401                         //output multiple files
00402                         for(int i=0; i<ioMgr->numOutputProcs; i++) {
00403                                 io[ioMgr->outputProcArray[i]].disposePositions(c->seq, posIOTime);
00404                         }
00405                 #endif
00406         }
00407         //this instance c is freed in the next round of output invocation.
00408     }
00409 }
00410 
00411 void CollectionMaster::checkVelReady(){
00412     CollectVectorInstance *c;
00413     if((c = velocities.getReady())){
00414         if(c->status == IN_PROCESS){
00415             //indicating in the process of outputing velocities
00416             return;
00417         }
00418 
00419         c->status = IN_PROCESS;
00420 
00421         velOutTime = CmiWallTimer();
00422         //write the header
00423         parOut->velocityMaster(c->seq, Node::Object()->molecule->numAtoms);
00424                 velIOTime = CmiWallTimer() - velOutTime;
00425 
00426         //notify output procs to do Token based output
00427         CProxy_ParallelIOMgr io(CkpvAccess(BOCclass_group).ioMgr);
00428         ParallelIOMgr *ioMgr = io.ckLocalBranch();
00429 
00430         #if OUTPUT_SINGLE_FILE
00431         int grpsize = ioMgr->numOutputProcs / ioMgr->numOutputWrts;
00432         int remains = ioMgr->numOutputProcs % ioMgr->numOutputWrts;
00433         int outrank = 0;
00434         int i;
00435         for(i=0; i<remains; i++){
00436             io[ioMgr->outputProcArray[outrank]].disposeVelocities(c->seq, velIOTime);
00437             outrank += (grpsize+1);
00438         }
00439         for(; i<ioMgr->numOutputWrts; i++){
00440             io[ioMgr->outputProcArray[outrank]].disposeVelocities(c->seq, velIOTime);
00441             outrank += grpsize;
00442         }
00443         #else
00444                 //output multiple files
00445                 for(int i=0; i<ioMgr->numOutputProcs; i++) {
00446                         io[ioMgr->outputProcArray[i]].disposeVelocities(c->seq, velIOTime);
00447                 }
00448         #endif
00449         //this instance c is freed in the next round of output invocation.        
00450     }
00451 }
00452 
00453 void CollectionMaster::checkForceReady(){
00454     CollectVectorInstance *c;
00455     if((c = forces.getReady())){
00456         if(c->status == IN_PROCESS){
00457             //indicating in the process of outputing forces
00458             return;
00459         }
00460 
00461         c->status = IN_PROCESS;
00462 
00463         forceOutTime = CmiWallTimer();
00464         //write the header
00465         parOut->forceMaster(c->seq, Node::Object()->molecule->numAtoms);
00466                 forceIOTime = CmiWallTimer() - forceOutTime;
00467 
00468         //notify output procs to do Token based output
00469         CProxy_ParallelIOMgr io(CkpvAccess(BOCclass_group).ioMgr);
00470         ParallelIOMgr *ioMgr = io.ckLocalBranch();
00471 
00472         #if OUTPUT_SINGLE_FILE
00473         int grpsize = ioMgr->numOutputProcs / ioMgr->numOutputWrts;
00474         int remains = ioMgr->numOutputProcs % ioMgr->numOutputWrts;
00475         int outrank = 0;
00476         int i;
00477         for(i=0; i<remains; i++){
00478             io[ioMgr->outputProcArray[outrank]].disposeForces(c->seq, forceIOTime);
00479             outrank += (grpsize+1);
00480         }
00481         for(; i<ioMgr->numOutputWrts; i++){
00482             io[ioMgr->outputProcArray[outrank]].disposeForces(c->seq, forceIOTime);
00483             outrank += grpsize;
00484         }
00485         #else
00486                 //output multiple files
00487                 for(int i=0; i<ioMgr->numOutputProcs; i++) {
00488                         io[ioMgr->outputProcArray[i]].disposeForces(c->seq, forceIOTime);
00489                 }
00490         #endif
00491         //this instance c is freed in the next round of output invocation.        
00492     }
00493 }
00494 
00495 
00496 void CollectionMidMaster::disposePositions(int seq)
00497 {
00498     CollectMidVectorInstance *c = positions.getReady(seq);
00499     CmiAssert(c!=NULL);
00500     parOut->coordinateSlave(seq,c->fromAtomID,c->toAtomID,
00501                                c->data.begin(),c->fdata.begin());
00502     c->free();
00503 }
00504 
00505 void CollectionMidMaster::disposeVelocities(int seq)
00506 {
00507     CollectMidVectorInstance *c = velocities.getReady(seq);
00508     CmiAssert(c!=NULL);    
00509     parOut->velocitySlave(seq,c->fromAtomID,c->toAtomID,c->data.begin()); 
00510     c->free();
00511 }
00512 
00513 void CollectionMidMaster::disposeForces(int seq)
00514 {
00515     CollectMidVectorInstance *c = forces.getReady(seq);
00516     CmiAssert(c!=NULL);    
00517     parOut->forceSlave(seq,c->fromAtomID,c->toAtomID,c->data.begin()); 
00518     c->free();
00519 }
00520 #endif
00521 
00522 #include "CollectionMaster.def.h"
00523 

Generated on Fri May 25 04:07:13 2012 for NAMD by  doxygen 1.3.9.1