ParallelIOMgr.C


#include "largefiles.h"  // must be first!

#include <stdio.h>
#include "BOCgroup.h"
#include "Molecule.h"
#include "Node.h"
#include "Node.decl.h"
#include "NamdState.h"
#include "WorkDistrib.h"
#include "PDB.h"
#include "PatchMap.h"
#include "PatchMap.inl"
#include "packmsg.h"
#include "HomePatch.h"
#include "InfoStream.h"
#include "CollectionMaster.h"
#include "CollectionMgr.h"

#include "ParallelIOMgr.decl.h"
#include "ParallelIOMgr.h"

#include "Output.h"
#include "Random.h"

#include <algorithm>
using namespace std;

ParallelIOMgr::ParallelIOMgr()
{
    CkpvAccess(BOCclass_group).ioMgr = thisgroup;

    numInputProcs=-1;
    inputProcArray = NULL;
    numOutputProcs=-1;
    outputProcArray = NULL;

    procsReceived=0;
    hydroMsgRecved=0;

    totalMV.x = totalMV.y = totalMV.z = 0.0;
    totalMass = 0.0;
    totalCharge = 0.0;

    isOKToRecvHPAtoms = false;
    hpAtomsList = NULL;

    clusterID = NULL;
    clusterSize = NULL;

#ifdef MEM_OPT_VERSION
    midCM = NULL;
#endif

    isWater = NULL;

    numCSMAck = 0;
    numReqRecved = 0;

    sendAtomsThread = 0;

#if COLLECT_PERFORMANCE_DATA
    numFixedAtomLookup = 0;
#endif
}

ParallelIOMgr::~ParallelIOMgr()
{
    delete [] inputProcArray;
    delete [] outputProcArray;
    delete [] clusterID;
    delete [] clusterSize;

#ifdef MEM_OPT_VERSION
    delete midCM;
#endif

    delete [] isWater;
}

#ifndef OUTPUT_SINGLE_FILE
#error OUTPUT_SINGLE_FILE not defined!
#endif

// initialization needed for the parallel IO manager; the entry point is through ScriptTcl
void ParallelIOMgr::initialize(Node *node)
{
    simParameters = node->simParameters;
    molecule = node->molecule;

    numInputProcs = simParameters->numinputprocs;
    numOutputProcs = simParameters->numoutputprocs;
    numOutputWrts = simParameters->numoutputwrts;


    if(!CkMyPe()) {
        iout << iINFO << "Running with " <<numInputProcs<<" input processors.\n"<<endi;
        #if OUTPUT_SINGLE_FILE
        iout << iINFO << "Running with " <<numOutputProcs<<" output processors ("<<numOutputWrts<<" of them will output simultaneously).\n"<<endi;
        #else
        iout << iINFO << "Running with " <<numOutputProcs<<" output processors, and each of them will output to its own separate file.\n"<<endi;
        #endif
    }

    //build inputProcArray
   {
    inputProcArray = new int[numInputProcs];
    myInputRank = -1;
    for(int i=0; i<numInputProcs; ++i) {
      inputProcArray[i] = WorkDistrib::peDiffuseOrdering[(1+numOutputProcs+i)%CkNumPes()];
    }
    std::sort(inputProcArray, inputProcArray+numInputProcs);
    for(int i=0; i<numInputProcs; ++i) {
      if ( CkMyPe() == inputProcArray[i] ) {
        if ( myInputRank != -1 ) NAMD_bug("Duplicate input proc");
        myInputRank = i;
      }
    }

    if(!CkMyPe()) {
      iout << iINFO << "INPUT PROC LOCATIONS:";
      int i;
      for ( i=0; i<numInputProcs && i < 10; ++i ) {
        iout << " " << inputProcArray[i];
      }
      if ( i<numInputProcs ) iout << " ... " << inputProcArray[numInputProcs-1];
      iout << "\n" << endi;
    }
   }

    if(myInputRank!=-1) {
        //NOTE: this could be further optimized by pre-allocating the memory
        //for incoming atoms --Chao Mei
        int numMyAtoms = numInitMyAtomsOnInput();
        initAtoms.resize(numMyAtoms+100);  // extra space for orphan hydrogens
        initAtoms.resize(numMyAtoms);
        tmpRecvAtoms.resize(0);
    } else {
        initAtoms.resize(0);
        tmpRecvAtoms.resize(0);
    }
    hpIDList.resize(0);

    //build outputProcArray
    //spread the output processors across all the processors
   {
    outputProcArray = new int[numOutputProcs];
    outputProcFlags = new char[CkNumPes()];
    myOutputRank = -1;
    for(int i=0; i<numOutputProcs; ++i) {
      outputProcArray[i] = WorkDistrib::peDiffuseOrdering[(1+i)%CkNumPes()];
    }
    std::sort(outputProcArray, outputProcArray+numOutputProcs);
    for(int i=0; i<CkNumPes(); ++i) {
      outputProcFlags[i] = 0;
    }
    for(int i=0; i<numOutputProcs; ++i) {
      outputProcFlags[outputProcArray[i]] = 1;
      if ( CkMyPe() == outputProcArray[i] ) {
        if ( myOutputRank != -1 ) NAMD_bug("Duplicate output proc");
        myOutputRank = i;
      }
    }

    if(!CkMyPe()) {
      iout << iINFO << "OUTPUT PROC LOCATIONS:";
      int i;
      for ( i=0; i<numOutputProcs && i < 10; ++i ) {
        iout << " " << outputProcArray[i];
      }
      if ( i<numOutputProcs ) iout << " ... " << outputProcArray[numOutputProcs-1];
      iout << "\n" << endi;
    }
   }

#ifdef MEM_OPT_VERSION
    if(myOutputRank!=-1) {
        midCM = new CollectionMidMaster(this);
    }
    remoteClusters.clear();
    csmBuf.resize(0);
    remoteCoors.clear();
    ccmBuf.resize(0);

    mainMaster = CollectionMgr::Object()->getMasterChareID();
#endif
}
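
// Illustrative sketch (not part of NAMD) of the PE-selection pattern used in
// initialize() above: both manager arrays take consecutive slots from a
// node-spreading permutation of the PEs, wrap modulo CkNumPes(), and are then
// sorted so each PE can locate its own rank by a linear scan.  The identity
// ordering below is a stand-in assumption for WorkDistrib::peDiffuseOrdering.
#if 0
#include <algorithm>
#include <vector>
static void illustrateProcSelection(int numPes, int numOut, int numIn) {
    std::vector<int> ordering(numPes);
    for(int i=0; i<numPes; i++) ordering[i] = i; // stand-in permutation
    std::vector<int> outPes(numOut), inPes(numIn);
    for(int i=0; i<numOut; i++) outPes[i] = ordering[(1+i)%numPes];
    // input PEs start right after the slots taken by the output PEs
    for(int i=0; i<numIn; i++) inPes[i] = ordering[(1+numOut+i)%numPes];
    std::sort(outPes.begin(), outPes.end());
    std::sort(inPes.begin(), inPes.end());
}
#endif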

bool ParallelIOMgr::isOutputProcessor(int pe) {
   return outputProcFlags[pe];
}

int isOutputProcessor(int pe) {
  return CProxy_ParallelIOMgr::ckLocalBranch(CkpvAccess(BOCclass_group).ioMgr)->isOutputProcessor(pe);
}

void ParallelIOMgr::readPerAtomInfo()
{
#ifdef MEM_OPT_VERSION
    if(myInputRank!=-1) {
        int myAtomLIdx, myAtomUIdx;
        getMyAtomsInitRangeOnInput(myAtomLIdx, myAtomUIdx);

        //1. read the file that contains per-atom info such as signature index
        molecule->read_binary_atom_info(myAtomLIdx, myAtomUIdx, initAtoms);

        //2. read coordinates and velocities of each atom if the velocity file
        //exists; otherwise the velocity of each atom is randomly generated.
        //This has to be DONE AFTER THE FIRST STEP because the atom mass is
        //required if the velocity is generated randomly.
        readCoordinatesAndVelocity();

        //3. set every atom's output processor rank, i.e. the dest pe this
        //atom will be sent to for writing positions, velocities, etc.
        int oRank=atomRankOnOutput(myAtomLIdx);
        for(int i=oRank; i<numOutputProcs; i++) {
            int lIdx, uIdx; //the range of atom ids outputProcArray[i] owns
            getAtomsRangeOnOutput(lIdx, uIdx, i);
            if(lIdx > myAtomUIdx) break;
            int fid = lIdx>myAtomLIdx?lIdx:myAtomLIdx;
            int tid = uIdx>myAtomUIdx?myAtomUIdx:uIdx;
            for(int j=fid; j<=tid; j++) initAtoms[j-myAtomLIdx].outputRank = i;
        }
    }

    //read clusters
    if(myOutputRank!=-1) {
        //cluster info is required only when wrapAll or wrapWater is set
        if(!(simParameters->wrapAll || simParameters->wrapWater)) return;
        readInfoForParOutput();
    }
#endif
}
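
// Illustrative sketch (hypothetical helper, not NAMD API): step 3 above is an
// interval intersection.  Output rank i owns atom ids [lo(i),hi(i)], and the
// input range [myLo,myHi] is clamped against each owner range to label atoms.
#if 0
static void labelOutputRanks(int myLo, int myHi, int nRanks,
                             void (*rangeOf)(int rank, int &lo, int &hi),
                             int *outputRankOf /* indexed by atom id - myLo */) {
    for(int i=0; i<nRanks; i++) {
        int lo, hi;
        rangeOf(i, lo, hi);
        if(lo > myHi) break;             // ranges are ordered; no more overlap
        int fid = lo > myLo ? lo : myLo; // clamp low end
        int tid = hi < myHi ? hi : myHi; // clamp high end
        for(int j=fid; j<=tid; j++) outputRankOf[j-myLo] = i;
    }
}
#endif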

void ParallelIOMgr::readCoordinatesAndVelocity()
{
#ifdef MEM_OPT_VERSION
    int needFlip = 0;
    int myAtomLIdx, myAtomUIdx;
    getMyAtomsInitRangeOnInput(myAtomLIdx, myAtomUIdx);
    int myNumAtoms = myAtomUIdx-myAtomLIdx+1;

    //contains the data for Position and Velocity
    Vector *tmpData = new Vector[myNumAtoms];

    //begin to read coordinates
    //step1: open the file
    FILE *ifp = fopen(simParameters->binCoorFile, "rb");
    if(!ifp) {
        char s[256];
        sprintf(s, "The binary coordinate file %s cannot be opened on proc %d\n", simParameters->binCoorFile, CkMyPe());
        NAMD_err(s);
    }
    //step2: check whether flip is needed
    int filelen;
    fread(&filelen, sizeof(int32),1,ifp);
    char lenbuf[sizeof(int32)];
    memcpy(lenbuf, (const char *)&filelen, sizeof(int32));
    flipNum(lenbuf, sizeof(int32), 1);
    if(!memcmp(lenbuf, (const char *)&filelen, sizeof(int32))) {
        iout << iWARN << "Number of atoms in binary file " << simParameters->binCoorFile
             <<" is palindromic, assuming same endian.\n" << endi;
    }
    if(filelen!=molecule->numAtoms) {
        needFlip = 1;
        memcpy((void *)&filelen, lenbuf,sizeof(int32));
    }
    if(filelen!=molecule->numAtoms) {
        char s[256];
        sprintf(s, "Incorrect atom count in binary file %s", simParameters->binCoorFile);
        NAMD_die(s);
    }
    //step3: read the slice of the file specified by my atom range
    int64 offsetPos = ((int64)myAtomLIdx)*sizeof(Position);
#ifdef WIN32
    if ( _fseeki64(ifp, offsetPos, SEEK_CUR) )
#else
    if ( fseeko(ifp, offsetPos, SEEK_CUR) )
#endif
    {
        char s[256];
        sprintf(s, "Error in seeking binary file %s on proc %d",  simParameters->binCoorFile, CkMyPe());
        NAMD_err(s);
    }
    size_t totalRead = fread(tmpData, sizeof(Vector), myNumAtoms, ifp);
    if(totalRead!=myNumAtoms) {
        char s[256];
        sprintf(s, "Error in reading binary file %s on proc %d",  simParameters->binCoorFile, CkMyPe());
        NAMD_err(s);
    }
    if(needFlip) flipNum((char *)tmpData, sizeof(BigReal), myNumAtoms*3);
    fclose(ifp);
    for(int i=0; i<myNumAtoms; i++) initAtoms[i].position = tmpData[i];

    //begin to read velocity
    //step1: generate velocity randomly or open the file
    if(!simParameters->binVelFile) {
        //generate velocity randomly
        Node::Object()->workDistrib->random_velocities_parallel(simParameters->initialTemp, initAtoms);
    } else {
        ifp = fopen(simParameters->binVelFile, "rb");
        if(!ifp) {
            char s[256];
            sprintf(s, "The binary velocity file %s cannot be opened on proc %d\n", simParameters->binVelFile, CkMyPe());
            NAMD_err(s);
        }
        //step2: check whether flip is needed
        fread(&filelen, sizeof(int32),1,ifp);
        memcpy(lenbuf, (const char *)&filelen, sizeof(int32));
        flipNum(lenbuf, sizeof(int32), 1);
        if(!memcmp(lenbuf, (const char *)&filelen, sizeof(int32))) {
            iout << iWARN << "Number of atoms in binary file " << simParameters->binVelFile
                 <<" is palindromic, assuming same endian.\n" << endi;
        }
        if(filelen!=molecule->numAtoms) {
            needFlip = 1;
            memcpy((void *)&filelen, lenbuf,sizeof(int32));
        }
        if(filelen!=molecule->numAtoms) {
            char s[256];
            sprintf(s, "Incorrect atom count in binary file %s", simParameters->binVelFile);
            NAMD_die(s);
        }

        //step3: read the slice of the file specified by my atom range
        int64 offsetPos = ((int64)myAtomLIdx)*sizeof(Velocity);
#ifdef WIN32
        if ( _fseeki64(ifp, offsetPos, SEEK_CUR) )
#else
        if ( fseeko(ifp, offsetPos, SEEK_CUR) )
#endif
        {
            char s[256];
            sprintf(s, "Error in seeking binary file %s on proc %d",  simParameters->binVelFile, CkMyPe());
            NAMD_err(s);
        }
        totalRead = fread(tmpData, sizeof(Vector), myNumAtoms, ifp);
        if(totalRead!=myNumAtoms) {
            char s[256];
            sprintf(s, "Error in reading binary file %s on proc %d",  simParameters->binVelFile, CkMyPe());
            NAMD_err(s);
        }
        if(needFlip) flipNum((char *)tmpData, sizeof(BigReal), myNumAtoms*3);
        fclose(ifp);
        for(int i=0; i<myNumAtoms; i++) initAtoms[i].velocity = tmpData[i];
    }

    //begin to read reference coordinates
    //step1: use initial positions or open the file
    if(!simParameters->binRefFile) {
        for(int i=0; i<myNumAtoms; i++) initAtoms[i].fixedPosition = initAtoms[i].position;
    } else {
        ifp = fopen(simParameters->binRefFile, "rb");
        if(!ifp) {
            char s[256];
            sprintf(s, "The binary reference coordinate file %s cannot be opened on proc %d\n", simParameters->binRefFile, CkMyPe());
            NAMD_err(s);
        }
        //step2: check whether flip is needed
        fread(&filelen, sizeof(int32),1,ifp);
        memcpy(lenbuf, (const char *)&filelen, sizeof(int32));
        flipNum(lenbuf, sizeof(int32), 1);
        if(!memcmp(lenbuf, (const char *)&filelen, sizeof(int32))) {
            iout << iWARN << "Number of atoms in binary file " << simParameters->binRefFile
                 <<" is palindromic, assuming same endian.\n" << endi;
        }
        if(filelen!=molecule->numAtoms) {
            needFlip = 1;
            memcpy((void *)&filelen, lenbuf,sizeof(int32));
        }
        if(filelen!=molecule->numAtoms) {
            char s[256];
            sprintf(s, "Incorrect atom count in binary file %s", simParameters->binRefFile);
            NAMD_die(s);
        }

        //step3: read the slice of the file specified by my atom range
        int64 offsetPos = ((int64)myAtomLIdx)*sizeof(Position);
#ifdef WIN32
        if ( _fseeki64(ifp, offsetPos, SEEK_CUR) )
#else
        if ( fseeko(ifp, offsetPos, SEEK_CUR) )
#endif
        {
            char s[256];
            sprintf(s, "Error in seeking binary file %s on proc %d",  simParameters->binRefFile, CkMyPe());
            NAMD_err(s);
        }
        totalRead = fread(tmpData, sizeof(Vector), myNumAtoms, ifp);
        if(totalRead!=myNumAtoms) {
            char s[256];
            sprintf(s, "Error in reading binary file %s on proc %d",  simParameters->binRefFile, CkMyPe());
            NAMD_err(s);
        }
        if(needFlip) flipNum((char *)tmpData, sizeof(BigReal), myNumAtoms*3);
        fclose(ifp);
        for(int i=0; i<myNumAtoms; i++) initAtoms[i].fixedPosition = tmpData[i];
    }

    delete [] tmpData;
#endif
}
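
// Illustrative sketch of the endianness test shared by the three readers
// above: the first int32 of each binary file holds the atom count, so if it
// matches molecule->numAtoms the file is native-endian, and if only its
// byte-swapped value matches, every subsequent number must be flipped.
// flip4 below is a stand-in for flipNum.
#if 0
#include <cstdint>
#include <cstring>
#include <utility>
static void flip4(char *p) { std::swap(p[0], p[3]); std::swap(p[1], p[2]); }
// Returns 0 = native order, 1 = byte-swapped, -1 = count mismatch either way.
static int checkHeader(int32_t fileCount, int32_t expectedAtoms) {
    if (fileCount == expectedAtoms) return 0;
    char buf[4];
    std::memcpy(buf, &fileCount, 4);
    flip4(buf);
    int32_t swapped;
    std::memcpy(&swapped, buf, 4);
    return (swapped == expectedAtoms) ? 1 : -1;
}
#endif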

void ParallelIOMgr::readInfoForParOutput()
{
    int fromIdx, toIdx; //atoms' range
    getMyAtomsRangeOnOutput(fromIdx,toIdx);
    int numMyAtoms = toIdx-fromIdx+1;

    clusterID = new int[numMyAtoms];
    clusterSize = new int[numMyAtoms];

    //Since the input proc also reads this file, all the checks
    //(file version check, atom record size check) are omitted here
    FILE *ifp = fopen(simParameters->binAtomFile, "rb");
    //read magic number to set needFlip
    int needFlip = 0;
    int magicNum;
    fread(&magicNum, sizeof(int), 1, ifp);
    if (magicNum!=COMPRESSED_PSF_MAGICNUM) {
        needFlip = 1;
    }

    //need to load isWater info
    isWater = new char[numMyAtoms];
    //seek from the end of the file (note the offset is negative!)
    int64 offset = sizeof(char)*((int64)(fromIdx-molecule->numAtoms));
#ifdef WIN32
    if ( _fseeki64(ifp, offset, SEEK_END) )
#else
    if ( fseeko(ifp, offset, SEEK_END) )
#endif
    {
        char s[256];
        sprintf(s, "Error in seeking binary file %s on proc %d",  simParameters->binAtomFile, CkMyPe());
        NAMD_err(s);
    }
    fread(isWater, sizeof(char), numMyAtoms, ifp);
    //there's no need for flipping as it's a char array

    //seek from the end of the file (note the offset is negative!)
    offset = sizeof(int)*((int64)(fromIdx-molecule->numAtoms))
                   - sizeof(char)*((int64)(molecule->numAtoms));
#ifdef WIN32
    if ( _fseeki64(ifp, offset, SEEK_END) )
#else
    if ( fseeko(ifp, offset, SEEK_END) )
#endif
    {
        char s[256];
        sprintf(s, "Error in seeking binary file %s on proc %d",  simParameters->binAtomFile, CkMyPe());
        NAMD_err(s);
    }
    fread(clusterID, sizeof(int), numMyAtoms, ifp);
    if(needFlip) flipNum((char *)clusterID, sizeof(int), numMyAtoms);
    fclose(ifp);

    //calculate cluster size (communication may be necessary)
    ClusterElem one;
    for(int i=0; i<numMyAtoms; i++) {
        clusterSize[i] = 0;
        int cid = clusterID[i];
        //check if the cluster id (i.e. the id of the head atom of the
        //cluster) is in my local repository. If the cluster id is not in
        //my local repository, it must live on an output processor ahead
        //of this one because of the way cluster info is determined for
        //the molecular system.
        CmiAssert(cid<=toIdx);
        if(cid<fromIdx) {
            //on output procs ahead of me
            one.clusterId = cid;
            ClusterElem *ret = remoteClusters.find(one);
            if(ret==NULL) {
                one.atomsCnt = 1;
                remoteClusters.add(one);
            } else {
                ret->atomsCnt++;
            }
        } else {
            int lidx = cid-fromIdx;
            CmiAssert(lidx<=i);
            clusterSize[lidx]++;
        }
    }

    //Prepare to send msgs to remote output procs to reduce the cluster size.
    //Since the number of msgs is expected to be very small, msgs to the same
    //proc are not aggregated. --Chao Mei
#if 0
    printf("output[%d]=%d: prepare to send %d remote msgs for cluster size\n",
           myOutputRank, CkMyPe(), remoteClusters.size());
#endif

    numRemoteClusters = remoteClusters.size();
    numCSMAck = 0; //set to 0 to prepare for recving the final cluster size update
    CProxy_ParallelIOMgr pIO(thisgroup);
    ClusterSetIter iter(remoteClusters);
    for(iter=iter.begin(); iter!=iter.end(); iter++) {
        ClusterSizeMsg *msg = new ClusterSizeMsg;
        msg->srcRank = myOutputRank;
        msg->clusterId = iter->clusterId;
        msg->atomsCnt = iter->atomsCnt;
        int dstRank = atomRankOnOutput(iter->clusterId);
        pIO[outputProcArray[dstRank]].recvClusterSize(msg);
    }
}
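
// A worked example of the SEEK_END arithmetic above.  The trailer layout is
// inferred from these offsets (treat it as an assumption): the compressed
// .psf binary ends with the whole clusterID int array followed by the whole
// isWater char array, so seeking backwards to this proc's slice gives
//   isWater[fromIdx]   at (fromIdx - numAtoms)*sizeof(char)  from SEEK_END
//   clusterID[fromIdx] at (fromIdx - numAtoms)*sizeof(int)
//                         - numAtoms*sizeof(char)            from SEEK_END
// e.g. with numAtoms=100 and fromIdx=40, the isWater slice starts 60 bytes
// before the end, and the clusterID slice starts 60*4 + 100 = 340 bytes
// before the end.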

void ParallelIOMgr::recvClusterSize(ClusterSizeMsg *msg)
{
    csmBuf.add(msg); //added to buffer for reuse to send back to src

    //updating the cluster size has to be delayed until integrateClusterSize
    //to prevent a race in which clusterSize has not been allocated yet!
}

void ParallelIOMgr::integrateClusterSize()
{
    if(myOutputRank==-1) return;
    if(!(simParameters->wrapAll || simParameters->wrapWater)) return;

    int fromIdx, toIdx; //atoms' range
    getMyAtomsRangeOnOutput(fromIdx,toIdx);

    //calculate the final cluster size
    for(int i=0; i<csmBuf.size(); i++) {
        ClusterSizeMsg *msg = csmBuf[i];
        int lidx = msg->clusterId - fromIdx;
        clusterSize[lidx] += msg->atomsCnt;
    }

    CProxy_ParallelIOMgr pIO(thisgroup);
    for(int i=0; i<csmBuf.size(); i++) {
        ClusterSizeMsg *msg = csmBuf[i];
        int lidx = msg->clusterId - fromIdx;
        msg->atomsCnt = clusterSize[lidx];
        pIO[outputProcArray[msg->srcRank]].recvFinalClusterSize(msg);
    }
    numRemoteReqs = csmBuf.size();
    csmBuf.resize(0);

    //There's a possible message race here: recvFinalClusterSize could execute
    //before integrateClusterSize because another proc finishes its cluster
    //size calculation faster.  recvFinalClusterSize must run after
    //integrateClusterSize, so a self message is sent to participate in the
    //reduction.
    if(numRemoteClusters!=0){
        recvFinalClusterSize(NULL);
    }else{
        //this output proc already has the final cluster size for each atom
        int numMyAtoms = toIdx-fromIdx+1;
        for(int i=0; i<numMyAtoms; i++) {
            int lidx = clusterID[i]-fromIdx;
            clusterSize[i] = clusterSize[lidx];
        }

        #if 0 //write out cluster debug info
        char fname[128];
        sprintf(fname, "cluster.par.%d", CkMyPe());
        FILE *ofp = fopen(fname, "w");
        for(int i=0; i<numMyAtoms; i++) {
            fprintf(ofp, "%d: %d: %d\n", i+fromIdx, clusterID[i], clusterSize[i]);
        }
        fclose(ofp);
        #endif
    }
}

void ParallelIOMgr::recvFinalClusterSize(ClusterSizeMsg *msg)
{
    //only process messages sent by other procs
    if(msg!=NULL) {
        //a message from another proc
        ClusterElem one(msg->clusterId);
        ClusterElem *ret = remoteClusters.find(one);
        CmiAssert(ret!=NULL);
        ret->atomsCnt = msg->atomsCnt;
    }
    delete msg;

    //include the self message in the reduction count
    if(++numCSMAck == (numRemoteClusters+1)) {
        //received all the msgs needed to finalize the cluster size of each atom
        int fromIdx, toIdx; //atoms' range
        getMyAtomsRangeOnOutput(fromIdx,toIdx);
        int numMyAtoms = toIdx-fromIdx+1;
        ClusterElem tmp;
        for(int i=0; i<numMyAtoms; i++) {
            int cid = clusterID[i];
            int lidx = cid-fromIdx;
            if(lidx<0) {
                //this cid should be inside remoteClusters
                tmp.clusterId = cid;
                ClusterElem *fone = remoteClusters.find(tmp);
                clusterSize[i] = fone->atomsCnt;
            } else {
                clusterSize[i] = clusterSize[lidx];
            }
        }
        numCSMAck = 0;
        remoteClusters.clear();

#if 0 //write out cluster debug info
        char fname[128];
        sprintf(fname, "cluster.par.%d", CkMyPe());
        FILE *ofp = fopen(fname, "w");
        for(int i=0; i<numMyAtoms; i++) {
            fprintf(ofp, "%d: %d: %d\n", i+fromIdx, clusterID[i], clusterSize[i]);
        }
        fclose(ofp);
#endif

    }
}
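
// Summary of the cluster-size exchange implemented above (wording is the
// editor's, derived from the code):
//  1. readInfoForParOutput: each output proc tallies atoms per cluster;
//     tallies for clusters whose head atom lives on an earlier proc are
//     collected in remoteClusters and sent out as ClusterSizeMsg.
//  2. recvClusterSize: incoming partial tallies are only buffered in csmBuf,
//     since clusterSize may not have been allocated yet on the receiver.
//  3. integrateClusterSize: the owning proc folds buffered tallies into
//     clusterSize, then returns each message carrying the final count.
//  4. recvFinalClusterSize: after numRemoteClusters replies plus one self
//     call, every atom's clusterSize entry is final.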

void ParallelIOMgr::migrateAtomsMGrp()
{
    if(myInputRank==-1) return;

    //1. first get the list of atoms to be migrated,
    //which should be few compared with the number of atoms
    //initially assigned to this input proc.
    AtomIDList toMigrateList; //the list of atoms to be migrated
    //the max rank distance from this proc among atoms to be sent
    int maxOffset = 0;
    for(int i=0; i<initAtoms.size(); i++) {
        //the input rank on which this atom's migration parent (MPID) resides
        int parentRank = atomInitRankOnInput(initAtoms[i].MPID);
        if(parentRank != myInputRank) {
            toMigrateList.add(i);
            initAtoms[i].isValid = false;
            int tmp = parentRank - myInputRank;
            tmp = tmp>0 ? tmp : -tmp;
            if(tmp > maxOffset) maxOffset = tmp;
        }
    }

    //2. prepare atom migration messages
    //the messages are indexed as [-maxOffset,..., -1,0,1,..., maxOffset]
    //where the "0" slot is never used; it is included to keep the index
    //computation simple.
    InputAtomList *migLists = new InputAtomList[2*maxOffset+1];
    for(int i=0; i<toMigrateList.size(); i++) {
        int idx = toMigrateList[i];
        int parentRank = atomInitRankOnInput(initAtoms[idx].MPID);
        //decide which migList this atom goes into
        int offset = parentRank - myInputRank + maxOffset;
        migLists[offset].add(initAtoms[idx]);
    }

    CProxy_ParallelIOMgr pIO(thisgroup);
    for(int i=0; i<2*maxOffset+1; i++) {
        int migLen = migLists[i].size();
        if(migLen>0) {
            MoveInputAtomsMsg *msg = new (migLen, 0)MoveInputAtomsMsg;
            msg->length = migLen;
            memcpy(msg->atomList, migLists[i].begin(), sizeof(InputAtom)*migLen);
            int destRank = i-maxOffset+myInputRank;
            pIO[inputProcArray[destRank]].recvAtomsMGrp(msg);
            migLists[i].clear();
        }
    }

    toMigrateList.clear();
    delete [] migLists;
}
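
// Illustrative sketch of the migList indexing used above: an atom whose
// parent lives on input rank p is bucketed at index (p - myRank + maxOffset),
// so ranks in [myRank-maxOffset, myRank+maxOffset] map onto [0, 2*maxOffset]
// and the middle slot (p == myRank) stays empty.
#if 0
static int migListIndex(int parentRank, int myRank, int maxOffset) {
    return parentRank - myRank + maxOffset;  // in [0, 2*maxOffset]
}
// e.g. myRank=5, maxOffset=2: rank 3 -> 0, rank 4 -> 1, rank 6 -> 3, rank 7 -> 4
#endif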

void ParallelIOMgr::recvAtomsMGrp(MoveInputAtomsMsg *msg)
{
    for(int i=0; i<msg->length; i++) {
        tmpRecvAtoms.add((msg->atomList)[i]);
    }
    delete msg;
}

void ParallelIOMgr::integrateMigratedAtoms()
{
    if(myInputRank==-1) return;

    for(int i=0; i<tmpRecvAtoms.size(); i++) {
        tmpRecvAtoms[i].isValid = true;
        initAtoms.add(tmpRecvAtoms[i]);
    }
    tmpRecvAtoms.clear();

    //sort atom list based on hydrogenList value
    std::sort(initAtoms.begin(), initAtoms.end());

    //now compute the counters inside Molecule, such as numFixedRigidBonds,
    //which are based on the hydrogen group info

    int numFixedRigidBonds = 0;
    if(molecule->numRigidBonds){
        int parentIsFixed = 0;
        for(int i=0; i<initAtoms.size(); i++) {
            InputAtom *one = &(initAtoms[i]);
            if(!one->isValid) continue;
            if(one->isGP) {
                parentIsFixed = one->atomFixed;
                InputAtom *a1 = &(initAtoms[i+1]);
                InputAtom *a2 = &(initAtoms[i+2]);
                if((one->rigidBondLength>0.0) &&
                   a1->atomFixed && a2->atomFixed) {
                    numFixedRigidBonds++;
                }
            }else{
                if((one->rigidBondLength>0.0) &&
                   one->atomFixed && parentIsFixed) {
                    numFixedRigidBonds++;
                }
            }
        }
    }

    int numFixedGroups = 0;
    if(molecule->numFixedAtoms){
        for(int i=0; i<initAtoms.size();) {
            InputAtom *one = &(initAtoms[i]);
            if(!one->isValid){
                i++;
                continue;
            }
            if(one->isGP) {
                int allFixed = 1;
                for(int j=0; j<one->hydrogenGroupSize; j++){
                    InputAtom *a1 = &(initAtoms[i+j]);
                    allFixed = allFixed & a1->atomFixed;
                    if(!allFixed) break;
                }
                if(allFixed) numFixedGroups++;
                i += one->hydrogenGroupSize;
            }
        }
    }

    CProxy_ParallelIOMgr pIO(thisgroup);
    HydroBasedMsg *msg = new HydroBasedMsg;
    msg->numFixedGroups = numFixedGroups;
    msg->numFixedRigidBonds = numFixedRigidBonds;
    pIO[0].recvHydroBasedCounter(msg);
}

void ParallelIOMgr::updateMolInfo()
{
#ifdef MEM_OPT_VERSION
    if(myInputRank==-1) return;

    CProxy_ParallelIOMgr pIO(thisgroup);

    MolInfoMsg *msg = new MolInfoMsg;
    msg->numBonds = msg->numCalcBonds = 0;
    msg->numAngles = msg->numCalcAngles = 0;
    msg->numDihedrals = msg->numCalcDihedrals = 0;
    msg->numImpropers = msg->numCalcImpropers = 0;
    msg->numCrossterms = msg->numCalcCrossterms = 0;
    msg->numExclusions = msg->numCalcExclusions = 0;
    int numFullExclusions = msg->numCalcFullExclusions = 0;
    // JLai
    msg->numLJPairs = msg->numCalcLJPairs = 0;
    // End of JLai
    msg->numRigidBonds = 0;
    msg->totalMass = 0.0;
    msg->totalCharge = 0.0;

    //count the tuples this input processor has
    AtomSignature *atomSigPool = molecule->atomSigPool;
    ExclusionSignature *exclSigPool = molecule->exclSigPool;
    for(int i=0; i<initAtoms.size(); i++) {
        AtomSignature *thisSig = &atomSigPool[initAtoms[i].sigId];
        msg->numBonds += thisSig->bondCnt;
        msg->numAngles += thisSig->angleCnt;
        msg->numDihedrals += thisSig->dihedralCnt;
        msg->numImpropers += thisSig->improperCnt;
        msg->numCrossterms += thisSig->crosstermCnt;
        // JLai
        msg->numLJPairs += thisSig->gromacsPairCnt;
        // End of JLai

        ExclusionSignature *exclSig = &exclSigPool[initAtoms[i].exclId];
        msg->numExclusions += (exclSig->fullExclCnt + exclSig->modExclCnt);
        numFullExclusions += exclSig->fullExclCnt;

        if(initAtoms[i].rigidBondLength > 0.0) msg->numRigidBonds++;

        msg->totalMass += initAtoms[i].mass;
        msg->totalCharge += initAtoms[i].charge;
    }

    //deal with the numCalc* counters, which are affected by fixed atoms!
    if(molecule->numFixedAtoms>0 && ! simParameters->fixedAtomsForces) {
        //if there are fixed atoms, the numCalc* counters must be computed.
        //Since it's possible that an atom inside an exclusion set is on
        //another input processor, we have to resort to the global fixed atom
        //info inside the Molecule object. The number of such accesses should
        //be very small! --Chao Mei
        int sAId = initAtoms[0].id;
        int remoteCnt=0; //stats info
        for(int i=0; i<initAtoms.size(); i++) {
            //When all the atoms in the set are fixed, the elem (Bond etc.)
            //is not counted as a calc*.
            int myAId = initAtoms[i].id;
            AtomSignature *thisSig = &atomSigPool[initAtoms[i].sigId];
            ExclusionSignature *exclSig = &exclSigPool[initAtoms[i].exclId];
            if(!initAtoms[i].atomFixed) {
                msg->numCalcBonds += thisSig->bondCnt;
                msg->numCalcAngles += thisSig->angleCnt;
                msg->numCalcDihedrals += thisSig->dihedralCnt;
                msg->numCalcImpropers += thisSig->improperCnt;
                msg->numCalcCrossterms += thisSig->crosstermCnt;
                msg->numCalcExclusions+=(exclSig->fullExclCnt+exclSig->modExclCnt);
                msg->numCalcFullExclusions+=(exclSig->fullExclCnt);
                continue;
            }

            //1. Bonds
            for(int j=0; j<thisSig->bondCnt; j++) {
                TupleSignature *bsig = &(thisSig->bondSigs[j]);
                int a1 = myAId + bsig->offset[0];
                if(!isAtomFixed(sAId, a1)) msg->numCalcBonds++;
            }

            //2. Angles
            for(int j=0; j<thisSig->angleCnt; j++) {
                TupleSignature *bsig = &(thisSig->angleSigs[j]);
                int a1 = myAId + bsig->offset[0];
                int a2 = myAId + bsig->offset[1];
                if(!isAtomFixed(sAId, a1) || !isAtomFixed(sAId, a2))
                    msg->numCalcAngles++;
            }

            //3. Dihedrals
            for(int j=0; j<thisSig->dihedralCnt; j++) {
                TupleSignature *bsig = &(thisSig->dihedralSigs[j]);
                int a1 = myAId + bsig->offset[0];
                int a2 = myAId + bsig->offset[1];
                int a3 = myAId + bsig->offset[2];
                if(!isAtomFixed(sAId, a1) ||
                   !isAtomFixed(sAId, a2) ||
                   !isAtomFixed(sAId, a3))
                    msg->numCalcDihedrals++;
            }

            //4. Impropers
            for(int j=0; j<thisSig->improperCnt; j++) {
                TupleSignature *bsig = &(thisSig->improperSigs[j]);
                int a1 = myAId + bsig->offset[0];
                int a2 = myAId + bsig->offset[1];
                int a3 = myAId + bsig->offset[2];
                if(!isAtomFixed(sAId, a1) ||
                   !isAtomFixed(sAId, a2) ||
                   !isAtomFixed(sAId, a3))
                    msg->numCalcImpropers++;
            }

            //5. Crossterms
            for(int j=0; j<thisSig->crosstermCnt; j++) {
                TupleSignature *bsig = &(thisSig->crosstermSigs[j]);
                int a1 = myAId + bsig->offset[0];
                int a2 = myAId + bsig->offset[1];
                int a3 = myAId + bsig->offset[2];
                int a4 = myAId + bsig->offset[3];
                int a5 = myAId + bsig->offset[4];
                int a6 = myAId + bsig->offset[5];
                int a7 = myAId + bsig->offset[6];

                if(!isAtomFixed(sAId, a1) ||
                   !isAtomFixed(sAId, a2) ||
                   !isAtomFixed(sAId, a3) ||
                   !isAtomFixed(sAId, a4) ||
                   !isAtomFixed(sAId, a5) ||
                   !isAtomFixed(sAId, a6) ||
                   !isAtomFixed(sAId, a7))
                    msg->numCalcCrossterms++;
            }

            //6: Exclusions
            //this atom is fixed, check atoms in the exclusion set
            for(int j=0; j<exclSig->fullExclCnt; j++) {
                int thisAId = exclSig->fullOffset[j]+myAId;
                if(!isAtomFixed(sAId, thisAId)) { msg->numCalcExclusions++; msg->numCalcFullExclusions++; }
            }
            for(int j=0; j<exclSig->modExclCnt; j++) {
                int thisAId = exclSig->modOffset[j]+myAId;
                if(!isAtomFixed(sAId, thisAId)) msg->numCalcExclusions++;
            }

            //7: GromacsPair
            for(int j=0; j<thisSig->gromacsPairCnt; j++) {
                TupleSignature *bsig = &(thisSig->gromacsPairSigs[j]);
                int a1 = myAId + bsig->offset[0];
                int a2 = myAId + bsig->offset[1];
                if(!isAtomFixed(sAId, a1) ||
                   !isAtomFixed(sAId, a2))
                    msg->numCalcLJPairs++;
            }
        }
#if COLLECT_PERFORMANCE_DATA
        printf("Num fixedAtom lookup on proc %d is %d\n", CkMyPe(), numFixedAtomLookup);
#endif
    } else {
        //no fixed atoms, so each numCalc* equals its corresponding num*
        msg->numCalcBonds = msg->numBonds;
        msg->numCalcAngles = msg->numAngles;
        msg->numCalcDihedrals = msg->numDihedrals;
        msg->numCalcImpropers = msg->numImpropers;
        msg->numCalcCrossterms = msg->numCrossterms;
        msg->numCalcExclusions = msg->numExclusions;
        msg->numCalcFullExclusions = numFullExclusions;
    }


    if(!simParameters->comMove) {
        //to remove the center of mass motion from a molecule,
        //first calculate the values on every input proc, then reduce.
        //For more info, refer to WorkDistrib::remove_com_motion
        //-Chao Mei
        (msg->totalMV).x = 0.0;
        (msg->totalMV).y = 0.0;
        (msg->totalMV).z = 0.0;
        for (int i=0; i<initAtoms.size(); i++) {
            msg->totalMV += initAtoms[i].mass * initAtoms[i].velocity;
        }
    }

    //always send to the master processor (proc 0)
    pIO[0].recvMolInfo(msg);
#endif
}

//only executed on proc 0
void ParallelIOMgr::recvMolInfo(MolInfoMsg *msg)
{
    molecule->numBonds += msg->numBonds;
    molecule->numCalcBonds += msg->numCalcBonds;
    molecule->numAngles += msg->numAngles;
    molecule->numCalcAngles += msg->numCalcAngles;
    molecule->numDihedrals += msg->numDihedrals;
    molecule->numCalcDihedrals += msg->numCalcDihedrals;
    molecule->numImpropers += msg->numImpropers;
    molecule->numCalcImpropers += msg->numCalcImpropers;
    molecule->numCrossterms += msg->numCrossterms;
    molecule->numCalcCrossterms += msg->numCalcCrossterms;
    molecule->numTotalExclusions += msg->numExclusions;
    molecule->numCalcExclusions += msg->numCalcExclusions;
    molecule->numCalcFullExclusions += msg->numCalcFullExclusions;
    molecule->numRigidBonds += msg->numRigidBonds;

    totalMass += msg->totalMass;
    totalCharge += msg->totalCharge;

    if(!simParameters->comMove) {
        totalMV += msg->totalMV;
    }

    if(++procsReceived == numInputProcs) {
        //received all the counters
        msg->numBonds = molecule->numBonds;
        msg->numCalcBonds = molecule->numCalcBonds;
        msg->numAngles = molecule->numAngles;
        msg->numCalcAngles = molecule->numCalcAngles;
        msg->numDihedrals = molecule->numDihedrals;
        msg->numCalcDihedrals = molecule->numCalcDihedrals;
        msg->numImpropers = molecule->numImpropers;
        msg->numCalcImpropers = molecule->numCalcImpropers;
        msg->numCrossterms = molecule->numCrossterms;
        msg->numCalcCrossterms = molecule->numCalcCrossterms;
        msg->numExclusions = molecule->numTotalExclusions/2;
        msg->numCalcExclusions = molecule->numCalcExclusions/2;
        msg->numCalcFullExclusions = molecule->numCalcFullExclusions/2;
        msg->numRigidBonds = molecule->numRigidBonds;

        msg->totalMass = totalMass;
        msg->totalCharge = totalCharge;

        if(!simParameters->comMove) {
            msg->totalMV = totalMV;
        }

        CProxy_ParallelIOMgr pIO(thisgroup);
        pIO.bcastMolInfo(msg);

        //reset to 0 for the next p2p-based reduction on input procs
        procsReceived = 0;
    } else delete msg;
}

void ParallelIOMgr::bcastMolInfo(MolInfoMsg *msg)
{
#ifdef MEM_OPT_VERSION
    if(myInputRank!=-1) {
        if(!simParameters->comMove) {
            //need to remove the center of mass motion from the molecule
            Vector val = msg->totalMV / msg->totalMass;
            for (int i=0; i<initAtoms.size(); i++) initAtoms[i].velocity -= val;
        }
    }

    //only rank 0 in the SMP node updates the Molecule object
    if(CmiMyRank()) {
        delete msg;
        return;
    }

    molecule->numBonds = msg->numBonds;
    molecule->numCalcBonds = msg->numCalcBonds;
    molecule->numAngles = msg->numAngles;
    molecule->numCalcAngles = msg->numCalcAngles;
    molecule->numDihedrals = msg->numDihedrals;
    molecule->numCalcDihedrals = msg->numCalcDihedrals;
    molecule->numImpropers = msg->numImpropers;
    molecule->numCalcImpropers = msg->numCalcImpropers;
    molecule->numCrossterms = msg->numCrossterms;
    molecule->numCalcCrossterms = msg->numCalcCrossterms;

    molecule->numTotalExclusions = msg->numExclusions;
    molecule->numCalcExclusions = msg->numCalcExclusions;
    molecule->numCalcFullExclusions = msg->numCalcFullExclusions;

    molecule->numRigidBonds = msg->numRigidBonds;

    if(!CkMyPe()) {
        iout << iINFO << "LOADED " << molecule->numTotalExclusions << " TOTAL EXCLUSIONS\n" << endi;
        if(!simParameters->comMove) {
            iout << iINFO << "REMOVING COM VELOCITY "
                 << (PDBVELFACTOR * (msg->totalMV / msg->totalMass))<< "\n" <<endi;
        }
    }
    //delete msg only after its last use in the log message above
    delete msg;
#endif
}
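
// The COM-velocity removal above implements
//   v_i <- v_i - (sum_j m_j v_j) / (sum_j m_j),
// i.e. subtracting the mass-weighted mean velocity so the total momentum
// becomes zero.  A minimal single-array sketch with stand-in types (not
// NAMD's Vector/FullAtom):
#if 0
struct SimpleAtom { double mass, vx, vy, vz; };
static void removeCOMVelocity(SimpleAtom *a, int n) {
    double mvx=0, mvy=0, mvz=0, m=0;
    for(int i=0; i<n; i++) {
        mvx += a[i].mass * a[i].vx;
        mvy += a[i].mass * a[i].vy;
        mvz += a[i].mass * a[i].vz;
        m   += a[i].mass;
    }
    for(int i=0; i<n; i++) {  // subtract the mean; total momentum is now ~0
        a[i].vx -= mvx/m;
        a[i].vy -= mvy/m;
        a[i].vz -= mvz/m;
    }
}
#endif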

//only called on PE0
void ParallelIOMgr::recvHydroBasedCounter(HydroBasedMsg *msg){
    molecule->numFixedRigidBonds += msg->numFixedRigidBonds;
    molecule->numFixedGroups += msg->numFixedGroups;

    if(++hydroMsgRecved == numInputProcs){
        msg->numFixedRigidBonds = molecule->numFixedRigidBonds;
        msg->numFixedGroups = molecule->numFixedGroups;
        CProxy_ParallelIOMgr pIO(thisgroup);
        pIO.bcastHydroBasedCounter(msg);
        hydroMsgRecved = 0;
    }else delete msg;
}

void ParallelIOMgr::bcastHydroBasedCounter(HydroBasedMsg *msg){
#ifdef MEM_OPT_VERSION
    //only rank 0 in the SMP node updates the Molecule object
    if(CmiMyRank()) {
        delete msg;
        return;
    }
    molecule->numFixedRigidBonds = msg->numFixedRigidBonds;
    molecule->numFixedGroups = msg->numFixedGroups;
    delete msg;

    if(!CkMyPe()) {
        iout << iINFO << "****************************\n";
        iout << iINFO << "STRUCTURE SUMMARY:\n";
        iout << iINFO << molecule->numAtoms << " ATOMS\n";
        iout << iINFO << molecule->numBonds << " BONDS\n";
        iout << iINFO << molecule->numAngles << " ANGLES\n";
        iout << iINFO << molecule->numDihedrals << " DIHEDRALS\n";
        iout << iINFO << molecule->numImpropers << " IMPROPERS\n";
        iout << iINFO << molecule->numCrossterms << " CROSSTERMS\n";
        iout << iINFO << molecule->numExclusions << " EXCLUSIONS\n";

        //****** BEGIN CHARMM/XPLOR type changes
        if ((molecule->numMultipleDihedrals) && (simParameters->paraTypeXplorOn)){
            iout << iINFO << molecule->numMultipleDihedrals
             << " DIHEDRALS WITH MULTIPLE PERIODICITY (BASED ON PSF FILE)\n";
        }
        if ((molecule->numMultipleDihedrals) && (simParameters->paraTypeCharmmOn)){
            iout << iINFO << molecule->numMultipleDihedrals
         << " DIHEDRALS WITH MULTIPLE PERIODICITY IGNORED (BASED ON PSF FILE) \n";
            iout << iINFO
         << " CHARMM MULTIPLICITIES BASED ON PARAMETER FILE INFO! \n";
        }
        //****** END CHARMM/XPLOR type changes

        if (molecule->numMultipleImpropers){
            iout << iINFO << molecule->numMultipleImpropers
                 << " IMPROPERS WITH MULTIPLE PERIODICITY\n";
        }

        if (simParameters->fixedAtomsOn)
           iout << iINFO << molecule->numFixedAtoms << " FIXED ATOMS\n";

        if (simParameters->rigidBonds)
           iout << iINFO << molecule->numRigidBonds << " RIGID BONDS\n";

        if (simParameters->fixedAtomsOn && simParameters->rigidBonds)
           iout << iINFO << molecule->numFixedRigidBonds <<
                " RIGID BONDS BETWEEN FIXED ATOMS\n";

        iout << iINFO << molecule->num_deg_freedom(1)
             << " DEGREES OF FREEDOM\n";

        iout << iINFO << molecule->numHydrogenGroups << " HYDROGEN GROUPS\n";
        iout << iINFO << molecule->maxHydrogenGroupSize
            << " ATOMS IN LARGEST HYDROGEN GROUP\n";
        iout << iINFO << molecule->numMigrationGroups << " MIGRATION GROUPS\n";
        iout << iINFO << molecule->maxMigrationGroupSize
            << " ATOMS IN LARGEST MIGRATION GROUP\n";
        if (simParameters->fixedAtomsOn)
        {
           iout << iINFO << molecule->numFixedGroups <<
                " HYDROGEN GROUPS WITH ALL ATOMS FIXED\n";
        }

        iout << iINFO << "TOTAL MASS = " << totalMass << " amu\n";
        iout << iINFO << "TOTAL CHARGE = " << totalCharge << " e\n";

        BigReal volume = simParameters->lattice.volume();
        if ( volume ) {
            iout << iINFO << "MASS DENSITY = "
                << ((totalMass/volume) / 0.6022) << " g/cm^3\n";
            iout << iINFO << "ATOM DENSITY = "
                << (molecule->numAtoms/volume) << " atoms/A^3\n";
        }

        iout << iINFO << "*****************************\n";
        iout << endi;
        fflush(stdout);
    }
#endif
}
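
// Note on the 0.6022 above: totalMass/volume is in amu/A^3, and
// 1 amu/A^3 = 1.66054e-24 g / 1e-24 cm^3 = 1.66054 g/cm^3, which is
// 1/0.6022 g/cm^3; dividing by 0.6022 (Avogadro's number times 1e-24)
// therefore converts amu/A^3 into g/cm^3.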

void ParallelIOMgr::calcAtomsInEachPatch()
{
    if(myInputRank==-1) return;

    PatchMap *patchMap = PatchMap::Object();
    int numPatches = patchMap->numPatches();

    patchMap->initTmpPatchAtomsList();

    //each list contains indices into initAtoms
    vector<int> *eachPatchAtomList = patchMap->getTmpPatchAtomsList();

    CProxy_PatchMgr pm(CkpvAccess(BOCclass_group).patchMgr);
    PatchMgr *patchMgr = pm.ckLocalBranch();

    int pid=0;
    const Lattice lattice = simParameters->lattice;
    for(int i=0; i<initAtoms.size(); i++) {
        InputAtom *atom = &(initAtoms[i]);
        if(!atom->isValid) continue;
        if(atom->isMP) {
            pid = patchMap->assignToPatch(atom->position, lattice);
        }
        eachPatchAtomList[pid].push_back(i);
    }

    CProxy_ParallelIOMgr pIO(thisgroup);

    int patchCnt = 0;
    for(int i=0; i<numPatches; i++) {
        int cursize = eachPatchAtomList[i].size();
        if(cursize>0) patchCnt++;
    }

    AtomsCntPerPatchMsg *msg = NULL;
    if(simParameters->fixedAtomsOn) {
        msg = new (patchCnt, patchCnt, patchCnt, 0)AtomsCntPerPatchMsg;
    } else {
        msg = new (patchCnt, patchCnt, 0, 0)AtomsCntPerPatchMsg;
    }

    msg->length = patchCnt;
    patchCnt = 0;
    for(int i=0; i<numPatches; i++) {
        int cursize = eachPatchAtomList[i].size();
        if(cursize>0) {
            if ( cursize > USHRT_MAX ) {
              char errstr[512];
              sprintf(errstr, "Patch %d exceeds %d atoms.", i, USHRT_MAX);
              NAMD_die(errstr);
            }
            msg->pidList[patchCnt] = i;
            msg->atomsCntList[patchCnt] = cursize;
            patchCnt++;
        }
    }

    if(simParameters->fixedAtomsOn) {
        patchCnt = 0;
        for(int i=0; i<numPatches; i++) {
            int cursize = eachPatchAtomList[i].size();
            if(cursize>0) {
                int fixedCnt = 0;
                for(int j=0; j<cursize; j++) {
                    int aid = eachPatchAtomList[i][j];
                    //atomFixed is either 0 or 1
                    fixedCnt += initAtoms[aid].atomFixed;
                }
                msg->fixedAtomsCntList[patchCnt] = fixedCnt;
                patchCnt++;
            }
        }
    }

    pIO[0].recvAtomsCntPerPatch(msg);

}

void ParallelIOMgr::recvAtomsCntPerPatch(AtomsCntPerPatchMsg *msg)
{
#ifdef MEM_OPT_VERSION
    PatchMap *patchMap = PatchMap::Object();
    for(int i=0; i<msg->length; i++) {
        int pid = msg->pidList[i];
        int oldNum = patchMap->numAtoms(pid);
        if ( oldNum + msg->atomsCntList[i] > USHRT_MAX ) {
          char errstr[512];
          sprintf(errstr, "Patch %d exceeds %d atoms.", pid, USHRT_MAX);
          NAMD_die(errstr);
        }
        patchMap->setNumAtoms(pid, oldNum+msg->atomsCntList[i]);
        if(simParameters->fixedAtomsOn) {
            oldNum = patchMap->numFixedAtoms(pid);
            patchMap->setNumFixedAtoms(pid, oldNum+msg->fixedAtomsCntList[i]);
        }
    }
    delete msg;

    if(++procsReceived == numInputProcs) {
        //print max PATCH info
        int maxAtoms = -1;
        int maxPatch = -1;
        int totalAtoms = 0;
        for(int i=0; i<patchMap->numPatches(); i++) {
            int cnt = patchMap->numAtoms(i);
            totalAtoms += cnt;
            if(cnt>maxAtoms) {
                maxAtoms = cnt;
                maxPatch = i;
            }
        }
        procsReceived = 0;
        iout << iINFO << "LARGEST PATCH (" << maxPatch <<
             ") HAS " << maxAtoms << " ATOMS\n" << endi;
        if ( totalAtoms !=  Node::Object()->molecule->numAtoms ) {
          char errstr[512];
          sprintf(errstr, "Incorrect atom count in ParallelIOMgr::recvAtomsCntPerPatch: %d vs %d", totalAtoms, Node::Object()->molecule->numAtoms);
          NAMD_die(errstr);
        }
    }
#endif
}

void call_sendAtomsToHomePatchProcs(void *arg)
{
  ((ParallelIOMgr*)arg)->sendAtomsToHomePatchProcs();
}

void ParallelIOMgr::sendAtomsToHomePatchProcs()
{
#ifdef MEM_OPT_VERSION
    if(myInputRank==-1) return;

    if ( sendAtomsThread == 0 ) {
      sendAtomsThread = CthCreate((CthVoidFn)call_sendAtomsToHomePatchProcs,this,0);
      CthAwaken(sendAtomsThread);
      return;
    }
    sendAtomsThread = 0;
    numAcksOutstanding = 0;

    PatchMap *patchMap = PatchMap::Object();
    int numPatches = patchMap->numPatches();
    vector<int> *eachPatchAtomList = patchMap->getTmpPatchAtomsList();

    //each element (proc) contains the list of ids of patches that will stay
    //on that processor
    ResizeArray<int> *procList = new ResizeArray<int>[CkNumPes()];
    ResizeArray<int> pesToSend;
    for(int i=0; i<numPatches; i++) {
        if(eachPatchAtomList[i].size()==0) continue;
        int onPE = patchMap->node(i);
        if ( procList[onPE].size() == 0 ) pesToSend.add(onPE);
        procList[onPE].add(i);
    }

    Random(CkMyPe()).reorder(pesToSend.begin(),pesToSend.size());
    //CkPrintf("Pe %d ParallelIOMgr::sendAtomsToHomePatchProcs sending to %d pes\n",CkMyPe(),pesToSend.size());

    //go over every processor to send a message if necessary
    //TODO: Optimization for local home patches to save temp memory usage??? -CHAOMEI
    CProxy_ParallelIOMgr pIO(thisgroup);
    for(int k=0; k<pesToSend.size(); k++) {
        const int i = pesToSend[k];
        int len = procList[i].size();
        if(len==0) continue;

        if ( numAcksOutstanding >= 10 ) {
          //CkPrintf("Pe %d ParallelIOMgr::sendAtomsToHomePatchProcs suspending at %d of %d pes\n",CkMyPe(),k,pesToSend.size());
          //fflush(stdout);
          sendAtomsThread = CthSelf();
          CthSuspend();
        }
        ++numAcksOutstanding;

        //prepare a message to send
        int patchCnt = len;
        int totalAtomCnt = 0;
        for(int j=0; j<len; j++) {
            int pid = procList[i][j];
            int atomCnt = eachPatchAtomList[pid].size();
            totalAtomCnt += atomCnt;
        }

        MovePatchAtomsMsg *msg = new (patchCnt, patchCnt, totalAtomCnt, 0)MovePatchAtomsMsg;
        msg->from = CkMyPe();
        msg->patchCnt = patchCnt;
        int atomIdx = 0;
        for(int j=0; j<len; j++) {
            int pid = procList[i][j];
            int atomCnt = eachPatchAtomList[pid].size();
            msg->pidList[j] = pid;
            msg->sizeList[j] = atomCnt;
            for(int k=0; k<atomCnt; k++, atomIdx++) {
                int aid = eachPatchAtomList[pid][k];
                FullAtom one = initAtoms[aid];
                //HACK to re-sort the atom list after receiving the atom list on
                //the home patch processor -Chao Mei
                one.hydVal = initAtoms[aid].hydList;
                msg->allAtoms[atomIdx] = one;
            }
        }
        pIO[i].recvAtomsToHomePatchProcs(msg);

        procList[i].clear();
    }

    //clean up to free space
    delete [] procList;
    patchMap->delTmpPatchAtomsList();

    //free the space occupied by the list that contains the input atoms
    initAtoms.clear();
#endif
}

void ParallelIOMgr::ackAtomsToHomePatchProcs()
{
  --numAcksOutstanding;
  if ( sendAtomsThread ) {
    CthAwaken(sendAtomsThread);
    sendAtomsThread = 0;
  }
}
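
// Summary of the send window used above (wording is the editor's): the
// sender thread allows at most 10 unacknowledged MovePatchAtomsMsg sends.
// When the window is full it stores its own handle in sendAtomsThread and
// suspends; each ack decrements numAcksOutstanding and, if a sender is
// parked, awakens it exactly once (the handle is cleared so later acks
// cannot double-awaken the thread).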

void ParallelIOMgr::recvAtomsToHomePatchProcs(MovePatchAtomsMsg *msg)
{
    CProxy_ParallelIOMgr pIO(thisgroup);
    pIO[msg->from].ackAtomsToHomePatchProcs();

    if(!isOKToRecvHPAtoms) {
        prepareHomePatchAtomList();
        isOKToRecvHPAtoms = true;
    }

    int numRecvPatches = msg->patchCnt;
    int aid = 0;
    for(int i=0; i<numRecvPatches; i++) {
        int pid = msg->pidList[i];
        int size = msg->sizeList[i];
        int idx = binaryFindHPID(pid);
        for(int j=0; j<size; j++, aid++) {
            hpAtomsList[idx].add(msg->allAtoms[aid]);
        }
    }
    //CkPrintf("Pe %d recvAtomsToHomePatchProcs for %d patches %d atoms\n",CkMyPe(),numRecvPatches,aid);
    delete msg;
}

void ParallelIOMgr::prepareHomePatchAtomList()
{
    PatchMap *patchMap = PatchMap::Object();
    for(int i=0; i<patchMap->numPatches(); i++) {
        if(patchMap->node(i)==CkMyPe()) {
            hpIDList.add(i);
        }
    }
    if(hpIDList.size()>0)
        hpAtomsList = new FullAtomList[hpIDList.size()];
}
01391 
01392 int ParallelIOMgr::binaryFindHPID(int pid)
01393 {
01394     //hpIDList should be in increasing order!!
01395     int loIdx, hiIdx;
01396     int retIdx = -1;
01397 
01398     loIdx=0;
01399     hiIdx=hpIDList.size()-1;
01400 
01401     while(loIdx<=hiIdx) {
01402         int idx = (loIdx+hiIdx)/2;
01403         int curPid = hpIDList[idx];
01404         if(pid>curPid) {
01405             //search the upper half
01406             loIdx = idx+1;
01407         } else if(pid<curPid) {
01408             //search the lower half
01409             hiIdx = idx-1;
01410         } else {
01411             //found!
01412             retIdx = idx;
01413             break;
01414         }
01415     }
01416     CmiAssert(retIdx!=-1);
01417     return retIdx;
01418 }
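
Since hpIDList is kept sorted, the hand-written search above could equally be expressed with std::lower_bound. A minimal sketch under the same precondition (strictly increasing patch ids, pid guaranteed present):

    // Equivalent lookup via std::lower_bound on the sorted id list.
    #include <algorithm>
    #include <cassert>
    #include <vector>

    int findHPID(const std::vector<int> &ids, int pid) {
        auto it = std::lower_bound(ids.begin(), ids.end(), pid);
        assert(it != ids.end() && *it == pid);  // pid must be present
        return static_cast<int>(it - ids.begin());
    }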
01419 
01420 void ParallelIOMgr::createHomePatches()
01421 {
01422 #ifdef MEM_OPT_VERSION
01423 
01424     int assignedPids = PatchMap::Object()->numPatchesOnNode(CkMyPe());
01425     int numPids = hpIDList.size();
01426     if(numPids==0){
01427         //this node actually contains no homepatches
01428         if(assignedPids == 0) return; 
01429 
01430         //Handle the rare case in which all the homepatches on this node
01431         //are empty, so "recvAtomsToHomePatchProcs" is never called!
01432         //But we still need to create those empty homepatches!
01433         CmiAssert(isOKToRecvHPAtoms == false);        
01434         PatchMap *patchMap = PatchMap::Object();
01435         CProxy_PatchMgr pm(CkpvAccess(BOCclass_group).patchMgr);
01436         PatchMgr *patchMgr = pm.ckLocalBranch();
01437         for(int i=0; i<patchMap->numPatches(); i++) {
01438             if(patchMap->node(i)==CkMyPe()) {
01439                 FullAtomList emptyone;
01440                 patchMgr->createHomePatch(i, emptyone);
01441             }
01442         }        
01443         return;
01444     }
01445 
01446     CProxy_PatchMgr pm(CkpvAccess(BOCclass_group).patchMgr);
01447     PatchMgr *patchMgr = pm.ckLocalBranch();
01448 
01449     //go through the home patch list
01450     for(int i=0; i<numPids; i++) {
01451         int pid = hpIDList[i];
01452 
01453         //re-sort the atom list of this patch
01454         std::sort(hpAtomsList[i].begin(), hpAtomsList[i].end());
01455         Node::Object()->workDistrib->fillAtomListForOnePatch(pid, hpAtomsList[i]);
01456         patchMgr->createHomePatch(pid, hpAtomsList[i]);
01457     }
01458 
01459     hpIDList.clear();   
01460     delete [] hpAtomsList;
01461 
01462     hpAtomsList = NULL;
01463 #endif
01464 }
01465 
01466 void ParallelIOMgr::freeMolSpace()
01467 {
01468 #ifdef MEM_OPT_VERSION
01469     molecule->delAtomNames();
01470     molecule->delChargeSpace();
01471 
01472     //TODO: it is not clear whether freeEnergyOn is supported in MEM_OPT_VERSION
01473     //-CHAOMEI
01474     if(!CkMyPe() && !simParameters->freeEnergyOn)
01475         molecule->delMassSpace();
01476 
01477     molecule->delFixedAtoms();
01478 #endif
01479 }
01480 
01481 //The initial distribution of atoms:
01482 //(avgNum+1),...,(avgNum+1),avgNum,...,avgNum where
01483 //avgNum equals the total number of atoms in the system
01484 //divided by the number of input processors --Chao Mei
01485 int ParallelIOMgr::numMyAtoms(int rank, int numProcs)
01486 {
01487     if(rank==-1) return -1;
01488     int avgNum = molecule->numAtoms/numProcs;
01489     int remainder = molecule->numAtoms%numProcs;
01490     if(rank<remainder) return avgNum+1;
01491     else return avgNum;
01492 }
01493 
01494 int ParallelIOMgr::atomRank(int atomID, int numProcs)
01495 {
01496     int avgNum = molecule->numAtoms/numProcs;
01497     int remainder = molecule->numAtoms%numProcs;
01498     int midLimit = remainder*(avgNum+1);
01499     int idx;
01500     if(atomID<midLimit) {
01501         idx = atomID/(avgNum+1);
01502     } else {
01503         idx = remainder+(atomID-midLimit)/avgNum;
01504     }
01505     return idx;
01506 }
01507 
01508 void ParallelIOMgr::getMyAtomsRange(int &lowerIdx, int &upperIdx, int rank, int numProcs)
01509 {
01510     if(rank==-1) {
01511         //just to make sure upperIdx-lowerIdx+1 == -1
01512         lowerIdx=-1;
01513         upperIdx=-3;
01514         return;
01515     }
01516     int avgNum = molecule->numAtoms/numProcs;
01517     int remainder = molecule->numAtoms%numProcs;
01518     if(rank<remainder) {
01519         lowerIdx = rank*(avgNum+1);
01520         upperIdx = lowerIdx+avgNum;
01521     } else {
01522         int midLimit = remainder*(avgNum+1);
01523         lowerIdx = midLimit+(rank-remainder)*avgNum;
01524         upperIdx = lowerIdx+avgNum-1;
01525     }
01526 }
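
A quick worked example of this block distribution, with hypothetical numbers: numAtoms = 10 and numProcs = 4 give avgNum = 2 and remainder = 2, so ranks 0 and 1 hold 3 atoms each, ranks 2 and 3 hold 2 each, and the ranges are [0,2], [3,5], [6,7], [8,9]. The standalone sketch below mirrors atomRank and checks those boundaries:

    // Self-contained copy of the atomRank arithmetic, verified against
    // the example figures above (which are illustrative only).
    #include <cassert>

    int atomRank(int atomID, int numAtoms, int numProcs) {
        int avgNum = numAtoms / numProcs;
        int remainder = numAtoms % numProcs;
        int midLimit = remainder * (avgNum + 1);   // atoms in (avgNum+1)-sized blocks
        return (atomID < midLimit) ? atomID / (avgNum + 1)
                                   : remainder + (atomID - midLimit) / avgNum;
    }

    int main() {
        assert(atomRank(2, 10, 4) == 0);   // last atom of rank 0's block [0,2]
        assert(atomRank(5, 10, 4) == 1);   // last atom of rank 1's block [3,5]
        assert(atomRank(6, 10, 4) == 2);   // first atom of rank 2's block [6,7]
        assert(atomRank(9, 10, 4) == 3);   // final atom, rank 3's block [8,9]
    }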
01527 
01528 void ParallelIOMgr::receivePositions(CollectVectorVarMsg *msg)
01529 {
01530 #ifdef MEM_OPT_VERSION
01531     int ready = midCM->receivePositions(msg);
01532     if(ready) {
01533         CProxy_CollectionMaster cm(mainMaster);
01534         cm.receiveOutputPosReady(msg->seq);
01535     }
01536     delete msg;
01537 #endif
01538 }
01539 
01540 void ParallelIOMgr::receiveVelocities(CollectVectorVarMsg *msg)
01541 {
01542 #ifdef MEM_OPT_VERSION
01543     int ready = midCM->receiveVelocities(msg);
01544     if(ready) {
01545         CProxy_CollectionMaster cm(mainMaster);
01546         cm.receiveOutputVelReady(msg->seq);        
01547     }
01548     delete msg;
01549 #endif
01550 }
01551 
01552 void ParallelIOMgr::receiveForces(CollectVectorVarMsg *msg)
01553 {
01554 #ifdef MEM_OPT_VERSION
01555     int ready = midCM->receiveForces(msg);
01556     if(ready) {
01557         CProxy_CollectionMaster cm(mainMaster);
01558         cm.receiveOutputForceReady(msg->seq);        
01559     }
01560     delete msg;
01561 #endif
01562 }
01563 
01564 
01565 void ParallelIOMgr::disposePositions(int seq, double prevT)
01566 {
01567 #ifdef MEM_OPT_VERSION
01568     double iotime = CmiWallTimer();
01569     midCM->disposePositions(seq);
01570     iotime = CmiWallTimer()-iotime+prevT;
01571 
01572 #if OUTPUT_SINGLE_FILE
01573     //Token-based file output
01574     if(myOutputRank == getMyOutputGroupHighestRank()) {
01575         //notify the CollectionMaster to start the next round
01576         CProxy_CollectionMaster cm(mainMaster);
01577         cm.startNextRoundOutputPos(iotime);
01578     } else {
01579         CProxy_ParallelIOMgr io(thisgroup);
01580         io[outputProcArray[myOutputRank+1]].disposePositions(seq, iotime);
01581     }
01582 #else
01583     //notify the CollectionMaster to start the next round
01584     CProxy_CollectionMaster cm(mainMaster);
01585     cm.startNextRoundOutputPos(iotime);
01586 #endif
01587 
01588 #endif
01589 }
01590 
01591 void ParallelIOMgr::disposeVelocities(int seq, double prevT)
01592 {
01593 #ifdef MEM_OPT_VERSION
01594     double iotime = CmiWallTimer();
01595     midCM->disposeVelocities(seq);
01596     iotime = CmiWallTimer()-iotime+prevT;
01597 
01598 #if OUTPUT_SINGLE_FILE
01599     //Token-based file output
01600     if(myOutputRank==getMyOutputGroupHighestRank()) {
01601         //notify the CollectionMaster to start the next round
01602         CProxy_CollectionMaster cm(mainMaster);
01603         cm.startNextRoundOutputVel(iotime);
01604     } else {
01605         CProxy_ParallelIOMgr io(thisgroup);
01606         io[outputProcArray[myOutputRank+1]].disposeVelocities(seq, iotime);
01607     }
01608 #else
01609     //notify the CollectionMaster to start the next round
01610     CProxy_CollectionMaster cm(mainMaster);
01611     cm.startNextRoundOutputVel(iotime);
01612 #endif
01613 
01614 #endif
01615 }
01616 
01617 void ParallelIOMgr::disposeForces(int seq, double prevT)
01618 {
01619 #ifdef MEM_OPT_VERSION
01620     double iotime = CmiWallTimer();
01621     midCM->disposeForces(seq);
01622     iotime = CmiWallTimer()-iotime+prevT;
01623 
01624 #if OUTPUT_SINGLE_FILE
01625     //Token-based file output
01626     if(myOutputRank==getMyOutputGroupHighestRank()) {
01627         //notify the CollectionMaster to start the next round
01628         CProxy_CollectionMaster cm(mainMaster);
01629         cm.startNextRoundOutputForce(iotime);
01630     } else {
01631         CProxy_ParallelIOMgr io(thisgroup);
01632         io[outputProcArray[myOutputRank+1]].disposeForces(seq, iotime);
01633     }
01634 #else
01635     //notify the CollectionMaster to start the next round
01636     CProxy_CollectionMaster cm(mainMaster);
01637     cm.startNextRoundOutputForce(iotime);
01638 #endif
01639 
01640 #endif
01641 }
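
All three dispose* methods share the same token-based pattern under OUTPUT_SINGLE_FILE: each output rank writes its portion, adds its I/O time to the running total, and either forwards the token to the next rank in outputProcArray or, as the group's highest rank, reports the total back to the CollectionMaster so the next round can start. A minimal sketch of that chain, with callbacks standing in for the Charm++ proxy calls (all names illustrative):

    // Token chain: rank r writes, then passes the token (and accumulated
    // I/O time) to rank r+1, or reports completion as the highest rank.
    #include <functional>

    struct TokenChain {
        int myRank, highestRank;
        std::function<void(int, double)> forward;      // ~ io[next].dispose*(seq, iotime)
        std::function<void(double)> notifyMaster;      // ~ cm.startNextRoundOutput*(iotime)

        void dispose(double prevT, std::function<void()> writeMyPiece,
                     double (*wallTimer)()) {
            double iotime = wallTimer();
            writeMyPiece();                            // ~ midCM->dispose*(seq)
            iotime = wallTimer() - iotime + prevT;     // accumulate along the chain
            if (myRank == highestRank) notifyMaster(iotime);
            else                       forward(myRank + 1, iotime);
        }
    };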
01642 
01643 
01644 void ParallelIOMgr::wrapCoor(int seq, Lattice lat)
01645 {
01646 #ifdef MEM_OPT_VERSION
01647     coorInstance = midCM->getReadyPositions(seq);
01648 
01649     coorInstance->lattice = lat; //record the lattice to use for wrapAll/Water!
01650     int fromAtomID = coorInstance->fromAtomID;
01651     int toAtomID = coorInstance->toAtomID;
01652 
01653     //only reference copies
01654     ResizeArray<Vector> &data = coorInstance->data;
01655     ResizeArray<FloatVector> &fdata = coorInstance->fdata;
01656     //if both data and fdata are non-empty, they contain the same values and
01657     //differ only in precision, so we only need to compute
01658     //the higher-precision coordinate array. -Chao Mei
01659     int dsize = data.size();    
01660     int numMyAtoms = toAtomID-fromAtomID+1;
01661     tmpCoorCon = new Vector[numMyAtoms];    
01662     ClusterCoorElem one;
01663     //1. compute wrapped coordinates locally 
01664     for(int i=0; i<numMyAtoms; i++){
01665         tmpCoorCon[i] = 0.0;
01666         int cid = clusterID[i];
01667         if(cid<fromAtomID){
01668             //on output procs ahead of me
01669             one.clusterId = cid;
01670             ClusterCoorElem *ret = remoteCoors.find(one);
01671             if(ret==NULL){
01672                 if(dsize==0) 
01673                     one.dsum = fdata[i];
01674                 else 
01675                     one.dsum = data[i];
01676                                 
01677                 remoteCoors.add(one);                 
01678             }else{
01679                 if(dsize==0) 
01680                     ret->dsum += fdata[i];
01681                 else 
01682                     ret->dsum += data[i];               
01683             }
01684         }else{
01685             if(dsize==0) 
01686                 tmpCoorCon[cid-fromAtomID] += fdata[i];
01687             else 
01688                 tmpCoorCon[cid-fromAtomID] += data[i];
01689         }
01690     }
01691 
01692     //2. Prepare to send msgs to remote output procs to reduce coordinates 
01693     //values of a cluster
01694     CmiAssert(numRemoteClusters == remoteCoors.size());
01695     numCSMAck = 0; //set to 0 to prepare recving the final coor update
01696     CProxy_ParallelIOMgr pIO(thisgroup);
01697     ClusterCoorSetIter iter(remoteCoors);
01698     for(iter=iter.begin(); iter!=iter.end(); iter++){
01699         ClusterCoorMsg *msg = new ClusterCoorMsg;
01700         msg->srcRank = myOutputRank;
01701         msg->clusterId = iter->clusterId;
01702         msg->dsum = iter->dsum;
01703         int dstRank = atomRankOnOutput(iter->clusterId);
01704         pIO[outputProcArray[dstRank]].recvClusterCoor(msg);
01705     }
01706     
01707     //Just send a local NULL msg to indicate that the local wrapping
01708     //of coordinates has finished.
01709     recvClusterCoor(NULL);
01710 #endif
01711 }
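
wrapCoor thus runs a two-phase reduction over cluster coordinates: each output proc first sums atom positions per cluster head id, keeping the sum locally when the head atom falls in its own [fromAtomID, toAtomID] range and queueing it for the owning output proc otherwise. A sketch of that first phase, using std::unordered_map as a stand-in for the remoteCoors hashtable and a single coordinate array (the real code picks data or fdata by precision; Vec3 and all names here are illustrative):

    // Phase 1 of the cluster-coordinate reduction: accumulate per-cluster
    // sums, splitting them into locally-owned and remotely-owned buckets.
    #include <unordered_map>
    #include <vector>

    struct Vec3 {
        double x = 0, y = 0, z = 0;
        Vec3 &operator+=(const Vec3 &o) { x += o.x; y += o.y; z += o.z; return *this; }
    };

    void accumulateClusterSums(const std::vector<Vec3> &coor,
                               const std::vector<int> &clusterID,
                               int fromAtomID,
                               std::vector<Vec3> &localSum,              // indexed by cid-fromAtomID
                               std::unordered_map<int, Vec3> &remoteSum) // keyed by cluster head id
    {
        for (size_t i = 0; i < coor.size(); ++i) {
            int cid = clusterID[i];
            if (cid < fromAtomID) remoteSum[cid] += coor[i];   // head atom on an earlier proc
            else                  localSum[cid - fromAtomID] += coor[i];
        }
    }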
01712 
01713 //On the output proc, it's possible (though rare) that recvClusterCoor
01714 //is executed before wrapCoor, so we need to make sure the local tmpCoorCon has
01715 //been calculated. This is why (numRemoteReqs+1) msgs are expected: the
01716 //additional one is sent by the proc itself when it finishes wrapping coordinates.
01717 // --Chao Mei
01718 void ParallelIOMgr::recvClusterCoor(ClusterCoorMsg *msg){
01719     //only add the msg from remote procs
01720     if(msg!=NULL) ccmBuf.add(msg);
01721 
01722     //include a msg sent by itself
01723     if(++numReqRecved == (numRemoteReqs+1)){
01724         numReqRecved = 0;
01725         integrateClusterCoor();
01726     }
01727 }
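
recvClusterCoor is effectively a counting barrier: integration only starts after all numRemoteReqs remote messages plus the one self-sent NULL message have been counted, which guarantees tmpCoorCon exists regardless of message ordering. A compact sketch of the pattern (names illustrative):

    // Counting barrier: fire the action once all expected arrivals,
    // including the local self-notification, have been counted.
    struct CountingBarrier {
        int expected;        // numRemoteReqs + 1 in the code above
        int recved = 0;
        template <typename Fn>
        void arrive(Fn action) {
            if (++recved == expected) { recved = 0; action(); }
        }
    };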
01728 
01729 void ParallelIOMgr::integrateClusterCoor(){
01730 #ifdef MEM_OPT_VERSION
01731     int fromIdx = coorInstance->fromAtomID;
01732     int toIdx = coorInstance->toAtomID;
01733     for(int i=0; i<ccmBuf.size(); i++){
01734         ClusterCoorMsg *msg = ccmBuf[i];
01735         int lidx = msg->clusterId - fromIdx;        
01736         tmpCoorCon[lidx] += msg->dsum;
01737     }
01738 
01739     //send back those msgs
01740     CProxy_ParallelIOMgr pIO(thisgroup);
01741     for(int i=0; i<ccmBuf.size(); i++){
01742         ClusterCoorMsg *msg = ccmBuf[i];
01743         int lidx = msg->clusterId - fromIdx;        
01744         if(simParameters->wrapAll || isWater[lidx]) {
01745             Lattice *lat = &(coorInstance->lattice);
01746             Vector coni = tmpCoorCon[lidx]/clusterSize[lidx];
01747             msg->dsum = (simParameters->wrapNearest ?
01748                   lat->wrap_nearest_delta(coni) : lat->wrap_delta(coni));
01749         }else{
01750             msg->dsum = 0.0;
01751         }
01752         pIO[outputProcArray[msg->srcRank]].recvFinalClusterCoor(msg);
01753     }
01754     ccmBuf.resize(0);    
01755 
01756     //It's possible that recvFinalClusterCoor is executed before integrateClusterCoor
01757     //on this processor (the other proc may execute wrapCoor faster). To avoid
01758     //this msg race, send a message to itself to participate in the reduction.
01759     if(numRemoteClusters!=0){
01760         recvFinalClusterCoor(NULL);
01761     } else {
01762         //this output proc already has the sum of coordinates of each cluster
01763         //it owns, so it is ready to do the final wrap coor computation
01764         int numMyAtoms = toIdx-fromIdx+1;
01765         ResizeArray<Vector> &data = coorInstance->data;
01766         ResizeArray<FloatVector> &fdata = coorInstance->fdata;
01767         for(int i=0; i<numMyAtoms; i++){
01768             if(!simParameters->wrapAll && !isWater[i]) continue;
01769             int lidx = clusterID[i]-fromIdx;
01770             if(lidx==i){
01771                 //the head atom of the cluster
01772                 Lattice *lat = &(coorInstance->lattice);
01773                 Vector coni = tmpCoorCon[lidx]/clusterSize[lidx];
01774                 tmpCoorCon[lidx] = (simParameters->wrapNearest ?
01775                   lat->wrap_nearest_delta(coni) : lat->wrap_delta(coni));
01776             }
01777             if(data.size()) data[i] += tmpCoorCon[lidx]; 
01778             //no operator += (FloatVector, Vector)
01779             if(fdata.size()) fdata[i] = fdata[i] + tmpCoorCon[lidx]; 
01780         }
01781         
01782         delete [] tmpCoorCon;
01783         tmpCoorCon = NULL;
01784         CProxy_CollectionMaster cm(mainMaster);
01785         cm.wrapCoorFinished();
01786     }
01787 #endif    
01788 }
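
The head-atom branch above computes each cluster's average position and asks the lattice for the translation that maps it back into the cell; that same delta is then applied to every atom of the cluster. As a hedged illustration only, here is what such a wrap delta looks like for a simple orthorhombic box with origin at zero (the real Lattice::wrap_delta and wrap_nearest_delta handle general triclinic cells and the nearest-image convention):

    // Sketch under stated assumptions: orthorhombic cell, origin at 0.
    // Returns the translation that brings the cluster average back into
    // [0, L) per dimension; shifting all cluster atoms by it keeps the
    // cluster intact while wrapping it into the primary cell.
    #include <cmath>

    struct Vec3 { double x, y, z; };

    Vec3 wrapDeltaOrtho(const Vec3 &avg, const Vec3 &boxLen) {
        return { -std::floor(avg.x / boxLen.x) * boxLen.x,
                 -std::floor(avg.y / boxLen.y) * boxLen.y,
                 -std::floor(avg.z / boxLen.z) * boxLen.z };
    }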
01789 
01790 void ParallelIOMgr::recvFinalClusterCoor(ClusterCoorMsg *msg){
01791 #ifdef MEM_OPT_VERSION
01792     if(msg!=NULL){
01793         //only process the message sent from other procs!
01794         ClusterCoorElem one(msg->clusterId);
01795         ClusterCoorElem *ret = remoteCoors.find(one);
01796         ret->dsum = msg->dsum;
01797         delete msg;
01798     }
01799     
01800     if(++numCSMAck == (numRemoteClusters+1)){        
01801         //final wrap coor computation
01802         int fromIdx = coorInstance->fromAtomID;
01803         int toIdx = coorInstance->toAtomID;
01804         int numMyAtoms = toIdx-fromIdx+1;
01805         ResizeArray<Vector> &data = coorInstance->data;
01806         ResizeArray<FloatVector> &fdata = coorInstance->fdata;
01807         ClusterCoorElem tmp;
01808         for(int i=0; i<numMyAtoms; i++){
01809             if(!simParameters->wrapAll && !isWater[i]) continue;
01810             int cid = clusterID[i];
01811             int lidx = cid-fromIdx;
01812             if(lidx<0){
01813                 //this cid should be inside remoteCoors
01814                 tmp.clusterId = cid;
01815                 ClusterCoorElem *fone = remoteCoors.find(tmp);
01816                 if(data.size()) data[i] += fone->dsum; 
01817                 if(fdata.size()) fdata[i] = fdata[i] + fone->dsum; 
01818             }else{
01819                 if(lidx==i){
01820                     Lattice *lat = &(coorInstance->lattice);
01821                     Vector coni = tmpCoorCon[lidx]/clusterSize[lidx];
01822                     tmpCoorCon[lidx] = (simParameters->wrapNearest ?
01823                     lat->wrap_nearest_delta(coni) : lat->wrap_delta(coni));
01824                 }
01825                 if(data.size()) data[i] += tmpCoorCon[lidx]; 
01826                 if(fdata.size()) fdata[i] = fdata[i] + tmpCoorCon[lidx];
01827             }
01828         }
01829 
01830         delete [] tmpCoorCon;
01831         tmpCoorCon = NULL;
01832         CProxy_CollectionMaster cm(mainMaster);
01833         cm.wrapCoorFinished();
01834         numCSMAck = 0;
01835         remoteCoors.clear();
01836     }
01837 #endif
01838 }
01839 #include "ParallelIOMgr.def.h"
