CollectionMaster.C

Go to the documentation of this file.
00001 
00007 #include "largefiles.h"  // must be first!
00008 
00009 #include "InfoStream.h"
00010 #include "CollectionMaster.h"
00011 #include "ProcessorPrivate.h"
00012 #include "SimParameters.h"
00013 #include "packmsg.h"
00014 #include "CollectionMaster.decl.h"
00015 #include "Molecule.h"
00016 
00017 #include "memusage.h"
00018 
00019 // #define DEBUGM
00020 #include "Debug.h"
00021 
00022 CollectionMaster::CollectionMaster()
00023 {
00024   if (CkpvAccess(CollectionMaster_instance) == 0) {
00025     CkpvAccess(CollectionMaster_instance) = this;
00026   } else {
00027     DebugM(1, "CollectionMaster::CollectionMaster() - another instance of CollectionMaster exists!\n");
00028   }
00029   dataStreamFile = 0;
00030 
00031 #ifdef MEM_OPT_VERSION
00032   wrapCoorDoneCnt = 0;
00033   posDoneCnt = 0;
00034   velDoneCnt = 0;
00035   parOut = new ParOutput();
00036 #endif
00037 
00038   posTimings = 10;  velTimings = forceTimings = 5;
00039 }
00040 
00041 
00042 CollectionMaster::~CollectionMaster(void)
00043 {
00044 }
00045 
00046 void CollectionMaster::receivePositions(CollectVectorMsg *msg)
00047 {
00048 #ifndef MEM_OPT_VERSION
00049   positions.submitData(msg,Node::Object()->molecule->numAtoms);
00050   delete msg;
00051   
00052   CollectVectorInstance *c;
00053   while ( ( c = positions.removeReady() ) ) { disposePositions(c); }
00054 #endif
00055 }
00056 
00057 void CollectionMaster::enqueuePositions(int seq, Lattice &lattice)
00058 {
00059   positions.enqueue(seq,lattice);
00060 
00061 #ifndef MEM_OPT_VERSION
00062   CollectVectorInstance *c;
00063   while ( ( c = positions.removeReady() ) ) { disposePositions(c); }
00064 #else
00065   checkPosReady();
00066 #endif
00067 }
00068 
00069 void CollectionMaster::disposePositions(CollectVectorInstance *c)
00070 {
00071 #ifndef MEM_OPT_VERSION
00072     DebugM(3,"Collected positions at " << c->seq << std::endl);
00073     int seq = c->seq;
00074     int size = c->data.size();
00075     if ( ! size ) size = c->fdata.size();
00076     Vector *data = c->data.begin();
00077     FloatVector *fdata = c->fdata.begin();
00078     double exectime = CmiWallTimer();
00079     double mem = memusage_MB();
00080     Node::Object()->output->coordinate(seq,size,data,fdata,c->lattice);
00081     c->free();
00082     exectime = CmiWallTimer()-exectime;
00083     if ( posTimings ) {
00084       CkPrintf("The last position output (seq=%d) takes %.3f seconds, %.3f MB of memory in use\n", seq, exectime, mem);
00085       --posTimings;
00086     }
00087 #endif
00088 }
00089 
00090 
00091 void CollectionMaster::receiveVelocities(CollectVectorMsg *msg)
00092 {
00093 #ifndef MEM_OPT_VERSION
00094   velocities.submitData(msg,Node::Object()->molecule->numAtoms);
00095   delete msg;
00096 
00097   CollectVectorInstance *c;
00098   while ( ( c = velocities.removeReady() ) ) { disposeVelocities(c); }
00099 #endif
00100 }
00101 
00102 void CollectionMaster::enqueueVelocities(int seq)
00103 {
00104   Lattice dummy;
00105   velocities.enqueue(seq,dummy);
00106 #ifndef MEM_OPT_VERSION
00107   CollectVectorInstance *c;
00108   while ( ( c = velocities.removeReady() ) ) { disposeVelocities(c); }
00109 #else
00110   checkVelReady();
00111 #endif
00112 }
00113 
00114 void CollectionMaster::disposeVelocities(CollectVectorInstance *c)
00115 {
00116 #ifndef MEM_OPT_VERSION
00117     DebugM(3,"Collected velocities at " << c->seq << std::endl);
00118     int seq = c->seq;
00119     int size = c->data.size();
00120     Vector *data = c->data.begin();
00121     double exectime = CmiWallTimer();
00122     double mem = memusage_MB();
00123     Node::Object()->output->velocity(seq,size,data);
00124     c->free();
00125     exectime = CmiWallTimer()-exectime;
00126     if ( velTimings ) {
00127       CkPrintf("The last velocity output (seq=%d) takes %.3f seconds, %.3f MB of memory in use\n", seq, exectime, mem);
00128       --velTimings;
00129     }
00130 #endif
00131 }
00132 
00133 
00134 void CollectionMaster::receiveForces(CollectVectorMsg *msg)
00135 {
00136 #ifndef MEM_OPT_VERSION
00137   forces.submitData(msg,Node::Object()->molecule->numAtoms);
00138   delete msg;
00139 
00140   CollectVectorInstance *c;
00141   while ( ( c = forces.removeReady() ) ) { disposeForces(c); }
00142 #endif
00143 }
00144 
00145 void CollectionMaster::enqueueForces(int seq)
00146 {
00147   Lattice dummy;
00148   forces.enqueue(seq,dummy);
00149 #ifndef MEM_OPT_VERSION
00150   CollectVectorInstance *c;
00151   while ( ( c = forces.removeReady() ) ) { disposeForces(c); }
00152 #else
00153   checkForceReady();
00154 #endif
00155 }
00156 
00157 void CollectionMaster::disposeForces(CollectVectorInstance *c)
00158 {
00159 #ifndef MEM_OPT_VERSION
00160     DebugM(3,"Collected forces at " << c->seq << std::endl);
00161     int seq = c->seq;
00162     int size = c->data.size();
00163     Vector *data = c->data.begin();
00164     double exectime = CmiWallTimer();
00165     double mem = memusage_MB();
00166     Node::Object()->output->force(seq,size,data);
00167     c->free();
00168     exectime = CmiWallTimer()-exectime;
00169     if ( forceTimings ) {
00170       CkPrintf("The last force output (seq=%d) takes %.3f seconds, %.3f MB of memory in use\n", seq, exectime, mem);
00171       --forceTimings;
00172     }
00173 #endif
00174 }
00175 
00176 
00177 void CollectionMaster::receiveDataStream(DataStreamMsg *msg) {
00178     if ( ! dataStreamFile ) {
00179       char *fname = Node::Object()->simParameters->auxFilename;
00180       // iout has large file linking issues on AIX
00181       // iout << iINFO << "OPENING AUXILIARY DATA STREAM FILE "
00182       //                                << fname << "\n" << endi;
00183       CkPrintf("Info: OPENING AUXILIARY DATA STREAM FILE %s\n", fname);
00184       NAMD_backup_file(fname);
00185       dataStreamFile = fopen(fname,"w");
00186       if ( ! dataStreamFile )
00187                 NAMD_die("Can't open auxiliary data stream file!");
00188     }
00189     fprintf(dataStreamFile,"%s",msg->data.begin());
00190     fflush(dataStreamFile);
00191     delete msg;
00192 }
00193 
00194 PACK_MSG(DataStreamMsg,
00195   PACK_RESIZE(data);
00196 )
00197 
// The timesteps on CollectionMaster are always in increasing order
// because they are enqueued on PE0 by the controller in order. -Chao Mei
//
// The computation of wrap_coor is also serialized for ease of
// implementation (it makes managing the output simpler).
00203 void CollectionMaster::receiveOutputPosReady(int seq){
00204 #ifdef MEM_OPT_VERSION
00205     positions.submitData(seq);
00206     checkPosReady();
00207 #endif
00208 }
00209 
00210 void CollectionMaster::receiveOutputVelReady(int seq){
00211 #ifdef MEM_OPT_VERSION
00212     velocities.submitData(seq);
00213     checkVelReady();
00214 #endif
00215 }
00216 
00217 void CollectionMaster::receiveOutputForceReady(int seq){
00218 #ifdef MEM_OPT_VERSION
00219     forces.submitData(seq);
00220     checkForceReady();
00221 #endif
00222 }
00223 
00224 
00225 void CollectionMaster::startNextRoundOutputPos(double totalT){
00226 #ifdef MEM_OPT_VERSION
00227 
00228         if(totalT > posIOTime) posIOTime = totalT;
00229 
00230 #ifndef OUTPUT_SINGLE_FILE
00231 #error OUTPUT_SINGLE_FILE not defined!
00232 #endif
00233 
00234 #if OUTPUT_SINGLE_FILE
00235     if(++posDoneCnt < Node::Object()->simParameters->numoutputwrts)  return;
00236 #else
00237         if(++posDoneCnt < Node::Object()->simParameters->numoutputprocs)  return;
00238 #endif
00239 
00240         posDoneCnt = 0;
00241 
00242     //retrieve the last ready instance
00243     CollectVectorInstance *c = positions.getReady();
00244     int seq = c->seq;
00245     CmiAssert(c->status == IN_PROCESS);
00246     double mem = memusage_MB();
00247     positions.removeFirstReady();
00248     c->free();
00249     posOutTime = CmiWallTimer()-posOutTime;
00250     if ( posTimings ) {
00251       CkPrintf("The last position output (seq=%d) takes %.3f seconds(file I/O: %.3f secs), %.3f MB of memory in use\n", seq, posOutTime, posIOTime, mem);
00252       --posTimings;
00253     }
00254 
00255     //Actually the c->status doesn't need to be checked because it is
00256     //certain that the new ready one will not be in  IN_PROCESS status 
00257     checkPosReady();
00258 #endif
00259 }
00260 
00261 void CollectionMaster::startNextRoundOutputVel(double totalT){
00262 #ifdef MEM_OPT_VERSION
00263         
00264         if(totalT > velIOTime) velIOTime = totalT;
00265 
00266 #if OUTPUT_SINGLE_FILE
00267     if(++velDoneCnt < Node::Object()->simParameters->numoutputwrts)  return;
00268 #else
00269         if(++velDoneCnt < Node::Object()->simParameters->numoutputprocs)  return;
00270 #endif
00271 
00272     velDoneCnt = 0;
00273 
00274     //retrieve the last ready instance
00275     CollectVectorInstance *c = velocities.getReady();
00276     int seq = c->seq;
00277     CmiAssert(c->status == IN_PROCESS);
00278     double mem = memusage_MB();
00279     velocities.removeFirstReady();
00280     c->free();
00281     velOutTime = CmiWallTimer()-velOutTime;
00282     if ( velTimings ) {
00283       CkPrintf("The last velocity output (seq=%d) takes %.3f seconds(file I/O: %.3f secs), %.3f MB of memory in use\n", seq, velOutTime, velIOTime, mem);
00284       --velTimings;
00285     }
00286 
00287     //Actually the c->status doesn't need to be checked because it is
00288     //certain that the new ready one will not be in  IN_PROCESS status 
00289     checkVelReady();
00290 #endif
00291 }
00292 
00293 void CollectionMaster::startNextRoundOutputForce(double totalT){
00294 #ifdef MEM_OPT_VERSION
00295         
00296         if(totalT > forceIOTime) forceIOTime = totalT;
00297 
00298 #if OUTPUT_SINGLE_FILE
00299     if(++forceDoneCnt < Node::Object()->simParameters->numoutputwrts)  return;
00300 #else
00301         if(++forceDoneCnt < Node::Object()->simParameters->numoutputprocs)  return;
00302 #endif
00303 
00304     forceDoneCnt = 0;
00305 
00306     //retrieve the last ready instance
00307     CollectVectorInstance *c = forces.getReady();
00308     int seq = c->seq;
00309     CmiAssert(c->status == IN_PROCESS);
00310     double mem = memusage_MB();
00311     forces.removeFirstReady();
00312     c->free();
00313     forceOutTime = CmiWallTimer()-forceOutTime;
00314     if ( forceTimings ) {
00315       CkPrintf("The last force output (seq=%d) takes %.3f seconds(file I/O: %.3f secs), %.3f MB of memory in use\n", seq, forceOutTime, forceIOTime, mem);
00316       --forceTimings;
00317     }
00318 
00319     //Actually the c->status doesn't need to be checked because it is
00320     //certain that the new ready one will not be in  IN_PROCESS status 
00321     checkForceReady();
00322 #endif
00323 }
00324 
00325 
00326 void CollectionMaster::wrapCoorFinished(){
00327 #ifdef MEM_OPT_VERSION
00328     if(++wrapCoorDoneCnt == Node::Object()->simParameters->numoutputprocs){
00329         wrapCoorDoneCnt = 0;
00330 
00331                 //count the wrapping-coor time into master writing time
00332                 posIOTime = CmiWallTimer()-posOutTime; 
00333 
00334         //it's ready to output positions
00335         CollectVectorInstance *c = positions.getReady();
00336 
00337                 CProxy_ParallelIOMgr io(CkpvAccess(BOCclass_group).ioMgr);
00338                 ParallelIOMgr *ioMgr = io.ckLocalBranch();
00339 
00340 #if OUTPUT_SINGLE_FILE
00341         //notify output procs to do Token based output
00342         int grpsize = ioMgr->numOutputProcs / ioMgr->numOutputWrts;
00343         int remains = ioMgr->numOutputProcs % ioMgr->numOutputWrts;
00344         int outrank = 0;
00345         int i;
00346         for(i=0; i<remains; i++){
00347             io[ioMgr->outputProcArray[outrank]].disposePositions(c->seq, posIOTime);
00348             outrank += (grpsize+1);
00349         }
00350         for(; i<ioMgr->numOutputWrts; i++){
00351             io[ioMgr->outputProcArray[outrank]].disposePositions(c->seq, posIOTime);
00352             outrank += grpsize;
00353         }
00354 #else
00355                 //output multiple files
00356                 for(int i=0; i<ioMgr->numOutputProcs; i++) {
00357                         io[ioMgr->outputProcArray[i]].disposePositions(c->seq, posIOTime);
00358                 }
00359 #endif
00360 
00361     }
00362 #endif
00363 }
00364 
00365 #ifdef MEM_OPT_VERSION
00366 void CollectionMaster::checkPosReady(){
00367     CollectVectorInstance *c;
00368     if((c = positions.getReady())){
00369         if(c->status == IN_PROCESS){
00370             //indicating in the process of outputing coordinates
00371             return;
00372         }        
00373         c->status = IN_PROCESS;
00374 
00375         posOutTime = CmiWallTimer();
00376         SimParameters *simParam = Node::Object()->simParameters;
00377         CProxy_ParallelIOMgr io(CkpvAccess(BOCclass_group).ioMgr);
00378         ParallelIOMgr *ioMgr = io.ckLocalBranch();
00379         if(simParam->wrapAll || simParam->wrapWater){
00380             for(int i=0; i<ioMgr->numOutputProcs; i++){
00381                 io[ioMgr->outputProcArray[i]].wrapCoor(c->seq, c->lattice);
00382             }
00383             //write the header to overlap with the computation of 
00384             //wrapping coordinates
00385             parOut->coordinateMaster(c->seq,Node::Object()->molecule->numAtoms,c->lattice);
00386         }else{
00387             //write the header 
00388             parOut->coordinateMaster(c->seq,Node::Object()->molecule->numAtoms,c->lattice);
00389                         posIOTime = CmiWallTimer() - posOutTime;            
00390 
00391                 #if OUTPUT_SINGLE_FILE
00392             int grpsize = ioMgr->numOutputProcs / ioMgr->numOutputWrts;
00393             int remains = ioMgr->numOutputProcs % ioMgr->numOutputWrts;
00394             int outrank = 0;
00395             int i;
00396             for(i=0; i<remains; i++){
00397                 io[ioMgr->outputProcArray[outrank]].disposePositions(c->seq, posIOTime);
00398                 outrank += (grpsize+1);
00399             }
00400             for(; i<ioMgr->numOutputWrts; i++){
00401                 io[ioMgr->outputProcArray[outrank]].disposePositions(c->seq, posIOTime);
00402                 outrank += grpsize;
00403             }
00404                 #else
00405                         //output multiple files
00406                         for(int i=0; i<ioMgr->numOutputProcs; i++) {
00407                                 io[ioMgr->outputProcArray[i]].disposePositions(c->seq, posIOTime);
00408                         }
00409                 #endif
00410         }
00411         //this instance c is freed in the next round of output invocation.
00412     }
00413 }
00414 
00415 void CollectionMaster::checkVelReady(){
00416     CollectVectorInstance *c;
00417     if((c = velocities.getReady())){
00418         if(c->status == IN_PROCESS){
00419             //indicating in the process of outputing velocities
00420             return;
00421         }
00422 
00423         c->status = IN_PROCESS;
00424 
00425         velOutTime = CmiWallTimer();
00426         //write the header
00427         parOut->velocityMaster(c->seq, Node::Object()->molecule->numAtoms);
00428                 velIOTime = CmiWallTimer() - velOutTime;
00429 
00430         //notify output procs to do Token based output
00431         CProxy_ParallelIOMgr io(CkpvAccess(BOCclass_group).ioMgr);
00432         ParallelIOMgr *ioMgr = io.ckLocalBranch();
00433 
00434         #if OUTPUT_SINGLE_FILE
00435         int grpsize = ioMgr->numOutputProcs / ioMgr->numOutputWrts;
00436         int remains = ioMgr->numOutputProcs % ioMgr->numOutputWrts;
00437         int outrank = 0;
00438         int i;
00439         for(i=0; i<remains; i++){
00440             io[ioMgr->outputProcArray[outrank]].disposeVelocities(c->seq, velIOTime);
00441             outrank += (grpsize+1);
00442         }
00443         for(; i<ioMgr->numOutputWrts; i++){
00444             io[ioMgr->outputProcArray[outrank]].disposeVelocities(c->seq, velIOTime);
00445             outrank += grpsize;
00446         }
00447         #else
00448                 //output multiple files
00449                 for(int i=0; i<ioMgr->numOutputProcs; i++) {
00450                         io[ioMgr->outputProcArray[i]].disposeVelocities(c->seq, velIOTime);
00451                 }
00452         #endif
00453         //this instance c is freed in the next round of output invocation.        
00454     }
00455 }
00456 
00457 void CollectionMaster::checkForceReady(){
00458     CollectVectorInstance *c;
00459     if((c = forces.getReady())){
00460         if(c->status == IN_PROCESS){
00461             //indicating in the process of outputing forces
00462             return;
00463         }
00464 
00465         c->status = IN_PROCESS;
00466 
00467         forceOutTime = CmiWallTimer();
00468         //write the header
00469         parOut->forceMaster(c->seq, Node::Object()->molecule->numAtoms);
00470                 forceIOTime = CmiWallTimer() - forceOutTime;
00471 
00472         //notify output procs to do Token based output
00473         CProxy_ParallelIOMgr io(CkpvAccess(BOCclass_group).ioMgr);
00474         ParallelIOMgr *ioMgr = io.ckLocalBranch();
00475 
00476         #if OUTPUT_SINGLE_FILE
00477         int grpsize = ioMgr->numOutputProcs / ioMgr->numOutputWrts;
00478         int remains = ioMgr->numOutputProcs % ioMgr->numOutputWrts;
00479         int outrank = 0;
00480         int i;
00481         for(i=0; i<remains; i++){
00482             io[ioMgr->outputProcArray[outrank]].disposeForces(c->seq, forceIOTime);
00483             outrank += (grpsize+1);
00484         }
00485         for(; i<ioMgr->numOutputWrts; i++){
00486             io[ioMgr->outputProcArray[outrank]].disposeForces(c->seq, forceIOTime);
00487             outrank += grpsize;
00488         }
00489         #else
00490                 //output multiple files
00491                 for(int i=0; i<ioMgr->numOutputProcs; i++) {
00492                         io[ioMgr->outputProcArray[i]].disposeForces(c->seq, forceIOTime);
00493                 }
00494         #endif
00495         //this instance c is freed in the next round of output invocation.        
00496     }
00497 }
00498 
00499 
00500 void CollectionMidMaster::disposePositions(int seq)
00501 {
00502     CollectMidVectorInstance *c = positions.getReady(seq);
00503     CmiAssert(c!=NULL);
00504     parOut->coordinateSlave(seq,c->fromAtomID,c->toAtomID,
00505                                c->data.begin(),c->fdata.begin());
00506     c->free();
00507 }
00508 
00509 void CollectionMidMaster::disposeVelocities(int seq)
00510 {
00511     CollectMidVectorInstance *c = velocities.getReady(seq);
00512     CmiAssert(c!=NULL);    
00513     parOut->velocitySlave(seq,c->fromAtomID,c->toAtomID,c->data.begin()); 
00514     c->free();
00515 }
00516 
00517 void CollectionMidMaster::disposeForces(int seq)
00518 {
00519     CollectMidVectorInstance *c = forces.getReady(seq);
00520     CmiAssert(c!=NULL);    
00521     parOut->forceSlave(seq,c->fromAtomID,c->toAtomID,c->data.begin()); 
00522     c->free();
00523 }
00524 #endif
00525 
00526 #include "CollectionMaster.def.h"
00527 

Generated on Sat Sep 23 01:17:11 2017 for NAMD by  doxygen 1.4.7