DataExchanger.C

Go to the documentation of this file.
00001 
00006 #include <stdio.h>
00007 #include "DataExchanger.h"
00008 #include "ProcessorPrivate.h"
00009 #include "common.h"
00010 #include "Node.h"
00011 #include "CollectionMaster.h"
00012 #include "Output.h"
00013 #include "ScriptTcl.h"
00014 #include "qd.h"
00015 
00016 #if CMK_HAS_PARTITION
00017 #ifdef CmiMyPartitionSize
00018 extern "C" {
00019   void setDefaultPartitionParams() {
00020     // if(!CmiMyNodeGlobal()) printf("NAMD setDefaultPartitionParams called\n");
00021     CmiSetPartitionScheme(3);  // recursive bisection
00022   }
00023 }
00024 #endif
00025 #endif
00026 
00027 static int recvRedCalledEarly;
00028 
00029 CpvDeclare(int, breakScheduler);
00030 CpvDeclare(int, inEval);
00031 
00032 //functions to receive and invoke chare's entry methods
00033 extern "C" {
00034   void packSend(int dst, int dstPart, const char *data, int size, int handler, int code) {
00035     int msgsize = sizeof(DataMessage) + size;
00036     DataMessage *dmsg = (DataMessage *)CmiAlloc(msgsize);
00037     dmsg->setMessage(data,CkMyPe(),CmiMyPartition(),size,handler,code);
00038 #if CMK_HAS_PARTITION
00039     CmiInterSyncSendAndFree(dst,dstPart,msgsize,(char*)dmsg);
00040 #else
00041     CmiSyncSendAndFree(dst,msgsize,(char*)dmsg);
00042 #endif
00043   }
00044 
00045   void sendReplicaDcdInit(int dstPart, ReplicaDcdInitMsg *msg, int msgsize) {
00046     CmiSetHandler(msg->core, CkpvAccess(recv_replica_dcd_init_idx));
00047 #if CMK_HAS_PARTITION
00048     CmiInterSyncSendAndFree(0,dstPart,msgsize,(char*)msg);
00049 #else
00050     CmiSyncSendAndFree(0,msgsize,(char*)msg);
00051 #endif
00052     CollectionMaster::Object()->blockPositions();  // ensure ordering
00053     CkpvAccess(_qd)->create();  // ensure completion
00054   }
00055 
00056   void sendReplicaDcdData(int dstPart, ReplicaDcdDataMsg *msg, int msgsize) {
00057     CmiSetHandler(msg->core, CkpvAccess(recv_replica_dcd_data_idx));
00058 #if CMK_HAS_PARTITION
00059     CmiInterSyncSendAndFree(0,dstPart,msgsize,(char*)msg);
00060 #else
00061     CmiSyncSendAndFree(0,msgsize,(char*)msg);
00062 #endif
00063     CollectionMaster::Object()->blockPositions();  // ensure ordering
00064     CkpvAccess(_qd)->create();  // ensure completion
00065   }
00066 
00067   void sendReplicaDcdAck(int dstPart, ReplicaDcdAckMsg *msg) {
00068     CmiSetHandler(msg->core, CkpvAccess(recv_replica_dcd_ack_idx));
00069     int msgsize = sizeof(ReplicaDcdAckMsg);
00070 #if CMK_HAS_PARTITION
00071     CmiInterSyncSendAndFree(0,dstPart,msgsize,(char*)msg);
00072 #else
00073     CmiSyncSendAndFree(0,msgsize,(char*)msg);
00074 #endif
00075   }
00076 
00077   void recvReplicaDcdInit(ReplicaDcdInitMsg *msg) {
00078     Node::Object()->output->recvReplicaDcdInit(msg);
00079     CmiFree(msg);
00080   }
00081 
00082   void recvReplicaDcdData(ReplicaDcdDataMsg *msg) {
00083     Node::Object()->output->recvReplicaDcdData(msg);
00084     CmiFree(msg);
00085   }
00086 
00087   void recvReplicaDcdAck(ReplicaDcdAckMsg *msg) {
00088     CmiFree(msg);
00089     CollectionMaster::Object()->unblockPositions();
00090     CkpvAccess(_qd)->process();
00091   }
00092 
00093   void recvData(DataMessage *dmsg) {
00094     Pointer msg(dmsg);
00095     if ( CkpvAccess(BOCclass_group).dataExchanger.isZero() ) NAMD_bug("BOCgroup::dataExchanger is zero in recvData!");
00096     CPROXY_DE(CkpvAccess(BOCclass_group).dataExchanger)[CkMyPe()].recv_data(msg);
00097   }
00098 
00099   void recvAck(DataMessage *dmsg) {
00100     if ( CkpvAccess(BOCclass_group).dataExchanger.isZero() ) NAMD_bug("BOCgroup::dataExchanger is zero in recvAck!");
00101     CPROXY_DE(CkpvAccess(BOCclass_group).dataExchanger)[CkMyPe()].recv_ack();
00102     CmiFree(dmsg);
00103   }
00104 
00105   void recvBcast(DataMessage *dmsg) {
00106     if ( CkpvAccess(BOCclass_group).dataExchanger.isZero() ) NAMD_bug("BOCgroup::dataExchanger is zero in recvBcast!");
00107     CPROXY_DE(CkpvAccess(BOCclass_group).dataExchanger)[CkMyPe()].recv_bcast();
00108     CmiFree(dmsg);
00109   }
00110 
00111   void recvRed(DataMessage *dmsg) {
00112     if ( CkpvAccess(BOCclass_group).dataExchanger.isZero() ) {
00113       ++recvRedCalledEarly;
00114       CmiFree(dmsg);
00115       return;
00116     }
00117     CPROXY_DE(CkpvAccess(BOCclass_group).dataExchanger)[CkMyPe()].recv_red();
00118     CmiFree(dmsg);
00119   }
00120 
00121   void recvEvalCommand(DataMessage *dmsg) {
00122     Pointer msg(dmsg);
00123     if ( CkpvAccess(BOCclass_group).dataExchanger.isZero() ) NAMD_bug("BOCgroup::dataExchanger is zero in recvEvalCommand!");
00124     CPROXY_DE(CkpvAccess(BOCclass_group).dataExchanger)[CkMyPe()].recv_eval_command(msg);
00125   }
00126 
00127   void recvEvalResult(DataMessage *dmsg) {
00128     Pointer msg(dmsg);
00129     if ( CkpvAccess(BOCclass_group).dataExchanger.isZero() ) NAMD_bug("BOCgroup::dataExchanger is zero in recvEvalResult!");
00130     CPROXY_DE(CkpvAccess(BOCclass_group).dataExchanger)[CkMyPe()].recv_eval_result(msg);
00131   }
00132 
00133   void replica_send(char *sndbuf, int sendcount, int destPart, int destPE) {
00134     if ( CpvAccess(inEval) ) {
00135       packSend(destPE,destPart,sndbuf,sendcount,CkpvAccess(recv_data_idx),1);
00136       return;
00137     }
00138     Pointer sendPointer(sndbuf);
00139     CPROXY_DE(CkpvAccess(BOCclass_group).dataExchanger)[CkMyPe()].send(sendPointer,sendcount,destPart,destPE); 
00140     CpvAccess(breakScheduler) = 0;
00141     while(!CpvAccess(breakScheduler)) CsdSchedulePoll();
00142   }
00143 
00144   void replica_recv(DataMessage **precvMsg, int srcPart, int srcPE) {
00145     Pointer recvPointer((char *) precvMsg);
00146     CPROXY_DE(CkpvAccess(BOCclass_group).dataExchanger)[CkMyPe()].recv(recvPointer,srcPart,srcPE);
00147     CpvAccess(breakScheduler) = 0;
00148     while(!CpvAccess(breakScheduler)) CsdSchedulePoll();
00149   }
00150 
00151   void replica_sendRecv(char *sndbuf, int sendcount, int destPart, int destPE, DataMessage **precvMsg, int srcPart, int srcPE)  {
00152     Pointer sendPointer(sndbuf);
00153     Pointer recvPointer((char *) precvMsg);
00154     CPROXY_DE(CkpvAccess(BOCclass_group).dataExchanger)[CkMyPe()].sendRecv(sendPointer,sendcount,destPart,destPE,recvPointer,srcPart,srcPE);
00155     CpvAccess(breakScheduler) = 0;
00156     while(!CpvAccess(breakScheduler)) CsdSchedulePoll();
00157   }
00158 
00159   void replica_eval(char *cmdbuf, int targPart, int targPE, DataMessage **precvMsg) {
00160     Pointer sendPointer(cmdbuf);
00161     Pointer recvPointer((char *) precvMsg);
00162     int sendcount = strlen(cmdbuf) + 1;
00163     CPROXY_DE(CkpvAccess(BOCclass_group).dataExchanger)[CkMyPe()].eval(sendPointer,sendcount,targPart,targPE,recvPointer);
00164     CpvAccess(breakScheduler) = 0;
00165     while(!CpvAccess(breakScheduler)) CsdSchedulePoll();
00166   }
00167 
00168   void replica_barrier() {
00169     for ( ; recvRedCalledEarly > 0; --recvRedCalledEarly ) CPROXY_DE(CkpvAccess(BOCclass_group).dataExchanger)[CkMyPe()].recv_red();
00170     CPROXY_DE(CkpvAccess(BOCclass_group).dataExchanger)[CkMyPe()].barrier();
00171     CpvAccess(breakScheduler) = 0;
00172     while(!CpvAccess(breakScheduler)) CsdSchedulePoll();
00173   }
00174 
00175   void replica_bcast(char *buf, int count, int root) {
00176     if ( root != 0 ) NAMD_bug("replica_bcast root must be zero");
00177     int rank = CmiMyPartition();
00178     int size = CmiNumPartitions();
00179     int i;
00180     for ( i=1; i<size; i*=2 );
00181     for ( i/=2; i>0; i/=2 ) {
00182       if ( rank & (i - 1) ) continue;
00183       if ( rank & i ) {
00184         int src = rank - i;
00185         //CkPrintf("rank %d recv from %d\n", rank, src);
00186         DataMessage *recvMsg = NULL;
00187         replica_recv(&recvMsg, src, CkMyPe());
00188         if ( recvMsg == NULL ) NAMD_bug("recvMsg == NULL in replica_bcast");
00189         if ( recvMsg->size != count ) NAMD_bug("size != count in replica_bcast");
00190         memcpy(buf, recvMsg->data, count);
00191         CmiFree(recvMsg);
00192       } else {
00193         int dst = rank + i;
00194         if ( dst < size ) {
00195           //CkPrintf("rank %d send to %d\n", rank, dst);
00196           replica_send(buf, count, dst, CkMyPe());
00197         }
00198       }
00199     }
00200   }
00201 
00202   void replica_min_double(double *dat, int count) {
00203     int rank = CmiMyPartition();
00204     int size = CmiNumPartitions();
00205     for ( int i=1; i<size; i*=2 ) {
00206       if ( rank & i ) {
00207         int dst = rank - i;
00208         //CkPrintf("rank %d send to %d\n", rank, dst);
00209         replica_send((char*)dat, count * sizeof(double), dst, CkMyPe());
00210       } else {
00211         int src = rank + i;
00212         if ( src < size ) {
00213           //CkPrintf("rank %d recv from %d\n", rank, src);
00214           DataMessage *recvMsg = NULL;
00215           replica_recv(&recvMsg, src, CkMyPe());
00216           if ( recvMsg == NULL ) NAMD_bug("recvMsg == NULL in replica_bcast");
00217           if ( recvMsg->size != count * sizeof(double) ) NAMD_bug("size != count in replica_min_double");
00218           double *rdat = new double[count];
00219           memcpy(rdat, recvMsg->data, count * sizeof(double));
00220           CmiFree(recvMsg);
00221           for ( int j=0; j<count; ++j ) {
00222             if ( rdat[j] < dat[j] ) dat[j] = rdat[j];
00223           }
00224           delete [] rdat;
00225         }
00226       }
00227       if ( rank & (2 * i - 1) ) break;
00228     }
00229     replica_bcast((char*)dat, count * sizeof(double), 0);
00230   }
00231 } //endof extern C
00232 
00233 #if CMK_IMMEDIATE_MSG && CMK_SMP && ! ( CMK_MULTICORE || CMK_SMP_NO_COMMTHD )
00234 extern "C" void CmiPushImmediateMsg(void *msg);
00235 
00236 class SleepCommthdMsg {
00237   public:
00238   char core[CmiMsgHeaderSizeBytes];
00239 };
00240 
00241 void recvSleepCommthdMsg(SleepCommthdMsg *msg) {
00242   if ( CkMyRank() != CkMyNodeSize() ) NAMD_bug("recvSleepCommthdMsg called on PE instead of communication thread");
00243   usleep(1000);
00244   CmiDelayImmediate();  // re-enqueue for next cycle
00245 }
00246 #endif
00247 
00248 void initializeReplicaConverseHandlers() {
00249   CkpvInitialize(int, recv_data_idx);
00250   CkpvInitialize(int, recv_ack_idx);
00251   CkpvInitialize(int, recv_bcast_idx);
00252   CkpvInitialize(int, recv_red_idx);
00253   CkpvInitialize(int, recv_eval_command_idx);
00254   CkpvInitialize(int, recv_eval_result_idx);
00255   CkpvInitialize(int, recv_replica_dcd_init_idx);
00256   CkpvInitialize(int, recv_replica_dcd_data_idx);
00257   CkpvInitialize(int, recv_replica_dcd_ack_idx);
00258 
00259   CkpvAccess(recv_data_idx) = CmiRegisterHandler((CmiHandler)recvData);                   
00260   CkpvAccess(recv_ack_idx) = CmiRegisterHandler((CmiHandler)recvAck);                     
00261   CkpvAccess(recv_red_idx) = CmiRegisterHandler((CmiHandler)recvRed);                     
00262   CkpvAccess(recv_bcast_idx) = CmiRegisterHandler((CmiHandler)recvBcast);                 
00263   CkpvAccess(recv_eval_command_idx) = CmiRegisterHandler((CmiHandler)recvEvalCommand);    
00264   CkpvAccess(recv_eval_result_idx) = CmiRegisterHandler((CmiHandler)recvEvalResult);      
00265   CkpvAccess(recv_replica_dcd_init_idx) = CmiRegisterHandler((CmiHandler)recvReplicaDcdInit);
00266   CkpvAccess(recv_replica_dcd_data_idx) = CmiRegisterHandler((CmiHandler)recvReplicaDcdData);
00267   CkpvAccess(recv_replica_dcd_ack_idx) = CmiRegisterHandler((CmiHandler)recvReplicaDcdAck);
00268 
00269 #if CMK_IMMEDIATE_MSG && CMK_SMP && ! ( CMK_MULTICORE || CMK_SMP_NO_COMMTHD )
00270   int sleep_commthd_idx = CmiRegisterHandler((CmiHandler)recvSleepCommthdMsg);
00271   if ( CkMyPe() == 0 && CkNumNodes() == 1 ) {
00272     CkPrintf("Charm++ communication thread will sleep due to single-process run.\n");
00273     SleepCommthdMsg *msg = (SleepCommthdMsg *)malloc(sizeof(SleepCommthdMsg));
00274     CmiSetHandler(msg, sleep_commthd_idx);
00275     CmiBecomeImmediate(msg);
00276     CmiPushImmediateMsg(msg);
00277   }
00278 #endif
00279 }
00280 
00281 //======================================================================
00282 // Public functions
00283 //----------------------------------------------------------------------
00284 DataExchanger::DataExchanger()
00285 {
00286   CpvInitialize(int, breakScheduler);
00287   CpvAccess(breakScheduler) = 1;
00288   CpvInitialize(int, inEval);
00289   CpvAccess(inEval) = 0;
00290   if(CmiMyPartition() == 0) 
00291     parent = -1;
00292   else 
00293     parent = (CmiMyPartition()+1)/TREE_WIDTH - 1;
00294   firstChild = (CmiMyPartition()+1)*TREE_WIDTH - 1;
00295   numChildren = CmiNumPartitions() - firstChild;
00296   if(numChildren > TREE_WIDTH)
00297     numChildren = TREE_WIDTH;
00298   
00299   recv_data_idx = CkpvAccess(recv_data_idx);
00300   recv_ack_idx = CkpvAccess(recv_ack_idx);
00301   recv_red_idx = CkpvAccess(recv_red_idx);
00302   recv_bcast_idx = CkpvAccess(recv_bcast_idx);
00303   recv_eval_command_idx = CkpvAccess(recv_eval_command_idx);
00304   recv_eval_result_idx = CkpvAccess(recv_eval_result_idx);
00305 }
00306 
00307 //----------------------------------------------------------------------
00308 DataExchanger::~DataExchanger(void)
00309 { }
00310 
00311 
00312 #include "DataExchanger.def.h"

Generated on Sun Nov 19 01:17:13 2017 for NAMD by  doxygen 1.4.7