00001
00007 #ifndef COLLECTIONMASTER_H
00008 #define COLLECTIONMASTER_H
00009
00010 #include "charm++.h"
00011 #include "main.h"
00012 #include "NamdTypes.h"
00013 #include "Lattice.h"
00014 #include "ProcessorPrivate.h"
00015 #include "PatchMap.h"
00016 #include "PatchMap.inl"
00017 #include "CollectionMaster.decl.h"
00018 #include <stdio.h>
00019
00020 #include "Node.h"
00021 #include "ParallelIOMgr.decl.h"
00022 #include "ParallelIOMgr.h"
00023 #include "Output.h"
00024
00025 class CollectVectorMsg : public CMessage_CollectVectorMsg
00026 {
00027 public:
00028
00029 int seq;
00030 int aid_size;
00031 int data_size;
00032 int fdata_size;
00033 AtomID *aid;
00034 Vector *data;
00035 FloatVector *fdata;
00036 };
00037
00038 class DataStreamMsg;
00039
00040 class CollectionMaster : public Chare
00041 {
00042 public:
00043 static CollectionMaster *Object() {
00044 return CkpvAccess(CollectionMaster_instance);
00045 }
00046 CollectionMaster();
00047 ~CollectionMaster(void);
00048
00049 void receivePositions(CollectVectorMsg *msg);
00050 void receiveVelocities(CollectVectorMsg *msg);
00051 void receiveForces(CollectVectorMsg *msg);
00052
00053 void receiveDataStream(DataStreamMsg *msg);
00054
00055 void enqueuePositions(int seq, Lattice &lattice);
00056 void enqueueVelocities(int seq);
00057 void enqueueForces(int seq);
00058
00059 class CollectVectorInstance;
00060 void disposePositions(CollectVectorInstance *c);
00061 void disposeVelocities(CollectVectorInstance *c);
00062 void disposeForces(CollectVectorInstance *c);
00063
00065 void receiveOutputPosReady(int seq);
00066 void receiveOutputVelReady(int seq);
00067 void receiveOutputForceReady(int seq);
00068
00069 void startNextRoundOutputPos(double totalT);
00070 void startNextRoundOutputVel(double totalT);
00071 void startNextRoundOutputForce(double totalT);
00072
00073 void wrapCoorFinished();
00074
00075 enum OperationStatus {NOT_PROCESSED, IN_PROCESS, HAS_PROCESSED};
00077 #ifdef MEM_OPT_VERSION
00078 class CollectVectorInstance
00079 {
00080 public:
00081 CollectVectorInstance(void) : seq(-10) { ; }
00082
00083 CollectVectorInstance(int s) { reset(s); }
00084
00085 void free() { seq = -10; status = HAS_PROCESSED; }
00086 int notfree() { return ( seq != -10 ); }
00087
00088 void reset(int s) {
00089 if ( s == -10 ) NAMD_bug("seq == free in CollectionMaster");
00090 seq = s;
00091 CProxy_ParallelIOMgr io(CkpvAccess(BOCclass_group).ioMgr);
00092 ParallelIOMgr *ioMgr = io.ckLocalBranch();
00093 remaining = ioMgr->getNumOutputProcs();
00094 status = NOT_PROCESSED;
00095 }
00096
00097 void append(){ --remaining; }
00098
00099 int ready(void) { return ( ! remaining ); }
00100
00101 int seq;
00102 Lattice lattice;
00103
00104
00105
00106 OperationStatus status;
00107 private:
00108 int remaining;
00109
00110 };
00111
00112 class CollectVectorSequence
00113 {
00114 public:
00115
00116 void submitData(int seq){
00117 CollectVectorInstance **c = data.begin();
00118 CollectVectorInstance **c_e = data.end();
00119 for( ; c != c_e && (*c)->seq != seq; ++c );
00120 if ( c == c_e )
00121 {
00122 c = data.begin();
00123 for( ; c != c_e && (*c)->notfree(); ++c );
00124 if ( c == c_e ) {
00125 data.add(new CollectVectorInstance(seq));
00126 c = data.end() - 1;
00127 }
00128 (*c)->reset(seq);
00129 }
00130 (*c)->append();
00131 }
00132
00133 void enqueue(int seq, Lattice &lattice) {
00134 queue.add(seq);
00135 latqueue.add(lattice);
00136 }
00137
00138 CollectVectorInstance* removeReady(void)
00139 {
00140
00141
00142
00143
00144
00145
00146 return NULL;
00147 }
00148
00149
00150
00151 CollectVectorInstance* getReady(void)
00152 {
00153 CollectVectorInstance *o = 0;
00154 if ( queue.size() )
00155 {
00156 int seq = queue[0];
00157 CollectVectorInstance **c = data.begin();
00158 CollectVectorInstance **c_e = data.end();
00159 for( ; c != c_e && (*c)->seq != seq; ++c );
00160 if ( c != c_e && (*c)->ready() )
00161 {
00162 o = *c;
00163 o->lattice = latqueue[0];
00164 }
00165 }
00166 return o;
00167 }
00168
00169
00170
00171
00172 int removeFirstReady(){
00173 int seq = queue[0];
00174 queue.del(0,1);
00175 latqueue.del(0,1);
00176 return seq;
00177 }
00178
00179 ResizeArray<CollectVectorInstance*> data;
00180 ResizeArray<int> queue;
00181 ResizeArray<Lattice> latqueue;
00182 };
00183 #else
00184 class CollectVectorInstance
00185 {
00186 public:
00187
00188 CollectVectorInstance(void) : seq(-10) { ; }
00189
00190 CollectVectorInstance(int s) { reset(s); }
00191
00192 void free() { seq = -10; }
00193 int notfree() { return ( seq != -10 ); }
00194
00195 void reset(int s) {
00196 if ( s == -10 ) NAMD_bug("seq == free in CollectionMaster");
00197 seq = s;
00198 remaining = (PatchMap::Object())->numNodesWithPatches();
00199 data.resize(0);
00200 fdata.resize(0);
00201 }
00202
00203
00204 void append(CollectVectorMsg *msg, int max_index)
00205 {
00206 AtomID *a = msg->aid;
00207 Vector *d = msg->data;
00208 FloatVector *fd = msg->fdata;
00209 int size = msg->aid_size;
00210 if ( msg->data_size ) {
00211 data.resize(max_index);
00212 Vector *ptr = data.begin();
00213 for( int i = 0; i < size; ++i ) { ptr[a[i]] = d[i]; }
00214 }
00215 if ( msg->fdata_size ) {
00216 fdata.resize(max_index);
00217 FloatVector *ptr = fdata.begin();
00218 for( int i = 0; i < size; ++i ) { ptr[a[i]] = fd[i]; }
00219 }
00220 --remaining;
00221 }
00222
00223 int ready(void) { return ( ! remaining ); }
00224
00225 int seq;
00226 Lattice lattice;
00227
00228 ResizeArray<Vector> data;
00229 ResizeArray<FloatVector> fdata;
00230
00231 private:
00232 int remaining;
00233
00234 };
00235
00236 class CollectVectorSequence
00237 {
00238 public:
00239
00240 void submitData(CollectVectorMsg *msg, int max_index)
00241 {
00242 int seq = msg->seq;
00243 CollectVectorInstance **c = data.begin();
00244 CollectVectorInstance **c_e = data.end();
00245 for( ; c != c_e && (*c)->seq != seq; ++c );
00246 if ( c == c_e )
00247 {
00248 c = data.begin();
00249 for( ; c != c_e && (*c)->notfree(); ++c );
00250 if ( c == c_e ) {
00251 data.add(new CollectVectorInstance(seq));
00252 c = data.end() - 1;
00253 }
00254 (*c)->reset(seq);
00255 }
00256 (*c)->append(msg, max_index);
00257 }
00258
00259 void enqueue(int seq, Lattice &lattice) {
00260 queue.add(seq);
00261 latqueue.add(lattice);
00262 }
00263
00264 CollectVectorInstance* removeReady(void)
00265 {
00266 CollectVectorInstance *o = 0;
00267 if ( queue.size() )
00268 {
00269 int seq = queue[0];
00270 CollectVectorInstance **c = data.begin();
00271 CollectVectorInstance **c_e = data.end();
00272 for( ; c != c_e && (*c)->seq != seq; ++c );
00273 if ( c != c_e && (*c)->ready() )
00274 {
00275 o = *c;
00276 o->lattice = latqueue[0];
00277 queue.del(0,1);
00278 latqueue.del(0,1);
00279 }
00280 }
00281 return o;
00282 }
00283
00284 ResizeArray<CollectVectorInstance*> data;
00285 ResizeArray<int> queue;
00286 ResizeArray<Lattice> latqueue;
00287
00288 };
00289 #endif
00290 private:
00291
00292 CollectVectorSequence positions;
00293 CollectVectorSequence velocities;
00294 CollectVectorSequence forces;
00295 int posTimings, velTimings, forceTimings;
00296 FILE *dataStreamFile;
00297
00298 #ifdef MEM_OPT_VERSION
00299 int wrapCoorDoneCnt;
00300 ParOutput *parOut;
00301 double posOutTime;
00302 double velOutTime;
00303 double forceOutTime;
00304 double posIOTime;
00305 double velIOTime;
00306 double forceIOTime;
00307
00308
00309 int posDoneCnt;
00310 int velDoneCnt;
00311 int forceDoneCnt;
00312
00313 void checkPosReady();
00314 void checkVelReady();
00315 void checkForceReady();
00316 #endif
00317 };
00318
00319 class DataStreamMsg : public CMessage_DataStreamMsg {
00320 public:
00321
00322 ResizeArray<char> data;
00323
00324 static void* pack(DataStreamMsg* msg);
00325 static DataStreamMsg* unpack(void *ptr);
00326
00327 };
00328
00329
00330 class CollectVectorVarMsg : public CMessage_CollectVectorVarMsg
00331 {
00332 public:
00333 enum DataStatus {VectorValid, FloatVectorValid, BothValid};
00334 public:
00335 int seq;
00336 int size;
00337 DataStatus status;
00338 AtomID *aid;
00339 Vector *data;
00340 FloatVector *fdata;
00341 };
00342
00343 #ifdef MEM_OPT_VERSION
00344 class CollectMidVectorInstance{
00345 public:
00346
00347 CollectMidVectorInstance(void) : seq(-10) {
00348 CProxy_ParallelIOMgr io(CkpvAccess(BOCclass_group).ioMgr);
00349 ParallelIOMgr *ioMgr = io.ckLocalBranch();
00350 ioMgr->getMyAtomsRangeOnOutput(fromAtomID, toAtomID);
00351 }
00352
00353 CollectMidVectorInstance(int s) {
00354 CProxy_ParallelIOMgr io(CkpvAccess(BOCclass_group).ioMgr);
00355 ParallelIOMgr *ioMgr = io.ckLocalBranch();
00356 ioMgr->getMyAtomsRangeOnOutput(fromAtomID, toAtomID);
00357 reset(s);
00358 }
00359
00360 void free() { seq = -10; }
00361 int notfree() { return ( seq != -10 ); }
00362
00363 void reset(int s) {
00364 if ( s == -10 ) NAMD_bug("seq == free in CollectionMidMaster");
00365 seq = s;
00366 remaining = toAtomID-fromAtomID+1;
00367 data.resize(0);
00368 fdata.resize(0);
00369 }
00370
00371
00372
00373 int append(int size, AtomID *a, Vector *d, FloatVector *fd){
00374 if (d) {
00375 if ( size ) data.resize(toAtomID-fromAtomID+1);
00376 Vector *ptr = data.begin();
00377 for(int i = 0; i < size; ++i) { ptr[a[i]-fromAtomID] = d[i]; }
00378 }
00379 if (fd) {
00380 if ( size ) fdata.resize(toAtomID-fromAtomID+1);
00381 FloatVector *ptr = fdata.begin();
00382 for(int i = 0; i < size; ++i) { ptr[a[i]-fromAtomID] = fd[i]; }
00383 }
00384 remaining -= size;
00385
00386 return ready();
00387 }
00388
00389 int ready(void) { return ( ! remaining ); }
00390
00391 int seq;
00392 Lattice lattice;
00393 ResizeArray<Vector> data;
00394 ResizeArray<FloatVector> fdata;
00395
00396
00397 int fromAtomID;
00398 int toAtomID;
00399
00400 private:
00401 int remaining;
00402 };
00403
00404
00405
00406
00407
00408
00409
00410
00411 class CollectionMidMaster{
00412 public:
00413
00414 CollectionMidMaster(ParallelIOMgr *pIO_) : pIO(pIO_) { parOut = new ParOutput(pIO_->myOutputRank); }
00415 ~CollectionMidMaster(void) { delete parOut; }
00416
00417 int receivePositions(CollectVectorVarMsg *msg) {return positions.submitData(msg);}
00418 int receiveVelocities(CollectVectorVarMsg *msg) {return velocities.submitData(msg);}
00419 int receiveForces(CollectVectorVarMsg *msg) {return forces.submitData(msg);}
00420
00421 void disposePositions(int seq);
00422 void disposeVelocities(int seq);
00423 void disposeForces(int seq);
00424
00425 CollectMidVectorInstance *getReadyPositions(int seq) { return positions.getReady(seq); }
00426
00427
00428
00429 class CollectVectorSequence{
00430 public:
00431 int submitData(CollectVectorVarMsg *msg){
00432 int seq = msg->seq;
00433 CollectMidVectorInstance **c = data.begin();
00434 CollectMidVectorInstance **c_e = data.end();
00435 for( ; c != c_e && (*c)->seq != seq; ++c );
00436 if ( c == c_e ){
00437 c = data.begin();
00438 for( ; c != c_e && (*c)->notfree(); ++c );
00439 if ( c == c_e ) {
00440 data.add(new CollectMidVectorInstance(seq));
00441 c = data.end() - 1;
00442 }
00443 (*c)->reset(seq);
00444 }
00445 AtomID *i = msg->aid;
00446 Vector *d = msg->data;
00447 FloatVector *fd = msg->fdata;
00448 if(msg->status==CollectVectorVarMsg::VectorValid) {
00449 fd = NULL;
00450 }else if(msg->status==CollectVectorVarMsg::FloatVectorValid){
00451 d = NULL;
00452 }
00453 return (*c)->append(msg->size,i,d,fd);
00454 }
00455
00456 CollectMidVectorInstance* getReady(int seq){
00457 CollectMidVectorInstance **c = data.begin();
00458 CollectMidVectorInstance **c_e = data.end();
00459 for( ; c != c_e && (*c)->seq != seq; ++c );
00460 CmiAssert(c != c_e);
00461 return *c;
00462 }
00463
00464 ResizeArray<CollectMidVectorInstance*> data;
00465 };
00466
00467 private:
00468 CollectVectorSequence positions;
00469 CollectVectorSequence velocities;
00470 CollectVectorSequence forces;
00471 ParallelIOMgr *pIO;
00472 ParOutput *parOut;
00473 };
00474 #endif
00475
00476 #endif
00477