ParallelIOMgr.C

#include "largefiles.h"  // must be first!

#include <stdio.h>
#include "BOCgroup.h"
#include "Molecule.h"
#include "Node.h"
#include "Node.decl.h"
#include "NamdState.h"
#include "WorkDistrib.h"
#include "PDB.h"
#include "PatchMap.h"
#include "PatchMap.inl"
#include "packmsg.h"
#include "HomePatch.h"
#include "InfoStream.h"
#include "CollectionMaster.h"
#include "CollectionMgr.h"

#include "ParallelIOMgr.decl.h"
#include "ParallelIOMgr.h"

#include "Output.h"
#include "Random.h"

#include <algorithm>
using namespace std;

ParallelIOMgr::ParallelIOMgr()
{
    CkpvAccess(BOCclass_group).ioMgr = thisgroup;

    numInputProcs=-1;
    inputProcArray = NULL;
    numOutputProcs=-1;
    outputProcArray = NULL;

    procsReceived=0;
    hydroMsgRecved=0;

    totalMV.x = totalMV.y = totalMV.z = 0.0;
    totalMass = 0.0;
    totalCharge = 0.0;
    numTotalExclusions = 0;
    numCalcExclusions = 0;
    numCalcFullExclusions = 0;

    isOKToRecvHPAtoms = false;
    hpAtomsList = NULL;

    clusterID = NULL;
    clusterSize = NULL;

#ifdef MEM_OPT_VERSION
    midCM = NULL;
#endif

    isWater = NULL;

    numCSMAck = 0;
    numReqRecved = 0;

    sendAtomsThread = 0;

#if COLLECT_PERFORMANCE_DATA
    numFixedAtomLookup = 0;
#endif
}

ParallelIOMgr::~ParallelIOMgr()
{
    delete [] inputProcArray;
    delete [] outputProcArray;
    delete [] clusterID;
    delete [] clusterSize;

#ifdef MEM_OPT_VERSION
    delete midCM;
#endif

    delete [] isWater;
}

#ifndef OUTPUT_SINGLE_FILE
#error OUTPUT_SINGLE_FILE not defined!
#endif

// Initialization needed for the parallel IO manager. Entry point through ScriptTcl.
void ParallelIOMgr::initialize(Node *node)
{
    simParameters = node->simParameters;
    molecule = node->molecule;

    numInputProcs = simParameters->numinputprocs;
    numOutputProcs = simParameters->numoutputprocs;
    numOutputWrts = simParameters->numoutputwrts;

    if(!CkMyPe()) {
        iout << iINFO << "Running with " <<numInputProcs<<" input processors.\n"<<endi;
        #if OUTPUT_SINGLE_FILE
        iout << iINFO << "Running with " <<numOutputProcs<<" output processors ("<<numOutputWrts<<" of them will output simultaneously).\n"<<endi;
        #else
        iout << iINFO << "Running with " <<numOutputProcs<<" output processors, and each of them will output to its own separate file.\n"<<endi;
        #endif
    }

    //build inputProcArray
   {
    inputProcArray = new int[numInputProcs];
    myInputRank = -1;
    for(int i=0; i<numInputProcs; ++i) {
      inputProcArray[i] = WorkDistrib::peDiffuseOrdering[(1+numOutputProcs+i)%CkNumPes()];
    }
    std::sort(inputProcArray, inputProcArray+numInputProcs);
    for(int i=0; i<numInputProcs; ++i) {
      if ( CkMyPe() == inputProcArray[i] ) {
        if ( myInputRank != -1 ) NAMD_bug("Duplicate input proc");
        myInputRank = i;
      }
    }

    if(!CkMyPe()) {
      iout << iINFO << "INPUT PROC LOCATIONS:";
      int i;
      for ( i=0; i<numInputProcs && i < 10; ++i ) {
        iout << " " << inputProcArray[i];
      }
      if ( i<numInputProcs ) iout << " ... " << inputProcArray[numInputProcs-1];
      iout << "\n" << endi;
    }
   }

    if(myInputRank!=-1) {
        //NOTE: this could be optimized further by pre-allocating the memory
        //for incoming atoms --Chao Mei
        int numMyAtoms = numInitMyAtomsOnInput();
        initAtoms.resize(numMyAtoms+100);  // extra space for orphan hydrogens
        initAtoms.resize(numMyAtoms);
        tmpRecvAtoms.resize(0);
    } else {
        initAtoms.resize(0);
        tmpRecvAtoms.resize(0);
    }
    hpIDList.resize(0);

    //build outputProcArray
    //spread the output processors across all the processors
   {
    outputProcArray = new int[numOutputProcs];
    outputProcFlags = new char[CkNumPes()];
    myOutputRank = -1;
    for(int i=0; i<numOutputProcs; ++i) {
      outputProcArray[i] = WorkDistrib::peDiffuseOrdering[(1+i)%CkNumPes()];
    }
    std::sort(outputProcArray, outputProcArray+numOutputProcs);
    for(int i=0; i<CkNumPes(); ++i) {
      outputProcFlags[i] = 0;
    }
    for(int i=0; i<numOutputProcs; ++i) {
      outputProcFlags[outputProcArray[i]] = 1;
      if ( CkMyPe() == outputProcArray[i] ) {
        if ( myOutputRank != -1 ) NAMD_bug("Duplicate output proc");
        myOutputRank = i;
      }
    }

    if(!CkMyPe()) {
      iout << iINFO << "OUTPUT PROC LOCATIONS:";
      int i;
      for ( i=0; i<numOutputProcs && i < 10; ++i ) {
        iout << " " << outputProcArray[i];
      }
      if ( i<numOutputProcs ) iout << " ... " << outputProcArray[numOutputProcs-1];
      iout << "\n" << endi;
    }
   }

#ifdef MEM_OPT_VERSION
    if(myOutputRank!=-1) {
        midCM = new CollectionMidMaster(this);
    }
    remoteClusters.clear();
    csmBuf.resize(0);
    remoteCoors.clear();
    ccmBuf.resize(0);

    mainMaster = CollectionMgr::Object()->getMasterChareID();
#endif
}

bool ParallelIOMgr::isOutputProcessor(int pe) {
   return outputProcFlags[pe];
}

int isOutputProcessor(int pe){
  return CProxy_ParallelIOMgr::ckLocalBranch(CkpvAccess(BOCclass_group).ioMgr)->isOutputProcessor(pe);
}

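//Reads per-atom information before atom distribution: input procs load
//signature indices, coordinates, and velocities for their atom ranges;
//output procs load cluster info (needed only when wrapAll/wrapWater is on).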
void ParallelIOMgr::readPerAtomInfo()
{
#ifdef MEM_OPT_VERSION
    if(myInputRank!=-1) {
        int myAtomLIdx, myAtomUIdx;
        getMyAtomsInitRangeOnInput(myAtomLIdx, myAtomUIdx);

        //1. read the file that contains per-atom info such as signature index
        molecule->read_binary_atom_info(myAtomLIdx, myAtomUIdx, initAtoms);

        //2. read each atom's coordinates, and its velocities if the velocity
        //file exists; otherwise the velocities are generated randomly.
        //This has to be DONE AFTER STEP 1 because the atom masses are
        //required when velocities are generated randomly.
        readCoordinatesAndVelocity();

        //3. set every atom's output processor rank, i.e. the destination PE
        //to which this atom will be sent for writing positions, velocities, etc.
        int oRank=atomRankOnOutput(myAtomLIdx);
        for(int i=oRank; i<numOutputProcs; i++) {
            int lIdx, uIdx; //indicates the range of atom ids outputProcArray[i] has
            getAtomsRangeOnOutput(lIdx, uIdx, i);
            if(lIdx > myAtomUIdx) break;
            int fid = lIdx>myAtomLIdx?lIdx:myAtomLIdx;
            int tid = uIdx>myAtomUIdx?myAtomUIdx:uIdx;
            for(int j=fid; j<=tid; j++) initAtoms[j-myAtomLIdx].outputRank = i;
        }
    }

    //read clusters
    if(myOutputRank!=-1) {
        //only when wrapAll or wrapWater is set, cluster info is required
        if(!(simParameters->wrapAll || simParameters->wrapWater)) return;
        readInfoForParOutput();
    }
#endif
}

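//Reads this input proc's slice of the binary coordinate, velocity, and
//reference-coordinate files. Each file starts with an int32 atom count
//followed by one Vector (3 BigReals) per atom:
//    [int32 numAtoms][Vector atom_0][Vector atom_1]...[Vector atom_(N-1)]
//The count doubles as an endianness probe: if it does not match
//molecule->numAtoms, the bytes are flipped and the count is checked again.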
void ParallelIOMgr::readCoordinatesAndVelocity()
{
#ifdef MEM_OPT_VERSION
    int needFlip = 0;
    int myAtomLIdx, myAtomUIdx;
    getMyAtomsInitRangeOnInput(myAtomLIdx, myAtomUIdx);
    int myNumAtoms = myAtomUIdx-myAtomLIdx+1;

    //contains the data for Position and Velocity
    Vector *tmpData = new Vector[myNumAtoms];

    //begin to read coordinates
    //step1: open the file
    FILE *ifp = fopen(simParameters->binCoorFile, "rb");
    if(!ifp) {
        char s[256];
        sprintf(s, "The binary coordinate file %s cannot be opened on proc %d\n", simParameters->binCoorFile, CkMyPe());
        NAMD_err(s);
    }
    //step2: check whether flip is needed
    int filelen;
    fread(&filelen, sizeof(int32),1,ifp);
    char lenbuf[sizeof(int32)];
    memcpy(lenbuf, (const char *)&filelen, sizeof(int32));
    flipNum(lenbuf, sizeof(int32), 1);
    if(!memcmp(lenbuf, (const char *)&filelen, sizeof(int32))) {
        iout << iWARN << "Number of atoms in binary file " << simParameters->binCoorFile
             <<" is palindromic, assuming same endian.\n" << endi;
    }
    if(filelen!=molecule->numAtoms) {
        needFlip = 1;
        memcpy((void *)&filelen, lenbuf,sizeof(int32));
    }
    if(filelen!=molecule->numAtoms) {
        char s[256];
        sprintf(s, "Incorrect atom count in binary file %s", simParameters->binCoorFile);
        NAMD_die(s);
    }
    //step3: read the file specified by the range
    int64 offsetPos = ((int64)myAtomLIdx)*sizeof(Position);
#ifdef WIN32
    if ( _fseeki64(ifp, offsetPos, SEEK_CUR) )
#else
    if ( fseeko(ifp, offsetPos, SEEK_CUR) )
#endif
    {
        char s[256];
        sprintf(s, "Error in seeking binary file %s on proc %d",  simParameters->binCoorFile, CkMyPe());
        NAMD_err(s);
    }
    size_t totalRead = fread(tmpData, sizeof(Vector), myNumAtoms, ifp);
    if(totalRead!=myNumAtoms) {
        char s[256];
        sprintf(s, "Error in reading binary file %s on proc %d",  simParameters->binCoorFile, CkMyPe());
        NAMD_err(s);
    }
    if(needFlip) flipNum((char *)tmpData, sizeof(BigReal), myNumAtoms*3);
    fclose(ifp);
    for(int i=0; i<myNumAtoms; i++) initAtoms[i].position = tmpData[i];

    //begin to read velocity
    //step1: generate velocity randomly or open the file
    if(!simParameters->binVelFile) {
        //generate velocity randomly
        Node::Object()->workDistrib->random_velocities_parallel(simParameters->initialTemp, initAtoms);
    } else {
        ifp = fopen(simParameters->binVelFile, "rb");
        if(!ifp) {
            char s[256];
            sprintf(s, "The binary velocity file %s cannot be opened on proc %d\n", simParameters->binVelFile, CkMyPe());
            NAMD_err(s);
        }
        //step2: check whether flip is needed
        fread(&filelen, sizeof(int32),1,ifp);
        memcpy(lenbuf, (const char *)&filelen, sizeof(int32));
        flipNum(lenbuf, sizeof(int32), 1);
        if(!memcmp(lenbuf, (const char *)&filelen, sizeof(int32))) {
            iout << iWARN << "Number of atoms in binary file " << simParameters->binVelFile
                 <<" is palindromic, assuming same endian.\n" << endi;
        }
        if(filelen!=molecule->numAtoms) {
            needFlip = 1;
            memcpy((void *)&filelen, lenbuf,sizeof(int32));
        }
        if(filelen!=molecule->numAtoms) {
            char s[256];
            sprintf(s, "Incorrect atom count in binary file %s", simParameters->binVelFile);
            NAMD_die(s);
        }

        //step3: read the file specified by the range
        int64 offsetPos = ((int64)myAtomLIdx)*sizeof(Velocity);
#ifdef WIN32
        if ( _fseeki64(ifp, offsetPos, SEEK_CUR) )
#else
        if ( fseeko(ifp, offsetPos, SEEK_CUR) )
#endif
        {
            char s[256];
            sprintf(s, "Error in seeking binary file %s on proc %d",  simParameters->binVelFile, CkMyPe());
            NAMD_err(s);
        }
        totalRead = fread(tmpData, sizeof(Vector), myNumAtoms, ifp);
        if(totalRead!=myNumAtoms) {
            char s[256];
            sprintf(s, "Error in reading binary file %s on proc %d",  simParameters->binVelFile, CkMyPe());
            NAMD_err(s);
        }
        if(needFlip) flipNum((char *)tmpData, sizeof(BigReal), myNumAtoms*3);
        fclose(ifp);
        for(int i=0; i<myNumAtoms; i++) initAtoms[i].velocity = tmpData[i];
    }

    //begin to read reference coordinates
    //step1: use initial positions or open the file
    if(!simParameters->binRefFile) {
        for(int i=0; i<myNumAtoms; i++) initAtoms[i].fixedPosition = initAtoms[i].position;
    } else {
        ifp = fopen(simParameters->binRefFile, "rb");
        if(!ifp) {
            char s[256];
            sprintf(s, "The binary reference coordinate file %s cannot be opened on proc %d\n", simParameters->binRefFile, CkMyPe());
            NAMD_err(s);
        }
        //step2: check whether flip is needed
        fread(&filelen, sizeof(int32),1,ifp);
        memcpy(lenbuf, (const char *)&filelen, sizeof(int32));
        flipNum(lenbuf, sizeof(int32), 1);
        if(!memcmp(lenbuf, (const char *)&filelen, sizeof(int32))) {
            iout << iWARN << "Number of atoms in binary file " << simParameters->binRefFile
                 <<" is palindromic, assuming same endian.\n" << endi;
        }
        if(filelen!=molecule->numAtoms) {
            needFlip = 1;
            memcpy((void *)&filelen, lenbuf,sizeof(int32));
        }
        if(filelen!=molecule->numAtoms) {
            char s[256];
            sprintf(s, "Incorrect atom count in binary file %s", simParameters->binRefFile);
            NAMD_die(s);
        }

        //step3: read the file specified by the range
        int64 offsetPos = ((int64)myAtomLIdx)*sizeof(Position);
#ifdef WIN32
        if ( _fseeki64(ifp, offsetPos, SEEK_CUR) )
#else
        if ( fseeko(ifp, offsetPos, SEEK_CUR) )
#endif
        {
            char s[256];
            sprintf(s, "Error in seeking binary file %s on proc %d",  simParameters->binRefFile, CkMyPe());
            NAMD_err(s);
        }
        totalRead = fread(tmpData, sizeof(Vector), myNumAtoms, ifp);
        if(totalRead!=myNumAtoms) {
            char s[256];
            sprintf(s, "Error in reading binary file %s on proc %d",  simParameters->binRefFile, CkMyPe());
            NAMD_err(s);
        }
        if(needFlip) flipNum((char *)tmpData, sizeof(BigReal), myNumAtoms*3);
        fclose(ifp);
        for(int i=0; i<myNumAtoms; i++) initAtoms[i].fixedPosition = tmpData[i];
    }

    delete [] tmpData;
#endif
}

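//On output procs: loads the isWater flags and cluster IDs for my atom range.
//Both arrays are stored at the tail of the compressed psf binary file, so
//they are located by seeking backwards from SEEK_END.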
void ParallelIOMgr::readInfoForParOutput()
{
    int fromIdx, toIdx; //atoms' range
    getMyAtomsRangeOnOutput(fromIdx,toIdx);
    int numMyAtoms = toIdx-fromIdx+1;

    clusterID = new int[numMyAtoms];
    clusterSize = new int[numMyAtoms];

    //Since the input proc also reads this file, all the checks
    //(file version check, atom record size check) are omitted here
    FILE *ifp = fopen(simParameters->binAtomFile, "rb");
    //read magic number to set needFlip
    int needFlip = 0;
    int magicNum;
    fread(&magicNum, sizeof(int), 1, ifp);
    if (magicNum!=COMPRESSED_PSF_MAGICNUM) {
        needFlip = 1;
    }

    //need to load isWater info
    isWater = new char[numMyAtoms];
    //seek from the end of the file (note offset is negative!)
    int64 offset = sizeof(char)*((int64)(fromIdx-molecule->numAtoms));
#ifdef WIN32
    if ( _fseeki64(ifp, offset, SEEK_END) )
#else
    if ( fseeko(ifp, offset, SEEK_END) )
#endif
    {
        char s[256];
        sprintf(s, "Error in seeking binary file %s on proc %d",  simParameters->binAtomFile, CkMyPe());
        NAMD_err(s);
    }
    fread(isWater, sizeof(char), numMyAtoms, ifp);
    //there's no need for flipping as it's a char array

    //seek from the end of the file (note offset is negative!)
    offset = sizeof(int)*((int64)(fromIdx-molecule->numAtoms))
                   - sizeof(char)*((int64)(molecule->numAtoms));
#ifdef WIN32
    if ( _fseeki64(ifp, offset, SEEK_END) )
#else
    if ( fseeko(ifp, offset, SEEK_END) )
#endif
    {
        char s[256];
        sprintf(s, "Error in seeking binary file %s on proc %d",  simParameters->binAtomFile, CkMyPe());
        NAMD_err(s);
    }
    fread(clusterID, sizeof(int), numMyAtoms, ifp);
    if(needFlip) flipNum((char *)clusterID, sizeof(int), numMyAtoms);
    fclose(ifp);

    //calculate cluster size (communication may be necessary)
    ClusterElem one;
    for(int i=0; i<numMyAtoms; i++) {
        clusterSize[i] = 0;
        int cid = clusterID[i];
        //Check whether the cluster id (i.e. the id of the first atom of
        //the cluster) is in my local repository. If it is not, it must
        //belong to an output processor ahead of this one because of the
        //way cluster info is determined for the molecular system.
        CmiAssert(cid<=toIdx);
        if(cid<fromIdx) {
            //on output procs ahead of me
            one.clusterId = cid;
            ClusterElem *ret = remoteClusters.find(one);
            if(ret==NULL) {
                one.atomsCnt = 1;
                remoteClusters.add(one);
            } else {
                ret->atomsCnt++;
            }
        } else {
            int lidx = cid-fromIdx;
            CmiAssert(lidx<=i);
            clusterSize[lidx]++;
        }
    }

    //Prepare to send msgs to remote output procs to reduce the cluster size.
    //Since the expected number of msgs is very small, msgs to the same proc
    //are not aggregated. --Chao Mei
#if 0
    printf("output[%d]=%d: prepare to send %d remote msgs for cluster size\n",
           myOutputRank, CkMyPe(), remoteClusters.size());
#endif

    numRemoteClusters = remoteClusters.size();
    numCSMAck = 0; //set to 0 to prepare receiving the final cluster size update
    CProxy_ParallelIOMgr pIO(thisgroup);
    ClusterSetIter iter(remoteClusters);
    for(iter=iter.begin(); iter!=iter.end(); iter++) {
        ClusterSizeMsg *msg = new ClusterSizeMsg;
        msg->srcRank = myOutputRank;
        msg->clusterId = iter->clusterId;
        msg->atomsCnt = iter->atomsCnt;
        int dstRank = atomRankOnOutput(iter->clusterId);
        pIO[outputProcArray[dstRank]].recvClusterSize(msg);
    }
}

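//Cluster-size reduction protocol: each output proc counts the atoms it
//holds for clusters whose head atom lives on an earlier output proc and
//sends that partial count (ClusterSizeMsg) to the owner. The owner
//accumulates the counts in integrateClusterSize() and returns the final
//size to each requester through recvFinalClusterSize().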
void ParallelIOMgr::recvClusterSize(ClusterSizeMsg *msg)
{
    csmBuf.add(msg); //added to buffer for reuse to send back to src

    //updating the cluster size has to be delayed until integration to
    //prevent a data race where clusterSize has not yet been allocated!
}

void ParallelIOMgr::integrateClusterSize()
{
    if(myOutputRank==-1) return;
    if(!(simParameters->wrapAll || simParameters->wrapWater)) return;

    int fromIdx, toIdx; //atoms' range
    getMyAtomsRangeOnOutput(fromIdx,toIdx);

    //calculate the final cluster size
    for(int i=0; i<csmBuf.size(); i++) {
        ClusterSizeMsg *msg = csmBuf[i];
        int lidx = msg->clusterId - fromIdx;
        clusterSize[lidx] += msg->atomsCnt;
    }

    CProxy_ParallelIOMgr pIO(thisgroup);
    for(int i=0; i<csmBuf.size(); i++) {
        ClusterSizeMsg *msg = csmBuf[i];
        int lidx = msg->clusterId - fromIdx;
        msg->atomsCnt = clusterSize[lidx];
        pIO[outputProcArray[msg->srcRank]].recvFinalClusterSize(msg);
    }
    numRemoteReqs = csmBuf.size();
    csmBuf.resize(0);

    //There's a possible message race here: recvFinalClusterSize could
    //execute before integrateClusterSize because another proc finishes
    //calculating its cluster sizes faster, yet recvFinalClusterSize must
    //run after integrateClusterSize. To avoid this, a self message is
    //sent to participate in the reduction.
    if(numRemoteClusters!=0){
        recvFinalClusterSize(NULL);
    }else{
        //this output proc already has the final cluster size for each atom
        int numMyAtoms = toIdx-fromIdx+1;
        for(int i=0; i<numMyAtoms; i++) {
            int lidx = clusterID[i]-fromIdx;
            clusterSize[i] = clusterSize[lidx];
        }

        #if 0 //write out cluster debug info
        char fname[128];
        sprintf(fname, "cluster.par.%d", CkMyPe());
        FILE *ofp = fopen(fname, "w");
        for(int i=0; i<numMyAtoms; i++) {
            fprintf(ofp, "%d: %d: %d\n", i+fromIdx, clusterID[i], clusterSize[i]);
        }
        fclose(ofp);
        #endif
    }
}

void ParallelIOMgr::recvFinalClusterSize(ClusterSizeMsg *msg)
{
    //only process messages sent by other procs
    if(msg!=NULL) {
        //a message from another proc
        ClusterElem one(msg->clusterId);
        ClusterElem *ret = remoteClusters.find(one);
        CmiAssert(ret!=NULL);
        ret->atomsCnt = msg->atomsCnt;
    }
    delete msg;

    //include a msg sent by itself for the reduction
    if(++numCSMAck == (numRemoteClusters+1)) {
        //received all the msgs needed to finalize each atom's cluster size
        int fromIdx, toIdx; //atoms' range
        getMyAtomsRangeOnOutput(fromIdx,toIdx);
        int numMyAtoms = toIdx-fromIdx+1;
        ClusterElem tmp;
        for(int i=0; i<numMyAtoms; i++) {
            int cid = clusterID[i];
            int lidx = cid-fromIdx;
            if(lidx<0) {
                //this cid should be inside remoteClusters
                tmp.clusterId = cid;
                ClusterElem *fone = remoteClusters.find(tmp);
                clusterSize[i] = fone->atomsCnt;
            } else {
                clusterSize[i] = clusterSize[lidx];
            }
        }
        numCSMAck = 0;
        remoteClusters.clear();

#if 0 //write out cluster debug info
        char fname[128];
        sprintf(fname, "cluster.par.%d", CkMyPe());
        FILE *ofp = fopen(fname, "w");
        for(int i=0; i<numMyAtoms; i++) {
            fprintf(ofp, "%d: %d: %d\n", i+fromIdx, clusterID[i], clusterSize[i]);
        }
        fclose(ofp);
#endif

    }
}

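//Sends every atom to the input proc that owns its migration-group parent
//(MPID). Outgoing atoms are bucketed by rank offset, e.g. with maxOffset=2,
//an atom whose parent rank is myInputRank-2 lands in migLists[0] and one at
//myInputRank+1 lands in migLists[3]; migLists[maxOffset] (self) stays empty.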
void ParallelIOMgr::migrateAtomsMGrp()
{
    if(myInputRank==-1) return;

    //1. first get the list of atoms to be migrated, which should be few
    //compared with the number of atoms initially assigned to this input proc
    AtomIDList toMigrateList; //the list of atoms to be migrated
    //the max input-rank distance over which atoms have to be sent
    int maxOffset = 0;
    for(int i=0; i<initAtoms.size(); i++) {
        //the input rank on which this atom's migration-group parent (MPID) resides
        int parentRank = atomInitRankOnInput(initAtoms[i].MPID);
        if(parentRank != myInputRank) {
            toMigrateList.add(i);
            initAtoms[i].isValid = false;
            int tmp = parentRank - myInputRank;
            tmp = tmp>0 ? tmp : -tmp;
            if(tmp > maxOffset) maxOffset = tmp;
        }
    }

    //2. prepare atom migration messages
    //the messages are indexed as [-maxOffset,..., -1,0,1,..., maxOffset]
    //where the "0" slot (self) is never used; it is included only to keep
    //the offset-to-index computation simple.
    InputAtomList *migLists = new InputAtomList[2*maxOffset+1];
    for(int i=0; i<toMigrateList.size(); i++) {
        int idx = toMigrateList[i];
        int parentRank = atomInitRankOnInput(initAtoms[idx].MPID);
        //decide which migList to put this atom into
        int offset = parentRank - myInputRank + maxOffset;
        migLists[offset].add(initAtoms[idx]);
    }

    CProxy_ParallelIOMgr pIO(thisgroup);
    for(int i=0; i<2*maxOffset+1; i++) {
        int migLen = migLists[i].size();
        if(migLen>0) {
            MoveInputAtomsMsg *msg = new (migLen, 0)MoveInputAtomsMsg;
            msg->length = migLen;
            memcpy(msg->atomList, migLists[i].begin(), sizeof(InputAtom)*migLen);
            int destRank = i-maxOffset+myInputRank;
            pIO[inputProcArray[destRank]].recvAtomsMGrp(msg);
            migLists[i].clear();
        }
    }

    toMigrateList.clear();
    delete [] migLists;
}

void ParallelIOMgr::recvAtomsMGrp(MoveInputAtomsMsg *msg)
{
    for(int i=0; i<msg->length; i++) {
        tmpRecvAtoms.add((msg->atomList)[i]);
    }
    delete msg;
}

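//Merges the received atoms into initAtoms, sorts by hydrogen-group info,
//and computes the counters that depend on fixed atoms (numFixedRigidBonds,
//numFixedGroups), which are reduced on proc 0 via recvHydroBasedCounter().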
void ParallelIOMgr::integrateMigratedAtoms()
{
    if(myInputRank==-1) return;

    for(int i=0; i<tmpRecvAtoms.size(); i++) {
        tmpRecvAtoms[i].isValid = true;
        initAtoms.add(tmpRecvAtoms[i]);
    }
    tmpRecvAtoms.clear();

    //sort atom list based on hydrogenList value
    std::sort(initAtoms.begin(), initAtoms.end());

    //now compute the counters inside Molecule, such as numFixedRigidBonds,
    //which are based on the hydrogen group info

    int numFixedRigidBonds = 0;
    if(molecule->numRigidBonds){
        int parentIsFixed = 0;
        for(int i=0; i<initAtoms.size(); i++) {
            InputAtom *one = &(initAtoms[i]);
            if(!one->isValid) continue;
            if(one->isGP) {
                parentIsFixed = one->atomFixed;
                //a group parent with rigidBondLength>0 is assumed to be
                //followed by its two hydrogens (rigid water)
                InputAtom *a1 = &(initAtoms[i+1]);
                InputAtom *a2 = &(initAtoms[i+2]);
                if((one->rigidBondLength>0.0) &&
                   a1->atomFixed && a2->atomFixed) {
                    numFixedRigidBonds++;
                }
            }else{
                if((one->rigidBondLength>0.0) &&
                   one->atomFixed && parentIsFixed) {
                    numFixedRigidBonds++;
                }
            }
        }
    }

    int numFixedGroups = 0;
    if(molecule->numFixedAtoms){
        for(int i=0; i<initAtoms.size();) {
            InputAtom *one = &(initAtoms[i]);
            if(!one->isValid){
                i++;
                continue;
            }
            if(one->isGP) {
                int allFixed = 1;
                for(int j=0; j<one->hydrogenGroupSize; j++){
                    InputAtom *a1 = &(initAtoms[i+j]);
                    allFixed = allFixed & a1->atomFixed;
                    if(!allFixed) break;
                }
                if(allFixed) numFixedGroups++;
                i += one->hydrogenGroupSize;
            }
        }
    }

    CProxy_ParallelIOMgr pIO(thisgroup);
    HydroBasedMsg *msg = new HydroBasedMsg;
    msg->numFixedGroups = numFixedGroups;
    msg->numFixedRigidBonds = numFixedRigidBonds;
    pIO[0].recvHydroBasedCounter(msg);
}

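//Counts the bonded tuples, exclusions, rigid bonds, mass, and charge over
//this input proc's atoms (through their signatures) and contributes the
//partial sums to a point-to-point reduction on proc 0 (recvMolInfo).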
void ParallelIOMgr::updateMolInfo()
{
#ifdef MEM_OPT_VERSION
    if(myInputRank==-1) return;

    CProxy_ParallelIOMgr pIO(thisgroup);

    MolInfoMsg *msg = new MolInfoMsg;
    msg->numBonds = msg->numCalcBonds = 0;
    msg->numAngles = msg->numCalcAngles = 0;
    msg->numDihedrals = msg->numCalcDihedrals = 0;
    msg->numImpropers = msg->numCalcImpropers = 0;
    msg->numCrossterms = msg->numCalcCrossterms = 0;
    msg->numExclusions = msg->numCalcExclusions = 0;
    int numFullExclusions = msg->numCalcFullExclusions = 0;
    // JLai
    msg->numLJPairs = msg->numCalcLJPairs = 0;
    // End of JLai
    msg->numRigidBonds = 0;
    msg->totalMass = 0.0;
    msg->totalCharge = 0.0;

    //count the tuples this input processor has
    AtomSignature *atomSigPool = molecule->atomSigPool;
    ExclusionSignature *exclSigPool = molecule->exclSigPool;
    for(int i=0; i<initAtoms.size(); i++) {
        AtomSignature *thisSig = &atomSigPool[initAtoms[i].sigId];
        msg->numBonds += thisSig->bondCnt;
        msg->numAngles += thisSig->angleCnt;
        msg->numDihedrals += thisSig->dihedralCnt;
        msg->numImpropers += thisSig->improperCnt;
        msg->numCrossterms += thisSig->crosstermCnt;
        // JLai
        msg->numLJPairs += thisSig->gromacsPairCnt;
        // End of JLai

        ExclusionSignature *exclSig = &exclSigPool[initAtoms[i].exclId];
        msg->numExclusions += (exclSig->fullExclCnt + exclSig->modExclCnt);
        numFullExclusions += exclSig->fullExclCnt;

        if(initAtoms[i].rigidBondLength > 0.0) msg->numRigidBonds++;

        msg->totalMass += initAtoms[i].mass;
        msg->totalCharge += initAtoms[i].charge;
    }

    //deal with the numCalc* counters, which are affected by fixed atoms!
    if(molecule->numFixedAtoms>0 && ! simParameters->fixedAtomsForces) {
        //if there are fixed atoms, the calc* counts need to be computed.
        //Since it's possible that an atom inside this exclusion set is on
        //another input processor, we have to resort to the global fixed-atom
        //info inside the Molecule object. The number of such accesses should
        //be very small! --Chao Mei
        int sAId = initAtoms[0].id;
        int remoteCnt=0; //stats info
        for(int i=0; i<initAtoms.size(); i++) {
            //When all the atoms in the set are fixed, the elem (Bond etc.)
            //is not counted as a calc*.
            int myAId = initAtoms[i].id;
            AtomSignature *thisSig = &atomSigPool[initAtoms[i].sigId];
            ExclusionSignature *exclSig = &exclSigPool[initAtoms[i].exclId];
            if(!initAtoms[i].atomFixed) {
                msg->numCalcBonds += thisSig->bondCnt;
                msg->numCalcAngles += thisSig->angleCnt;
                msg->numCalcDihedrals += thisSig->dihedralCnt;
                msg->numCalcImpropers += thisSig->improperCnt;
                msg->numCalcCrossterms += thisSig->crosstermCnt;
                msg->numCalcExclusions+=(exclSig->fullExclCnt+exclSig->modExclCnt);
                msg->numCalcFullExclusions+=(exclSig->fullExclCnt);
                continue;
            }

            //1. Bonds
            for(int j=0; j<thisSig->bondCnt; j++) {
                TupleSignature *bsig = &(thisSig->bondSigs[j]);
                int a1 = myAId + bsig->offset[0];
                if(!isAtomFixed(sAId, a1)) msg->numCalcBonds++;
            }

            //2. Angles
            for(int j=0; j<thisSig->angleCnt; j++) {
                TupleSignature *bsig = &(thisSig->angleSigs[j]);
                int a1 = myAId + bsig->offset[0];
                int a2 = myAId + bsig->offset[1];
                if(!isAtomFixed(sAId, a1) || !isAtomFixed(sAId, a2))
                    msg->numCalcAngles++;
            }

            //3. Dihedrals
            for(int j=0; j<thisSig->dihedralCnt; j++) {
                TupleSignature *bsig = &(thisSig->dihedralSigs[j]);
                int a1 = myAId + bsig->offset[0];
                int a2 = myAId + bsig->offset[1];
                int a3 = myAId + bsig->offset[2];
                if(!isAtomFixed(sAId, a1) ||
                   !isAtomFixed(sAId, a2) ||
                   !isAtomFixed(sAId, a3))
                    msg->numCalcDihedrals++;
            }

            //4. Impropers
            for(int j=0; j<thisSig->improperCnt; j++) {
                TupleSignature *bsig = &(thisSig->improperSigs[j]);
                int a1 = myAId + bsig->offset[0];
                int a2 = myAId + bsig->offset[1];
                int a3 = myAId + bsig->offset[2];
                if(!isAtomFixed(sAId, a1) ||
                   !isAtomFixed(sAId, a2) ||
                   !isAtomFixed(sAId, a3))
                    msg->numCalcImpropers++;
            }

            //5. Crossterms
            for(int j=0; j<thisSig->crosstermCnt; j++) {
                TupleSignature *bsig = &(thisSig->crosstermSigs[j]);
                int a1 = myAId + bsig->offset[0];
                int a2 = myAId + bsig->offset[1];
                int a3 = myAId + bsig->offset[2];
                int a4 = myAId + bsig->offset[3];
                int a5 = myAId + bsig->offset[4];
                int a6 = myAId + bsig->offset[5];
                int a7 = myAId + bsig->offset[6];

                if(!isAtomFixed(sAId, a1) ||
                   !isAtomFixed(sAId, a2) ||
                   !isAtomFixed(sAId, a3) ||
                   !isAtomFixed(sAId, a4) ||
                   !isAtomFixed(sAId, a5) ||
                   !isAtomFixed(sAId, a6) ||
                   !isAtomFixed(sAId, a7))
                    msg->numCalcCrossterms++;
            }

            //6: Exclusions
            //this atom is fixed, check atoms in the exclusion set
            for(int j=0; j<exclSig->fullExclCnt; j++) {
                int thisAId = exclSig->fullOffset[j]+myAId;
                if(!isAtomFixed(sAId, thisAId)) { msg->numCalcExclusions++; msg->numCalcFullExclusions++; }
            }
            for(int j=0; j<exclSig->modExclCnt; j++) {
                int thisAId = exclSig->modOffset[j]+myAId;
                if(!isAtomFixed(sAId, thisAId)) msg->numCalcExclusions++;
            }

            //7: GromacsPair
            for(int j=0; j<thisSig->gromacsPairCnt; j++) {
                TupleSignature *bsig = &(thisSig->gromacsPairSigs[j]);
                int a1 = myAId + bsig->offset[0];
                int a2 = myAId + bsig->offset[1];
                if(!isAtomFixed(sAId, a1) ||
                   !isAtomFixed(sAId, a2))
                    msg->numCalcLJPairs++;
            }
        }
#if COLLECT_PERFORMANCE_DATA
        printf("Num fixedAtom lookup on proc %d is %d\n", CkMyPe(), numFixedAtomLookup);
#endif
    } else {
        //no fixed atoms, so each numCalc* equals its corresponding num*
        msg->numCalcBonds = msg->numBonds;
        msg->numCalcAngles = msg->numAngles;
        msg->numCalcDihedrals = msg->numDihedrals;
        msg->numCalcImpropers = msg->numImpropers;
        msg->numCalcCrossterms = msg->numCrossterms;
        msg->numCalcExclusions = msg->numExclusions;
        msg->numCalcFullExclusions = numFullExclusions;
    }

    if(!simParameters->comMove) {
        //to remove the center of mass motion from a molecule,
        //first calculate the values on every input proc, then reduce.
        //For more info, refer to WorkDistrib::remove_com_motion
        //-Chao Mei
        (msg->totalMV).x = 0.0;
        (msg->totalMV).y = 0.0;
        (msg->totalMV).z = 0.0;
        for (int i=0; i<initAtoms.size(); i++) {
            msg->totalMV += initAtoms[i].mass * initAtoms[i].velocity;
        }
    }

    //always send to the master processor (proc 0)
    pIO[0].recvMolInfo(msg);
#endif
}

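//Reduction target: accumulates the partial MolInfoMsg sums from all input
//procs; when the last one arrives the totals are broadcast. Exclusion
//totals are halved because each exclusion is counted from both of its atoms.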
//only executed on proc 0
void ParallelIOMgr::recvMolInfo(MolInfoMsg *msg)
{
    molecule->numBonds += msg->numBonds;
    molecule->numCalcBonds += msg->numCalcBonds;
    molecule->numAngles += msg->numAngles;
    molecule->numCalcAngles += msg->numCalcAngles;
    molecule->numDihedrals += msg->numDihedrals;
    molecule->numCalcDihedrals += msg->numCalcDihedrals;
    molecule->numImpropers += msg->numImpropers;
    molecule->numCalcImpropers += msg->numCalcImpropers;
    molecule->numCrossterms += msg->numCrossterms;
    molecule->numCalcCrossterms += msg->numCalcCrossterms;
    numTotalExclusions += msg->numExclusions;
    numCalcExclusions += msg->numCalcExclusions;
    numCalcFullExclusions += msg->numCalcFullExclusions;
    molecule->numRigidBonds += msg->numRigidBonds;

    totalMass += msg->totalMass;
    totalCharge += msg->totalCharge;

    if(!simParameters->comMove) {
        totalMV += msg->totalMV;
    }

    if(++procsReceived == numInputProcs) {
        //received all the counters
        msg->numBonds = molecule->numBonds;
        msg->numCalcBonds = molecule->numCalcBonds;
        msg->numAngles = molecule->numAngles;
        msg->numCalcAngles = molecule->numCalcAngles;
        msg->numDihedrals = molecule->numDihedrals;
        msg->numCalcDihedrals = molecule->numCalcDihedrals;
        msg->numImpropers = molecule->numImpropers;
        msg->numCalcImpropers = molecule->numCalcImpropers;
        msg->numCrossterms = molecule->numCrossterms;
        msg->numCalcCrossterms = molecule->numCalcCrossterms;
        msg->numExclusions = numTotalExclusions/2;
        msg->numCalcExclusions = numCalcExclusions/2;
        msg->numCalcFullExclusions = numCalcFullExclusions/2;
        msg->numRigidBonds = molecule->numRigidBonds;

        msg->totalMass = totalMass;
        msg->totalCharge = totalCharge;

        if(!simParameters->comMove) {
            msg->totalMV = totalMV;
        }

        CProxy_ParallelIOMgr pIO(thisgroup);
        pIO.bcastMolInfo(msg);

        //reset to 0 for the next p2p-based reduction on input procs
        procsReceived = 0;
    } else delete msg;
}

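//Broadcast handler: input procs remove the COM velocity if requested, then
//rank 0 of each SMP node stores the final global counters into Molecule.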
void ParallelIOMgr::bcastMolInfo(MolInfoMsg *msg)
{
#ifdef MEM_OPT_VERSION
    if(myInputRank!=-1) {
        if(!simParameters->comMove) {
            //needs to remove the center of mass motion from a molecule
            Vector val = msg->totalMV / msg->totalMass;
            for (int i=0; i<initAtoms.size(); i++) initAtoms[i].velocity -= val;
        }
    }

    //only rank 0 in the SMP node updates the Molecule object
    if(CmiMyRank()) {
        delete msg;
        return;
    }

    molecule->numBonds = msg->numBonds;
    molecule->numCalcBonds = msg->numCalcBonds;
    molecule->numAngles = msg->numAngles;
    molecule->numCalcAngles = msg->numCalcAngles;
    molecule->numDihedrals = msg->numDihedrals;
    molecule->numCalcDihedrals = msg->numCalcDihedrals;
    molecule->numImpropers = msg->numImpropers;
    molecule->numCalcImpropers = msg->numCalcImpropers;
    molecule->numCrossterms = msg->numCrossterms;
    molecule->numCalcCrossterms = msg->numCalcCrossterms;

    molecule->numTotalExclusions = msg->numExclusions;
    molecule->numCalcExclusions = msg->numCalcExclusions;
    molecule->numCalcFullExclusions = msg->numCalcFullExclusions;

    molecule->numRigidBonds = msg->numRigidBonds;

    if(!CkMyPe()) {
        iout << iINFO << "LOADED " << molecule->numTotalExclusions << " TOTAL EXCLUSIONS\n" << endi;
        if(!simParameters->comMove) {
            iout << iINFO << "REMOVING COM VELOCITY "
                 << (PDBVELFACTOR * (msg->totalMV / msg->totalMass))<< "\n" <<endi;
        }
    }
    //msg must outlive the print above, so it is deleted last
    delete msg;
#endif
}

//only called on PE0
void ParallelIOMgr::recvHydroBasedCounter(HydroBasedMsg *msg){
    molecule->numFixedRigidBonds += msg->numFixedRigidBonds;
    molecule->numFixedGroups += msg->numFixedGroups;

    if(++hydroMsgRecved == numInputProcs){
        msg->numFixedRigidBonds = molecule->numFixedRigidBonds;
        msg->numFixedGroups = molecule->numFixedGroups;
        CProxy_ParallelIOMgr pIO(thisgroup);
        pIO.bcastHydroBasedCounter(msg);
        hydroMsgRecved = 0;
    }else delete msg;
}

void ParallelIOMgr::bcastHydroBasedCounter(HydroBasedMsg *msg){
#ifdef MEM_OPT_VERSION
    //only rank 0 in the SMP node updates the Molecule object
    if(CmiMyRank()) {
        delete msg;
        return;
    }
    molecule->numFixedRigidBonds = msg->numFixedRigidBonds;
    molecule->numFixedGroups = msg->numFixedGroups;
    delete msg;

    if(!CkMyPe()) {
        iout << iINFO << "****************************\n";
        iout << iINFO << "STRUCTURE SUMMARY:\n";
        iout << iINFO << molecule->numAtoms << " ATOMS\n";
        iout << iINFO << molecule->numBonds << " BONDS\n";
        iout << iINFO << molecule->numAngles << " ANGLES\n";
        iout << iINFO << molecule->numDihedrals << " DIHEDRALS\n";
        iout << iINFO << molecule->numImpropers << " IMPROPERS\n";
        iout << iINFO << molecule->numCrossterms << " CROSSTERMS\n";
        iout << iINFO << molecule->numExclusions << " EXCLUSIONS\n";

        //****** BEGIN CHARMM/XPLOR type changes
        if ((molecule->numMultipleDihedrals) && (simParameters->paraTypeXplorOn)){
            iout << iINFO << molecule->numMultipleDihedrals
                 << " DIHEDRALS WITH MULTIPLE PERIODICITY (BASED ON PSF FILE)\n";
        }
        if ((molecule->numMultipleDihedrals) && (simParameters->paraTypeCharmmOn)){
            iout << iINFO << molecule->numMultipleDihedrals
                 << " DIHEDRALS WITH MULTIPLE PERIODICITY IGNORED (BASED ON PSF FILE) \n";
            iout << iINFO
                 << " CHARMM MULTIPLICITIES BASED ON PARAMETER FILE INFO! \n";
        }
        //****** END CHARMM/XPLOR type changes

        if (molecule->numMultipleImpropers){
            iout << iINFO << molecule->numMultipleImpropers
                 << " IMPROPERS WITH MULTIPLE PERIODICITY\n";
        }

        if (simParameters->fixedAtomsOn)
           iout << iINFO << molecule->numFixedAtoms << " FIXED ATOMS\n";

        if (simParameters->rigidBonds)
           iout << iINFO << molecule->numRigidBonds << " RIGID BONDS\n";

        if (simParameters->fixedAtomsOn && simParameters->rigidBonds)
           iout << iINFO << molecule->numFixedRigidBonds <<
                " RIGID BONDS BETWEEN FIXED ATOMS\n";

        iout << iINFO << molecule->num_deg_freedom(1)
             << " DEGREES OF FREEDOM\n";

        iout << iINFO << molecule->numHydrogenGroups << " HYDROGEN GROUPS\n";
        iout << iINFO << molecule->maxHydrogenGroupSize
            << " ATOMS IN LARGEST HYDROGEN GROUP\n";
        iout << iINFO << molecule->numMigrationGroups << " MIGRATION GROUPS\n";
        iout << iINFO << molecule->maxMigrationGroupSize
            << " ATOMS IN LARGEST MIGRATION GROUP\n";
        if (simParameters->fixedAtomsOn)
        {
           iout << iINFO << molecule->numFixedGroups <<
                " HYDROGEN GROUPS WITH ALL ATOMS FIXED\n";
        }

        iout << iINFO << "TOTAL MASS = " << totalMass << " amu\n";
        iout << iINFO << "TOTAL CHARGE = " << totalCharge << " e\n";

        BigReal volume = simParameters->lattice.volume();
        if ( volume ) {
            iout << iINFO << "MASS DENSITY = "
                << ((totalMass/volume) / 0.6022) << " g/cm^3\n";
            iout << iINFO << "ATOM DENSITY = "
                << (molecule->numAtoms/volume) << " atoms/A^3\n";
        }

        iout << iINFO << "****************************\n";
        iout << endi;
        fflush(stdout);
    }
#endif
}

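//Bins this input proc's atoms into patches: a migration-group parent picks
//its patch through PatchMap::assignToPatch and the following atoms of its
//group inherit it; per-patch atom counts are then reported to proc 0.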
void ParallelIOMgr::calcAtomsInEachPatch()
{
    if(myInputRank==-1) return;

    PatchMap *patchMap = PatchMap::Object();
    int numPatches = patchMap->numPatches();

    patchMap->initTmpPatchAtomsList();

    //each list contains atom indices into initAtoms
    vector<int> *eachPatchAtomList = patchMap->getTmpPatchAtomsList();

    CProxy_PatchMgr pm(CkpvAccess(BOCclass_group).patchMgr);
    PatchMgr *patchMgr = pm.ckLocalBranch();

    int pid=0;
    const Lattice lattice = simParameters->lattice;
    for(int i=0; i<initAtoms.size(); i++) {
        InputAtom *atom = &(initAtoms[i]);
        if(!atom->isValid) continue;
        if(atom->isMP) {
            pid = patchMap->assignToPatch(atom->position, lattice);
        }
        eachPatchAtomList[pid].push_back(i);
    }

    CProxy_ParallelIOMgr pIO(thisgroup);

    int patchCnt = 0;
    for(int i=0; i<numPatches; i++) {
        int cursize = eachPatchAtomList[i].size();
        if(cursize>0) patchCnt++;
    }

    AtomsCntPerPatchMsg *msg = NULL;
    if(simParameters->fixedAtomsOn) {
        msg = new (patchCnt, patchCnt, patchCnt, 0)AtomsCntPerPatchMsg;
    } else {
        msg = new (patchCnt, patchCnt, 0, 0)AtomsCntPerPatchMsg;
    }

    msg->length = patchCnt;
    patchCnt = 0;
    for(int i=0; i<numPatches; i++) {
        int cursize = eachPatchAtomList[i].size();
        if(cursize>0) {
            if ( cursize > USHRT_MAX ) {
              char errstr[512];
              sprintf(errstr, "Patch %d exceeds %d atoms.", i, USHRT_MAX);
              NAMD_die(errstr);
            }
            msg->pidList[patchCnt] = i;
            msg->atomsCntList[patchCnt] = cursize;
            patchCnt++;
        }
    }

    if(simParameters->fixedAtomsOn) {
        patchCnt = 0;
        for(int i=0; i<numPatches; i++) {
            int cursize = eachPatchAtomList[i].size();
            if(cursize>0) {
                int fixedCnt = 0;
                for(int j=0; j<cursize; j++) {
                    int aid = eachPatchAtomList[i][j];
                    //atomFixed is either 0 or 1
                    fixedCnt += initAtoms[aid].atomFixed;
                }
                msg->fixedAtomsCntList[patchCnt] = fixedCnt;
                patchCnt++;
            }
        }
    }

    pIO[0].recvAtomsCntPerPatch(msg);

}

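//Proc 0 accumulates the per-patch atom (and fixed-atom) counts from all
//input procs, then reports the largest patch and sanity-checks the total.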
void ParallelIOMgr::recvAtomsCntPerPatch(AtomsCntPerPatchMsg *msg)
{
#ifdef MEM_OPT_VERSION
    PatchMap *patchMap = PatchMap::Object();
    for(int i=0; i<msg->length; i++) {
        int pid = msg->pidList[i];
        int oldNum = patchMap->numAtoms(pid);
        if ( oldNum + msg->atomsCntList[i] > USHRT_MAX ) {
          char errstr[512];
          sprintf(errstr, "Patch %d exceeds %d atoms.", pid, USHRT_MAX);
          NAMD_die(errstr);
        }
        patchMap->setNumAtoms(pid, oldNum+msg->atomsCntList[i]);
        if(simParameters->fixedAtomsOn) {
            oldNum = patchMap->numFixedAtoms(pid);
            patchMap->setNumFixedAtoms(pid, oldNum+msg->fixedAtomsCntList[i]);
        }
    }
    delete msg;

    if(++procsReceived == numInputProcs) {
        //print max PATCH info
        int maxAtoms = -1;
        int maxPatch = -1;
        int totalAtoms = 0;
        for(int i=0; i<patchMap->numPatches(); i++) {
            int cnt = patchMap->numAtoms(i);
            totalAtoms += cnt;
            if(cnt>maxAtoms) {
                maxAtoms = cnt;
                maxPatch = i;
            }
        }
        procsReceived = 0;
        iout << iINFO << "LARGEST PATCH (" << maxPatch <<
             ") HAS " << maxAtoms << " ATOMS\n" << endi;
        if ( totalAtoms !=  Node::Object()->molecule->numAtoms ) {
          char errstr[512];
          sprintf(errstr, "Incorrect atom count in ParallelIOMgr::recvAtomsCntPerPatch: %d vs %d", totalAtoms, Node::Object()->molecule->numAtoms);
          NAMD_die(errstr);
        }
    }
#endif
}

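//Sends atoms to the procs owning their home patches. To bound memory use,
//at most 10 MovePatchAtomsMsg acks may be outstanding: the work runs in a
//Charm++ thread that suspends in the send loop whenever the limit is hit
//and is awakened again by ackAtomsToHomePatchProcs().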
void call_sendAtomsToHomePatchProcs(void *arg)
{
  ((ParallelIOMgr*)arg)->sendAtomsToHomePatchProcs();
}

void ParallelIOMgr::sendAtomsToHomePatchProcs()
{
#ifdef MEM_OPT_VERSION
    if(myInputRank==-1) return;

    if ( sendAtomsThread == 0 ) {
      sendAtomsThread = CthCreate((CthVoidFn)call_sendAtomsToHomePatchProcs,this,0);
      CthAwaken(sendAtomsThread);
      return;
    }
    sendAtomsThread = 0;
    numAcksOutstanding = 0;

    PatchMap *patchMap = PatchMap::Object();
    int numPatches = patchMap->numPatches();
    vector<int> *eachPatchAtomList = patchMap->getTmpPatchAtomsList();

    //each element (proc) contains the list of ids of patches which will stay
    //on that processor
    ResizeArray<int> *procList = new ResizeArray<int>[CkNumPes()];
    ResizeArray<int> pesToSend;
    for(int i=0; i<numPatches; i++) {
        if(eachPatchAtomList[i].size()==0) continue;
        int onPE = patchMap->node(i);
        if ( procList[onPE].size() == 0 ) pesToSend.add(onPE);
        procList[onPE].add(i);
    }

    Random(CkMyPe()).reorder(pesToSend.begin(),pesToSend.size());
    //CkPrintf("Pe %d ParallelIOMgr::sendAtomsToHomePatchProcs sending to %d pes\n",CkMyPe(),pesToSend.size());

    //go over every processor to send a message if necessary
    //TODO: Optimization for local home patches to save temp memory usage??? -CHAOMEI
    CProxy_ParallelIOMgr pIO(thisgroup);
    for(int k=0; k<pesToSend.size(); k++) {
        const int i = pesToSend[k];
        int len = procList[i].size();
        if(len==0) continue;

        if ( numAcksOutstanding >= 10 ) {
          //CkPrintf("Pe %d ParallelIOMgr::sendAtomsToHomePatchProcs suspending at %d of %d pes\n",CkMyPe(),k,pesToSend.size());
          //fflush(stdout);
          sendAtomsThread = CthSelf();
          CthSuspend();
        }
        ++numAcksOutstanding;

        //prepare a message to send
        int patchCnt = len;
        int totalAtomCnt = 0;
        for(int j=0; j<len; j++) {
            int pid = procList[i][j];
            int atomCnt = eachPatchAtomList[pid].size();
            totalAtomCnt += atomCnt;
        }

        MovePatchAtomsMsg *msg = new (patchCnt, patchCnt, totalAtomCnt, 0)MovePatchAtomsMsg;
        msg->from = CkMyPe();
        msg->patchCnt = patchCnt;
        int atomIdx = 0;
        for(int j=0; j<len; j++) {
            int pid = procList[i][j];
            int atomCnt = eachPatchAtomList[pid].size();
            msg->pidList[j] = pid;
            msg->sizeList[j] = atomCnt;
            for(int m=0; m<atomCnt; m++, atomIdx++) {
                int aid = eachPatchAtomList[pid][m];
                FullAtom one = initAtoms[aid];
                //HACK to re-sort the atom list after receiving the atom list on
                //home patch processor -Chao Mei
                one.hydVal = initAtoms[aid].hydList;
                msg->allAtoms[atomIdx] = one;
            }
        }
        pIO[i].recvAtomsToHomePatchProcs(msg);

        procList[i].clear();
    }

    //clean up to free space
    delete [] procList;
    patchMap->delTmpPatchAtomsList();

    //free the space occupied by the list that contains the input atoms
    initAtoms.clear();
#endif
}

void ParallelIOMgr::ackAtomsToHomePatchProcs()
{
  --numAcksOutstanding;
  if ( sendAtomsThread ) {
    CthAwaken(sendAtomsThread);
    sendAtomsThread = 0;
  }
}

void ParallelIOMgr::recvAtomsToHomePatchProcs(MovePatchAtomsMsg *msg)
{
    CProxy_ParallelIOMgr pIO(thisgroup);
    pIO[msg->from].ackAtomsToHomePatchProcs();

    if(!isOKToRecvHPAtoms) {
        prepareHomePatchAtomList();
        isOKToRecvHPAtoms = true;
    }

    int numRecvPatches = msg->patchCnt;
    int aid = 0;
    for(int i=0; i<numRecvPatches; i++) {
        int pid = msg->pidList[i];
        int size = msg->sizeList[i];
        int idx = binaryFindHPID(pid);
        for(int j=0; j<size; j++, aid++) {
            hpAtomsList[idx].add(msg->allAtoms[aid]);
        }
    }
    //CkPrintf("Pe %d recvAtomsToHomePatchProcs for %d patches %d atoms\n",CkMyPe(),numRecvPatches,aid);
    delete msg;
}

void ParallelIOMgr::prepareHomePatchAtomList()
{
    PatchMap *patchMap = PatchMap::Object();
    for(int i=0; i<patchMap->numPatches(); i++) {
        if(patchMap->node(i)==CkMyPe()) {
            hpIDList.add(i);
        }
    }
    if(hpIDList.size()>0)
        hpAtomsList = new FullAtomList[hpIDList.size()];
}

01395 int ParallelIOMgr::binaryFindHPID(int pid)
01396 {
01397     //hpIDList should be in increasing order!!
01398     int loIdx, hiIdx;
01399     int retIdx = -1;
01400 
01401     loIdx=0;
01402     hiIdx=hpIDList.size()-1;
01403 
01404     while(loIdx<=hiIdx) {
01405         int idx = (loIdx+hiIdx)/2;
01406         int curPid = hpIDList[idx];
01407         if(pid>curPid) {
01408             //target lies in the upper half
01409             loIdx = idx+1;
01410         } else if(pid<curPid) {
01411             //target lies in the lower half
01412             hiIdx = idx-1;
01413         } else {
01414             //found!
01415             retIdx = idx;
01416             break;
01417         }
01418     }
01419     CmiAssert(retIdx!=-1);
01420     return retIdx;
01421 }
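// Editorial note: since hpIDList is kept in increasing order, the same lookup
// could also be written with std::lower_bound. A minimal sketch, assuming a
// plain int array (hypothetical helper, not part of NAMD):
#if 0
int exBinaryFind(const int *first, const int *last, int pid) {
    const int *it = std::lower_bound(first, last, pid);
    CmiAssert(it != last && *it == pid);   // pid must be present
    return (int)(it - first);              // index of pid within the list
}
#endif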
01422 
01423 void ParallelIOMgr::createHomePatches()
01424 {
01425 #ifdef MEM_OPT_VERSION
01426 
01427     int assignedPids = PatchMap::Object()->numPatchesOnNode(CkMyPe());
01428     int numPids = hpIDList.size();
01429     if(numPids==0){
01430         //this node actually contains no homepatches
01431         if(assignedPids == 0) return; 
01432 
01433         //Handle the rare case where all the homepatches assigned to this
01434         //node are empty, so "recvAtomsToHomePatchProcs" is never called.
01435         //We still need to create those empty homepatches!
01436         CmiAssert(isOKToRecvHPAtoms == false);        
01437         PatchMap *patchMap = PatchMap::Object();
01438         CProxy_PatchMgr pm(CkpvAccess(BOCclass_group).patchMgr);
01439         PatchMgr *patchMgr = pm.ckLocalBranch();
01440         for(int i=0; i<patchMap->numPatches(); i++) {
01441             if(patchMap->node(i)==CkMyPe()) {
01442                 FullAtomList emptyone;
01443                 patchMgr->createHomePatch(i, emptyone);
01444             }
01445         }        
01446         return;
01447     }
01448 
01449     CProxy_PatchMgr pm(CkpvAccess(BOCclass_group).patchMgr);
01450     PatchMgr *patchMgr = pm.ckLocalBranch();
01451 
01452     //go through the home patch list
01453     for(int i=0; i<numPids; i++) {
01454         int pid = hpIDList[i];
01455 
01456         //re-sort the atom list of this patch
01457         std::sort(hpAtomsList[i].begin(), hpAtomsList[i].end());
01458         Node::Object()->workDistrib->fillAtomListForOnePatch(pid, hpAtomsList[i]);
01459         patchMgr->createHomePatch(pid, hpAtomsList[i]);
01460     }
01461 
01462     hpIDList.clear();   
01463     delete [] hpAtomsList;
01464 
01465     hpAtomsList = NULL;
01466 #endif
01467 }
01468 
01469 void ParallelIOMgr::freeMolSpace()
01470 {
01471 #ifdef MEM_OPT_VERSION
01472     molecule->delAtomNames();
01473     molecule->delChargeSpace();
01474 
01475     //TODO: not sure whether freeEnergyOn is supported in MEM_OPT_VERSION
01476     //-CHAOMEI
01477     if(!CkMyPe() && !simParameters->freeEnergyOn)
01478         molecule->delMassSpace();
01479 
01480     molecule->delFixedAtoms();
01481 #endif
01482 }
01483 
01484 //The initial distribution of atoms:
01485 //(avgNum+1),...,(avgNum+1),avgNum,...,avgNum where
01486 //avgNum equals the total number of atoms in the system
01487 //divided by the number of input processors --Chao Mei
01488 int ParallelIOMgr::numMyAtoms(int rank, int numProcs)
01489 {
01490     if(rank==-1) return -1;
01491     int avgNum = molecule->numAtoms/numProcs;
01492     int remainder = molecule->numAtoms%numProcs;
01493     if(rank<remainder) return avgNum+1;
01494     else return avgNum;
01495 }
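// Editorial note: a worked example of the distribution above. With
// molecule->numAtoms = 10 and numProcs = 3: avgNum = 3, remainder = 1, so
// ranks 0,1,2 own 4,3,3 atoms respectively, which sums back to 10.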
01496 
01497 int ParallelIOMgr::atomRank(int atomID, int numProcs)
01498 {
01499     int avgNum = molecule->numAtoms/numProcs;
01500     int remainder = molecule->numAtoms%numProcs;
01501     int midLimit = remainder*(avgNum+1);
01502     int idx;
01503     if(atomID<midLimit) {
01504         idx = atomID/(avgNum+1);
01505     } else {
01506         idx = remainder+(atomID-midLimit)/avgNum;
01507     }
01508     return idx;
01509 }
01510 
01511 void ParallelIOMgr::getMyAtomsRange(int &lowerIdx, int &upperIdx, int rank, int numProcs)
01512 {
01513     if(rank==-1) {
01514         //just to make sure upperIdx-lowerIdx+1 == -1
01515         lowerIdx=-1;
01516         upperIdx=-3;
01517         return;
01518     }
01519     int avgNum = molecule->numAtoms/numProcs;
01520     int remainder = molecule->numAtoms%numProcs;
01521     if(rank<remainder) {
01522         lowerIdx = rank*(avgNum+1);
01523         upperIdx = lowerIdx+avgNum;
01524     } else {
01525         int midLimit = remainder*(avgNum+1);
01526         lowerIdx = midLimit+(rank-remainder)*avgNum;
01527         upperIdx = lowerIdx+avgNum-1;
01528     }
01529 }
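// Editorial note: numMyAtoms, atomRank, and getMyAtomsRange must stay
// mutually consistent: every atom in the range returned for rank r must map
// back to r, and the range length must equal numMyAtoms(r, numProcs). A
// standalone consistency-check sketch (hypothetical helper, with numAtoms
// passed as a parameter instead of read from molecule):
#if 0
void exCheckDistribution(int numAtoms, int numProcs) {
    int avgNum = numAtoms/numProcs;
    int remainder = numAtoms%numProcs;
    int midLimit = remainder*(avgNum+1);
    for(int r=0; r<numProcs; r++) {
        int lo = (r<remainder) ? r*(avgNum+1)
                               : midLimit+(r-remainder)*avgNum;
        int hi = lo+avgNum-((r<remainder) ? 0 : 1);
        CmiAssert(hi-lo+1 == ((r<remainder) ? avgNum+1 : avgNum));
        for(int a=lo; a<=hi; a++) {
            int rank = (a<midLimit) ? a/(avgNum+1)
                                    : remainder+(a-midLimit)/avgNum;
            CmiAssert(rank == r);   // atomRank and the range must agree
        }
    }
}
#endif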
01530 
01531 void ParallelIOMgr::receivePositions(CollectVectorVarMsg *msg)
01532 {
01533 #ifdef MEM_OPT_VERSION
01534     int ready = midCM->receivePositions(msg);
01535     if(ready) {
01536         CProxy_CollectionMaster cm(mainMaster);
01537         cm.receiveOutputPosReady(msg->seq);
01538     }
01539     delete msg;
01540 #endif
01541 }
01542 
01543 void ParallelIOMgr::receiveVelocities(CollectVectorVarMsg *msg)
01544 {
01545 #ifdef MEM_OPT_VERSION
01546     int ready = midCM->receiveVelocities(msg);
01547     if(ready) {
01548         CProxy_CollectionMaster cm(mainMaster);
01549         cm.receiveOutputVelReady(msg->seq);        
01550     }
01551     delete msg;
01552 #endif
01553 }
01554 
01555 void ParallelIOMgr::receiveForces(CollectVectorVarMsg *msg)
01556 {
01557 #ifdef MEM_OPT_VERSION
01558     int ready = midCM->receiveForces(msg);
01559     if(ready) {
01560         CProxy_CollectionMaster cm(mainMaster);
01561         cm.receiveOutputForceReady(msg->seq);        
01562     }
01563     delete msg;
01564 #endif
01565 }
01566 
01567 
01568 void ParallelIOMgr::disposePositions(int seq, double prevT)
01569 {
01570 #ifdef MEM_OPT_VERSION
01571     double iotime = CmiWallTimer();
01572     midCM->disposePositions(seq);
01573     iotime = CmiWallTimer()-iotime+prevT;
01574 
01575 #if OUTPUT_SINGLE_FILE
01576     //Token-based file output
01577     if(myOutputRank == getMyOutputGroupHighestRank()) {
01578         //notify the CollectionMaster to start the next round
01579         CProxy_CollectionMaster cm(mainMaster);
01580         cm.startNextRoundOutputPos(iotime);
01581     } else {
01582         CProxy_ParallelIOMgr io(thisgroup);
01583         io[outputProcArray[myOutputRank+1]].disposePositions(seq, iotime);
01584     }
01585 #else
01586     //notify the CollectionMaster to start the next round
01587     CProxy_CollectionMaster cm(mainMaster);
01588     cm.startNextRoundOutputPos(iotime);
01589 #endif
01590 
01591 #endif
01592 }
01593 
01594 void ParallelIOMgr::disposeVelocities(int seq, double prevT)
01595 {
01596 #ifdef MEM_OPT_VERSION
01597     double iotime = CmiWallTimer();
01598     midCM->disposeVelocities(seq);
01599     iotime = CmiWallTimer()-iotime+prevT;
01600 
01601 #if OUTPUT_SINGLE_FILE
01602     //Token-based file output
01603     if(myOutputRank==getMyOutputGroupHighestRank()) {
01604         //notify the CollectionMaster to start the next round
01605         CProxy_CollectionMaster cm(mainMaster);
01606         cm.startNextRoundOutputVel(iotime);
01607     } else {
01608         CProxy_ParallelIOMgr io(thisgroup);
01609         io[outputProcArray[myOutputRank+1]].disposeVelocities(seq, iotime);
01610     }
01611 #else
01612     //notify the CollectionMaster to start the next round
01613     CProxy_CollectionMaster cm(mainMaster);
01614     cm.startNextRoundOutputVel(iotime);
01615 #endif
01616 
01617 #endif
01618 }
01619 
01620 void ParallelIOMgr::disposeForces(int seq, double prevT)
01621 {
01622 #ifdef MEM_OPT_VERSION
01623     double iotime = CmiWallTimer();
01624     midCM->disposeForces(seq);
01625     iotime = CmiWallTimer()-iotime+prevT;
01626 
01627 #if OUTPUT_SINGLE_FILE
01628     //Token-based file output
01629     if(myOutputRank==getMyOutputGroupHighestRank()) {
01630         //notify the CollectionMaster to start the next round
01631         CProxy_CollectionMaster cm(mainMaster);
01632         cm.startNextRoundOutputForce(iotime);
01633     } else {
01634         CProxy_ParallelIOMgr io(thisgroup);
01635         io[outputProcArray[myOutputRank+1]].disposeForces(seq, iotime);
01636     }
01637 #else
01638     //notify the CollectionMaster to start the next round
01639     CProxy_CollectionMaster cm(mainMaster);
01640     cm.startNextRoundOutputForce(iotime);
01641 #endif
01642 
01643 #endif
01644 }
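// Editorial note: the three dispose* methods above share one token-passing
// pattern under OUTPUT_SINGLE_FILE: each output proc performs its write,
// adds its I/O time to the token (prevT), and either forwards the token to
// the next output rank or, at the highest rank of its group, notifies the
// CollectionMaster to start the next round. A minimal, hypothetical sketch
// (ex* names are illustrative, not NAMD API):
#if 0
void exTokenStep(int myRank, int highestRank, double prevT) {
    double iotime = CmiWallTimer();
    exDoMyWrite();                        // hypothetical: this proc's output
    iotime = CmiWallTimer()-iotime+prevT; // accumulate time carried so far
    if(myRank == highestRank)
        exNotifyMaster(iotime);           // hypothetical: end of the chain
    else
        exForward(myRank+1, iotime);      // hypothetical: pass the token on
}
#endif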
01645 
01646 
01647 void ParallelIOMgr::wrapCoor(int seq, Lattice lat)
01648 {
01649 #ifdef MEM_OPT_VERSION
01650     coorInstance = midCM->getReadyPositions(seq);
01651 
01652     coorInstance->lattice = lat; //record the lattice to use for wrapAll/Water!
01653     int fromAtomID = coorInstance->fromAtomID;
01654     int toAtomID = coorInstance->toAtomID;
01655 
01656     //only reference copies
01657     ResizeArray<Vector> &data = coorInstance->data;
01658     ResizeArray<FloatVector> &fdata = coorInstance->fdata;
01659     //if both data and fdata are non-empty, they hold the same values and
01660     //differ only in precision, so we only need to compute the
01661     //higher-precision coordinate array. -Chao Mei
01662     int dsize = data.size();    
01663     int numMyAtoms = toAtomID-fromAtomID+1;
01664     tmpCoorCon = new Vector[numMyAtoms];    
01665     ClusterCoorElem one;
01666     //1. compute wrapped coordinates locally 
01667     for(int i=0; i<numMyAtoms; i++){
01668         tmpCoorCon[i] = 0.0;
01669         int cid = clusterID[i];
01670         if(cid<fromAtomID){
01671             //on output procs ahead of me
01672             one.clusterId = cid;
01673             ClusterCoorElem *ret = remoteCoors.find(one);
01674             if(ret==NULL){
01675                 if(dsize==0) 
01676                     one.dsum = fdata[i];
01677                 else 
01678                     one.dsum = data[i];
01679                                 
01680                 remoteCoors.add(one);                 
01681             }else{
01682                 if(dsize==0) 
01683                     ret->dsum += fdata[i];
01684                 else 
01685                     ret->dsum += data[i];               
01686             }
01687         }else{
01688             if(dsize==0) 
01689                 tmpCoorCon[cid-fromAtomID] += fdata[i];
01690             else 
01691                 tmpCoorCon[cid-fromAtomID] += data[i];
01692         }
01693     }
01694 
01695     //2. Prepare to send msgs to remote output procs to reduce coordinates 
01696     //values of a cluster
01697     CmiAssert(numRemoteClusters == remoteCoors.size());
01698     numCSMAck = 0; //set to 0 to prepare recving the final coor update
01699     CProxy_ParallelIOMgr pIO(thisgroup);
01700     ClusterCoorSetIter iter(remoteCoors);
01701     for(iter=iter.begin(); iter!=iter.end(); iter++){
01702         ClusterCoorMsg *msg = new ClusterCoorMsg;
01703         msg->srcRank = myOutputRank;
01704         msg->clusterId = iter->clusterId;
01705         msg->dsum = iter->dsum;
01706         int dstRank = atomRankOnOutput(iter->clusterId);
01707         pIO[outputProcArray[dstRank]].recvClusterCoor(msg);
01708     }
01709     
01710     //Just send a local NULL msg to indicate that the local coordinate
01711     //wrapping has finished.
01712     recvClusterCoor(NULL);
01713 #endif
01714 }
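// Editorial note: the per-cluster wrapping rule used below, restated: each
// cluster's centroid is coni = (sum of member coordinates)/clusterSize, one
// delta is computed from it (wrap_nearest_delta or wrap_delta depending on
// wrapNearest), and that same delta is added to every member atom, so a
// cluster such as a water molecule is never split across the periodic cell
// boundary.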
01715 
01716 //On the output proc, it's possible (though rare) that recvClusterCoor is
01717 //executed before wrapCoor, so we need to make sure the local tmpCoorCon has
01718 //been calculated. This is why (numRemoteReqs+1) msgs are expected: the
01719 //additional one is sent by the proc itself when it finishes wrapping
01720 //coordinates. --Chao Mei
01721 void ParallelIOMgr::recvClusterCoor(ClusterCoorMsg *msg){
01722     //only add the msg from remote procs
01723     if(msg!=NULL) ccmBuf.add(msg);
01724 
01725     //include a msg sent by itself
01726     if(++numReqRecved == (numRemoteReqs+1)){
01727         numReqRecved = 0;
01728         integrateClusterCoor();
01729     }
01730 }
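// Editorial note: a concrete example of the count above: with
// numRemoteReqs == 3, integrateClusterCoor fires on the 4th call, i.e.
// after three remote ClusterCoorMsgs plus the local NULL marker.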
01731 
01732 void ParallelIOMgr::integrateClusterCoor(){
01733 #ifdef MEM_OPT_VERSION
01734     int fromIdx = coorInstance->fromAtomID;
01735     int toIdx = coorInstance->toAtomID;
01736     for(int i=0; i<ccmBuf.size(); i++){
01737         ClusterCoorMsg *msg = ccmBuf[i];
01738         int lidx = msg->clusterId - fromIdx;        
01739         tmpCoorCon[lidx] += msg->dsum;
01740     }
01741 
01742     //send back those msgs
01743     CProxy_ParallelIOMgr pIO(thisgroup);
01744     for(int i=0; i<ccmBuf.size(); i++){
01745         ClusterCoorMsg *msg = ccmBuf[i];
01746         int lidx = msg->clusterId - fromIdx;        
01747         if(simParameters->wrapAll || isWater[lidx]) {
01748             Lattice *lat = &(coorInstance->lattice);
01749             Vector coni = tmpCoorCon[lidx]/clusterSize[lidx];
01750             msg->dsum = (simParameters->wrapNearest ?
01751                   lat->wrap_nearest_delta(coni) : lat->wrap_delta(coni));
01752         }else{
01753             msg->dsum = 0.0;
01754         }
01755         pIO[outputProcArray[msg->srcRank]].recvFinalClusterCoor(msg);
01756     }
01757     ccmBuf.resize(0);    
01758 
01759     //It's possible that recvFinalClusterCoor is executed before integrateClusterCoor
01760     //on this processor (the other proc finishes wrapCoor faster). To avoid this
01761     //msg race, send a message to itself to participate in the reduction.
01762     if(numRemoteClusters!=0){
01763         recvFinalClusterCoor(NULL);
01764     } else {
01765         //this output proc already has the sum of coordinates of each cluster
01766         //on it, so it is ready to do the final wrap coor computation
01767         int numMyAtoms = toIdx-fromIdx+1;
01768         ResizeArray<Vector> &data = coorInstance->data;
01769         ResizeArray<FloatVector> &fdata = coorInstance->fdata;
01770         for(int i=0; i<numMyAtoms; i++){
01771             if(!simParameters->wrapAll && !isWater[i]) continue;
01772             int lidx = clusterID[i]-fromIdx;
01773             if(lidx==i){
01774                 //the head atom of the cluster
01775                 Lattice *lat = &(coorInstance->lattice);
01776                 Vector coni = tmpCoorCon[lidx]/clusterSize[lidx];
01777                 tmpCoorCon[lidx] = (simParameters->wrapNearest ?
01778                   lat->wrap_nearest_delta(coni) : lat->wrap_delta(coni));
01779             }
01780             if(data.size()) data[i] += tmpCoorCon[lidx]; 
01781             //no operator += (FloatVector, Vector)
01782             if(fdata.size()) fdata[i] = fdata[i] + tmpCoorCon[lidx]; 
01783         }
01784         
01785         delete [] tmpCoorCon;
01786         tmpCoorCon = NULL;
01787         CProxy_CollectionMaster cm(mainMaster);
01788         cm.wrapCoorFinished();
01789     }
01790 #endif    
01791 }
01792 
01793 void ParallelIOMgr::recvFinalClusterCoor(ClusterCoorMsg *msg){
01794 #ifdef MEM_OPT_VERSION
01795     if(msg!=NULL){
01796         //only process the message sent from other procs!
01797         ClusterCoorElem one(msg->clusterId);
01798         ClusterCoorElem *ret = remoteCoors.find(one);
01799         ret->dsum = msg->dsum;
01800         delete msg;
01801     }
01802     
01803     if(++numCSMAck == (numRemoteClusters+1)){        
01804         //final wrap coor computation
01805         int fromIdx = coorInstance->fromAtomID;
01806         int toIdx = coorInstance->toAtomID;
01807         int numMyAtoms = toIdx-fromIdx+1;
01808         ResizeArray<Vector> &data = coorInstance->data;
01809         ResizeArray<FloatVector> &fdata = coorInstance->fdata;
01810         ClusterCoorElem tmp;
01811         for(int i=0; i<numMyAtoms; i++){
01812             if(!simParameters->wrapAll && !isWater[i]) continue;
01813             int cid = clusterID[i];
01814             int lidx = cid-fromIdx;
01815             if(lidx<0){
01816                 //this cid should be inside remoteCoors
01817                 tmp.clusterId = cid;
01818                 ClusterCoorElem *fone = remoteCoors.find(tmp);
01819                 if(data.size()) data[i] += fone->dsum; 
01820                 if(fdata.size()) fdata[i] = fdata[i] + fone->dsum; 
01821             }else{
01822                 if(lidx==i){
01823                     Lattice *lat = &(coorInstance->lattice);
01824                     Vector coni = tmpCoorCon[lidx]/clusterSize[lidx];
01825                     tmpCoorCon[lidx] = (simParameters->wrapNearest ?
01826                     lat->wrap_nearest_delta(coni) : lat->wrap_delta(coni));
01827                 }
01828                 if(data.size()) data[i] += tmpCoorCon[lidx]; 
01829                 if(fdata.size()) fdata[i] = fdata[i] + tmpCoorCon[lidx];
01830             }
01831         }
01832 
01833         delete [] tmpCoorCon;
01834         tmpCoorCon = NULL;
01835         CProxy_CollectionMaster cm(mainMaster);
01836         cm.wrapCoorFinished();
01837         numCSMAck = 0;
01838         remoteCoors.clear();
01839     }
01840 #endif
01841 }
01842 #include "ParallelIOMgr.def.h"
