00001
00007 #include "largefiles.h"
00008
00009 #include "InfoStream.h"
00010 #include "CollectionMaster.h"
00011 #include "ProcessorPrivate.h"
00012 #include "SimParameters.h"
00013 #include "packmsg.h"
00014 #include "CollectionMaster.decl.h"
00015 #include "Molecule.h"
00016
00017 #include "memusage.h"
00018
00019
00020 #include "Debug.h"
00021
00022 CollectionMaster::CollectionMaster()
00023 {
00024 if (CkpvAccess(CollectionMaster_instance) == 0) {
00025 CkpvAccess(CollectionMaster_instance) = this;
00026 } else {
00027 DebugM(1, "CollectionMaster::CollectionMaster() - another instance of CollectionMaster exists!\n");
00028 }
00029 dataStreamFile = 0;
00030
00031 #ifdef MEM_OPT_VERSION
00032 wrapCoorDoneCnt = 0;
00033 posDoneCnt = 0;
00034 velDoneCnt = 0;
00035 parOut = new ParOutput();
00036 #endif
00037
00038 posTimings = 10; velTimings = forceTimings = 5;
00039 }
00040
00041
00042 CollectionMaster::~CollectionMaster(void)
00043 {
00044 }
00045
00046 void CollectionMaster::receivePositions(CollectVectorMsg *msg)
00047 {
00048 #ifndef MEM_OPT_VERSION
00049 positions.submitData(msg,Node::Object()->molecule->numAtoms);
00050 delete msg;
00051
00052 CollectVectorInstance *c;
00053 while ( ( c = positions.removeReady() ) ) { disposePositions(c); }
00054 #endif
00055 }
00056
00057 void CollectionMaster::enqueuePositions(int seq, Lattice &lattice)
00058 {
00059 positions.enqueue(seq,lattice);
00060
00061 #ifndef MEM_OPT_VERSION
00062 CollectVectorInstance *c;
00063 while ( ( c = positions.removeReady() ) ) { disposePositions(c); }
00064 #else
00065 checkPosReady();
00066 #endif
00067 }
00068
00069 void CollectionMaster::disposePositions(CollectVectorInstance *c)
00070 {
00071 #ifndef MEM_OPT_VERSION
00072 DebugM(3,"Collected positions at " << c->seq << std::endl);
00073 int seq = c->seq;
00074 int size = c->data.size();
00075 if ( ! size ) size = c->fdata.size();
00076 Vector *data = c->data.begin();
00077 FloatVector *fdata = c->fdata.begin();
00078 double exectime = CmiWallTimer();
00079 double mem = memusage_MB();
00080 Node::Object()->output->coordinate(seq,size,data,fdata,c->lattice);
00081 c->free();
00082 exectime = CmiWallTimer()-exectime;
00083 if ( posTimings ) {
00084 CkPrintf("The last position output (seq=%d) takes %.3f seconds, %.3f MB of memory in use\n", seq, exectime, mem);
00085 --posTimings;
00086 }
00087 #endif
00088 }
00089
00090
00091 void CollectionMaster::receiveVelocities(CollectVectorMsg *msg)
00092 {
00093 #ifndef MEM_OPT_VERSION
00094 velocities.submitData(msg,Node::Object()->molecule->numAtoms);
00095 delete msg;
00096
00097 CollectVectorInstance *c;
00098 while ( ( c = velocities.removeReady() ) ) { disposeVelocities(c); }
00099 #endif
00100 }
00101
00102 void CollectionMaster::enqueueVelocities(int seq)
00103 {
00104 Lattice dummy;
00105 velocities.enqueue(seq,dummy);
00106 #ifndef MEM_OPT_VERSION
00107 CollectVectorInstance *c;
00108 while ( ( c = velocities.removeReady() ) ) { disposeVelocities(c); }
00109 #else
00110 checkVelReady();
00111 #endif
00112 }
00113
00114 void CollectionMaster::disposeVelocities(CollectVectorInstance *c)
00115 {
00116 #ifndef MEM_OPT_VERSION
00117 DebugM(3,"Collected velocities at " << c->seq << std::endl);
00118 int seq = c->seq;
00119 int size = c->data.size();
00120 Vector *data = c->data.begin();
00121 double exectime = CmiWallTimer();
00122 double mem = memusage_MB();
00123 Node::Object()->output->velocity(seq,size,data);
00124 c->free();
00125 exectime = CmiWallTimer()-exectime;
00126 if ( velTimings ) {
00127 CkPrintf("The last velocity output (seq=%d) takes %.3f seconds, %.3f MB of memory in use\n", seq, exectime, mem);
00128 --velTimings;
00129 }
00130 #endif
00131 }
00132
00133
00134 void CollectionMaster::receiveForces(CollectVectorMsg *msg)
00135 {
00136 #ifndef MEM_OPT_VERSION
00137 forces.submitData(msg,Node::Object()->molecule->numAtoms);
00138 delete msg;
00139
00140 CollectVectorInstance *c;
00141 while ( ( c = forces.removeReady() ) ) { disposeForces(c); }
00142 #endif
00143 }
00144
00145 void CollectionMaster::enqueueForces(int seq)
00146 {
00147 Lattice dummy;
00148 forces.enqueue(seq,dummy);
00149 #ifndef MEM_OPT_VERSION
00150 CollectVectorInstance *c;
00151 while ( ( c = forces.removeReady() ) ) { disposeForces(c); }
00152 #else
00153 checkForceReady();
00154 #endif
00155 }
00156
00157 void CollectionMaster::disposeForces(CollectVectorInstance *c)
00158 {
00159 #ifndef MEM_OPT_VERSION
00160 DebugM(3,"Collected forces at " << c->seq << std::endl);
00161 int seq = c->seq;
00162 int size = c->data.size();
00163 Vector *data = c->data.begin();
00164 double exectime = CmiWallTimer();
00165 double mem = memusage_MB();
00166 Node::Object()->output->force(seq,size,data);
00167 c->free();
00168 exectime = CmiWallTimer()-exectime;
00169 if ( forceTimings ) {
00170 CkPrintf("The last force output (seq=%d) takes %.3f seconds, %.3f MB of memory in use\n", seq, exectime, mem);
00171 --forceTimings;
00172 }
00173 #endif
00174 }
00175
00176
00177 void CollectionMaster::receiveDataStream(DataStreamMsg *msg) {
00178 if ( ! dataStreamFile ) {
00179 char *fname = Node::Object()->simParameters->auxFilename;
00180
00181
00182
00183 CkPrintf("Info: OPENING AUXILIARY DATA STREAM FILE %s\n", fname);
00184 NAMD_backup_file(fname);
00185 dataStreamFile = fopen(fname,"w");
00186 if ( ! dataStreamFile )
00187 NAMD_die("Can't open auxiliary data stream file!");
00188 }
00189 fprintf(dataStreamFile,"%s",msg->data.begin());
00190 fflush(dataStreamFile);
00191 delete msg;
00192 }
00193
// Invokes the PACK_MSG macro for DataStreamMsg, listing its `data` member
// through PACK_RESIZE.
// NOTE(review): presumably this expands to the message's pack/unpack
// routines (macro likely defined in packmsg.h) - confirm there.
00194 PACK_MSG(DataStreamMsg,
00195 PACK_RESIZE(data);
00196 )
00197
00198
00199
00200
00201
00202
00203 void CollectionMaster::receiveOutputPosReady(int seq){
00204 #ifdef MEM_OPT_VERSION
00205 positions.submitData(seq);
00206 checkPosReady();
00207 #endif
00208 }
00209
00210 void CollectionMaster::receiveOutputVelReady(int seq){
00211 #ifdef MEM_OPT_VERSION
00212 velocities.submitData(seq);
00213 checkVelReady();
00214 #endif
00215 }
00216
00217 void CollectionMaster::receiveOutputForceReady(int seq){
00218 #ifdef MEM_OPT_VERSION
00219 forces.submitData(seq);
00220 checkForceReady();
00221 #endif
00222 }
00223
00224
00225 void CollectionMaster::startNextRoundOutputPos(double totalT){
00226 #ifdef MEM_OPT_VERSION
00227
00228 if(totalT > posIOTime) posIOTime = totalT;
00229
00230 #if OUTPUT_SINGLE_FILE
00231 if(++posDoneCnt < Node::Object()->simParameters->numoutputwrts) return;
00232 #else
00233 if(++posDoneCnt < Node::Object()->simParameters->numoutputprocs) return;
00234 #endif
00235
00236 posDoneCnt = 0;
00237
00238
00239 CollectVectorInstance *c = positions.getReady();
00240 int seq = c->seq;
00241 CmiAssert(c->status == IN_PROCESS);
00242 double mem = memusage_MB();
00243 positions.removeFirstReady();
00244 c->free();
00245 posOutTime = CmiWallTimer()-posOutTime;
00246 if ( posTimings ) {
00247 CkPrintf("The last position output (seq=%d) takes %.3f seconds(file I/O: %.3f secs), %.3f MB of memory in use\n", seq, posOutTime, posIOTime, mem);
00248 --posTimings;
00249 }
00250
00251
00252
00253 checkPosReady();
00254 #endif
00255 }
00256
00257 void CollectionMaster::startNextRoundOutputVel(double totalT){
00258 #ifdef MEM_OPT_VERSION
00259
00260 if(totalT > velIOTime) velIOTime = totalT;
00261
00262 #if OUTPUT_SINGLE_FILE
00263 if(++velDoneCnt < Node::Object()->simParameters->numoutputwrts) return;
00264 #else
00265 if(++velDoneCnt < Node::Object()->simParameters->numoutputprocs) return;
00266 #endif
00267
00268 velDoneCnt = 0;
00269
00270
00271 CollectVectorInstance *c = velocities.getReady();
00272 int seq = c->seq;
00273 CmiAssert(c->status == IN_PROCESS);
00274 double mem = memusage_MB();
00275 velocities.removeFirstReady();
00276 c->free();
00277 velOutTime = CmiWallTimer()-velOutTime;
00278 if ( velTimings ) {
00279 CkPrintf("The last velocity output (seq=%d) takes %.3f seconds(file I/O: %.3f secs), %.3f MB of memory in use\n", seq, velOutTime, velIOTime, mem);
00280 --velTimings;
00281 }
00282
00283
00284
00285 checkVelReady();
00286 #endif
00287 }
00288
00289 void CollectionMaster::startNextRoundOutputForce(double totalT){
00290 #ifdef MEM_OPT_VERSION
00291
00292 if(totalT > forceIOTime) forceIOTime = totalT;
00293
00294 #if OUTPUT_SINGLE_FILE
00295 if(++forceDoneCnt < Node::Object()->simParameters->numoutputwrts) return;
00296 #else
00297 if(++forceDoneCnt < Node::Object()->simParameters->numoutputprocs) return;
00298 #endif
00299
00300 forceDoneCnt = 0;
00301
00302
00303 CollectVectorInstance *c = forces.getReady();
00304 int seq = c->seq;
00305 CmiAssert(c->status == IN_PROCESS);
00306 double mem = memusage_MB();
00307 forces.removeFirstReady();
00308 c->free();
00309 forceOutTime = CmiWallTimer()-forceOutTime;
00310 if ( forceTimings ) {
00311 CkPrintf("The last force output (seq=%d) takes %.3f seconds(file I/O: %.3f secs), %.3f MB of memory in use\n", seq, forceOutTime, forceIOTime, mem);
00312 --forceTimings;
00313 }
00314
00315
00316
00317 checkForceReady();
00318 #endif
00319 }
00320
00321
00322 void CollectionMaster::wrapCoorFinished(){
00323 #ifdef MEM_OPT_VERSION
00324 if(++wrapCoorDoneCnt == Node::Object()->simParameters->numoutputprocs){
00325 wrapCoorDoneCnt = 0;
00326
00327
00328 posIOTime = CmiWallTimer()-posOutTime;
00329
00330
00331 CollectVectorInstance *c = positions.getReady();
00332
00333 CProxy_ParallelIOMgr io(CkpvAccess(BOCclass_group).ioMgr);
00334 ParallelIOMgr *ioMgr = io.ckLocalBranch();
00335
00336 #if OUTPUT_SINGLE_FILE
00337
00338 int grpsize = ioMgr->numOutputProcs / ioMgr->numOutputWrts;
00339 int remains = ioMgr->numOutputProcs % ioMgr->numOutputWrts;
00340 int outrank = 0;
00341 int i;
00342 for(i=0; i<remains; i++){
00343 io[ioMgr->outputProcArray[outrank]].disposePositions(c->seq, posIOTime);
00344 outrank += (grpsize+1);
00345 }
00346 for(; i<ioMgr->numOutputWrts; i++){
00347 io[ioMgr->outputProcArray[outrank]].disposePositions(c->seq, posIOTime);
00348 outrank += grpsize;
00349 }
00350 #else
00351
00352 for(int i=0; i<ioMgr->numOutputProcs; i++) {
00353 io[ioMgr->outputProcArray[i]].disposePositions(c->seq, posIOTime);
00354 }
00355 #endif
00356
00357 }
00358 #endif
00359 }
00360
00361 #ifdef MEM_OPT_VERSION
00362 void CollectionMaster::checkPosReady(){
00363 CollectVectorInstance *c;
00364 if((c = positions.getReady())){
00365 if(c->status == IN_PROCESS){
00366
00367 return;
00368 }
00369 c->status = IN_PROCESS;
00370
00371 posOutTime = CmiWallTimer();
00372 SimParameters *simParam = Node::Object()->simParameters;
00373 CProxy_ParallelIOMgr io(CkpvAccess(BOCclass_group).ioMgr);
00374 ParallelIOMgr *ioMgr = io.ckLocalBranch();
00375 if(simParam->wrapAll || simParam->wrapWater){
00376 for(int i=0; i<ioMgr->numOutputProcs; i++){
00377 io[ioMgr->outputProcArray[i]].wrapCoor(c->seq, c->lattice);
00378 }
00379
00380
00381 parOut->coordinateMaster(c->seq,Node::Object()->molecule->numAtoms,c->lattice);
00382 }else{
00383
00384 parOut->coordinateMaster(c->seq,Node::Object()->molecule->numAtoms,c->lattice);
00385 posIOTime = CmiWallTimer() - posOutTime;
00386
00387 #if OUTPUT_SINGLE_FILE
00388 int grpsize = ioMgr->numOutputProcs / ioMgr->numOutputWrts;
00389 int remains = ioMgr->numOutputProcs % ioMgr->numOutputWrts;
00390 int outrank = 0;
00391 int i;
00392 for(i=0; i<remains; i++){
00393 io[ioMgr->outputProcArray[outrank]].disposePositions(c->seq, posIOTime);
00394 outrank += (grpsize+1);
00395 }
00396 for(; i<ioMgr->numOutputWrts; i++){
00397 io[ioMgr->outputProcArray[outrank]].disposePositions(c->seq, posIOTime);
00398 outrank += grpsize;
00399 }
00400 #else
00401
00402 for(int i=0; i<ioMgr->numOutputProcs; i++) {
00403 io[ioMgr->outputProcArray[i]].disposePositions(c->seq, posIOTime);
00404 }
00405 #endif
00406 }
00407
00408 }
00409 }
00410
00411 void CollectionMaster::checkVelReady(){
00412 CollectVectorInstance *c;
00413 if((c = velocities.getReady())){
00414 if(c->status == IN_PROCESS){
00415
00416 return;
00417 }
00418
00419 c->status = IN_PROCESS;
00420
00421 velOutTime = CmiWallTimer();
00422
00423 parOut->velocityMaster(c->seq, Node::Object()->molecule->numAtoms);
00424 velIOTime = CmiWallTimer() - velOutTime;
00425
00426
00427 CProxy_ParallelIOMgr io(CkpvAccess(BOCclass_group).ioMgr);
00428 ParallelIOMgr *ioMgr = io.ckLocalBranch();
00429
00430 #if OUTPUT_SINGLE_FILE
00431 int grpsize = ioMgr->numOutputProcs / ioMgr->numOutputWrts;
00432 int remains = ioMgr->numOutputProcs % ioMgr->numOutputWrts;
00433 int outrank = 0;
00434 int i;
00435 for(i=0; i<remains; i++){
00436 io[ioMgr->outputProcArray[outrank]].disposeVelocities(c->seq, velIOTime);
00437 outrank += (grpsize+1);
00438 }
00439 for(; i<ioMgr->numOutputWrts; i++){
00440 io[ioMgr->outputProcArray[outrank]].disposeVelocities(c->seq, velIOTime);
00441 outrank += grpsize;
00442 }
00443 #else
00444
00445 for(int i=0; i<ioMgr->numOutputProcs; i++) {
00446 io[ioMgr->outputProcArray[i]].disposeVelocities(c->seq, velIOTime);
00447 }
00448 #endif
00449
00450 }
00451 }
00452
00453 void CollectionMaster::checkForceReady(){
00454 CollectVectorInstance *c;
00455 if((c = forces.getReady())){
00456 if(c->status == IN_PROCESS){
00457
00458 return;
00459 }
00460
00461 c->status = IN_PROCESS;
00462
00463 forceOutTime = CmiWallTimer();
00464
00465 parOut->forceMaster(c->seq, Node::Object()->molecule->numAtoms);
00466 forceIOTime = CmiWallTimer() - forceOutTime;
00467
00468
00469 CProxy_ParallelIOMgr io(CkpvAccess(BOCclass_group).ioMgr);
00470 ParallelIOMgr *ioMgr = io.ckLocalBranch();
00471
00472 #if OUTPUT_SINGLE_FILE
00473 int grpsize = ioMgr->numOutputProcs / ioMgr->numOutputWrts;
00474 int remains = ioMgr->numOutputProcs % ioMgr->numOutputWrts;
00475 int outrank = 0;
00476 int i;
00477 for(i=0; i<remains; i++){
00478 io[ioMgr->outputProcArray[outrank]].disposeForces(c->seq, forceIOTime);
00479 outrank += (grpsize+1);
00480 }
00481 for(; i<ioMgr->numOutputWrts; i++){
00482 io[ioMgr->outputProcArray[outrank]].disposeForces(c->seq, forceIOTime);
00483 outrank += grpsize;
00484 }
00485 #else
00486
00487 for(int i=0; i<ioMgr->numOutputProcs; i++) {
00488 io[ioMgr->outputProcArray[i]].disposeForces(c->seq, forceIOTime);
00489 }
00490 #endif
00491
00492 }
00493 }
00494
00495
00496 void CollectionMidMaster::disposePositions(int seq)
00497 {
00498 CollectMidVectorInstance *c = positions.getReady(seq);
00499 CmiAssert(c!=NULL);
00500 parOut->coordinateSlave(seq,c->fromAtomID,c->toAtomID,
00501 c->data.begin(),c->fdata.begin());
00502 c->free();
00503 }
00504
00505 void CollectionMidMaster::disposeVelocities(int seq)
00506 {
00507 CollectMidVectorInstance *c = velocities.getReady(seq);
00508 CmiAssert(c!=NULL);
00509 parOut->velocitySlave(seq,c->fromAtomID,c->toAtomID,c->data.begin());
00510 c->free();
00511 }
00512
00513 void CollectionMidMaster::disposeForces(int seq)
00514 {
00515 CollectMidVectorInstance *c = forces.getReady(seq);
00516 CmiAssert(c!=NULL);
00517 parOut->forceSlave(seq,c->fromAtomID,c->toAtomID,c->data.begin());
00518 c->free();
00519 }
00520 #endif
00521
00522 #include "CollectionMaster.def.h"
00523