Communicate.C

Go to the documentation of this file.
00001 
00007 #include <string.h>
00008 #include <stdlib.h>
00009 #include "Communicate.h"
00010 #include "MStream.h"
00011 #include "charm++.h"
00012 
00013 CkpvStaticDeclare(CmmTable, CsmMessages);
00014 CkpvStaticDeclare(int, CsmAcks);
00015 
00016 static void CsmHandler(void *msg)
00017 {
00018   if ( CmiMyRank() ) NAMD_bug("Communicate CsmHandler on non-rank-zero pe");
00019   // get start of user message
00020   int *m = (int *) ((char *)msg+CmiMsgHeaderSizeBytes);
00021   // sending node  & tag act as tags
00022   CmmPut(CkpvAccess(CsmMessages), 2, m, msg);
00023 }
00024 
00025 static void CsmAckHandler(void *msg)
00026 {
00027   if ( CmiMyRank() ) NAMD_bug("Communicate CsmAckHandler on non-rank-zero pe");
00028   CmiFree(msg);
00029   CkpvAccess(CsmAcks) += 1;
00030 }
00031 
00032 Communicate::Communicate(void) 
00033 {
00034   CkpvInitialize(CmmTable, CsmMessages);
00035   CsmHandlerIndex = CmiRegisterHandler((CmiHandler) CsmHandler);
00036   CsmAckHandlerIndex = CmiRegisterHandler((CmiHandler) CsmAckHandler);
00037   CkpvAccess(CsmMessages) = CmmNew();
00038 
00039   int parent_node = 0;
00040   int self = CkMyNode();
00041   int range_begin = 0;
00042   int range_end = CkNumNodes();
00043   while ( self != range_begin ) {
00044     parent_node = range_begin;
00045     ++range_begin;
00046     int split = range_begin + ( range_end - range_begin ) / 2;
00047     if ( self < split ) { range_end = split; }
00048     else { range_begin = split; }
00049   }
00050   int send_near = self + 1;
00051   int send_far = send_near + ( range_end - send_near ) / 2;
00052 
00053   parent = CkNodeFirst(parent_node);
00054   nchildren = 0;
00055   if ( send_far < range_end ) children[nchildren++] = CkNodeFirst(send_far);
00056   if ( send_near < send_far ) children[nchildren++] = CkNodeFirst(send_near);
00057 
00058   CkpvInitialize(int, CsmAcks);
00059   CkpvAccess(CsmAcks) = nchildren;
00060 
00061   ackmsg = (char *) CmiAlloc(CmiMsgHeaderSizeBytes);
00062   CmiSetHandler(ackmsg, CsmAckHandlerIndex);
00063 }
00064 
00065 
00066 Communicate::~Communicate(void) 
00067 {
00068   CmiFree(ackmsg);
00069 }
00070 
00071 MIStream *Communicate::newInputStream(int PE, int tag)
00072 {
00073   MIStream *st = new MIStream(this, PE, tag);
00074   return st;
00075 }
00076 
00077 MOStream *Communicate::newOutputStream(int PE, int tag, unsigned int bufSize)
00078 {
00079   MOStream *st = new MOStream(this, PE, tag, bufSize);
00080   return st;
00081 }
00082 
00083 void *Communicate::getMessage(int PE, int tag)
00084 {
00085   if ( CmiMyRank() ) NAMD_bug("Communicate::getMessage called on non-rank-zero Pe\n");
00086 
00087   int itag[2], rtag[2];
00088   void *msg;
00089 
00090   itag[0] = (PE==(-1)) ? (CmmWildCard) : PE;
00091   itag[1] = (tag==(-1)) ? (CmmWildCard) : tag;
00092   while((msg=CmmGet(CkpvAccess(CsmMessages),2,itag,rtag))==0) {
00093     CmiDeliverMsgs(0);
00094     // CmiDeliverSpecificMsg(CsmHandlerIndex);
00095   }
00096 
00097   CmiSyncSend(parent, CmiMsgHeaderSizeBytes, ackmsg);
00098 
00099   while ( CkpvAccess(CsmAcks) < nchildren ) {
00100     CmiDeliverMsgs(0);
00101     // CmiDeliverSpecificMsg(CsmAckHandlerIndex);
00102   }
00103   CkpvAccess(CsmAcks) = 0;
00104 
00105   int size = SIZEFIELD(msg);
00106   for ( int i = 0; i < nchildren; ++i ) {
00107     CmiSyncSend(children[i],size,(char*)msg);
00108   }
00109 
00110   return msg;
00111 }
00112 
00113 void Communicate::sendMessage(int PE, void *msg, int size)
00114 {
00115   if ( CmiMyPe() ) NAMD_bug("Communicate::sendMessage not from Pe 0");
00116 
00117   while ( CkpvAccess(CsmAcks) < nchildren ) {
00118     CmiDeliverMsgs(0);
00119     // CmiDeliverSpecificMsg(CsmAckHandlerIndex);
00120   }
00121   CkpvAccess(CsmAcks) = 0;
00122 
00123   CmiSetHandler(msg, CsmHandlerIndex);
00124   switch(PE) {
00125     case ALL:
00126       NAMD_bug("Unexpected Communicate::sendMessage(ALL,...)");
00127       //CmiSyncBroadcastAll(size, (char *)msg);
00128       break;
00129     case ALLBUTME:
00130       //CmiSyncBroadcast(size, (char *)msg);
00131       for ( int i = 0; i < nchildren; ++i ) {
00132         CmiSyncSend(children[i],size,(char*)msg);
00133       }
00134       break;
00135     default:
00136       NAMD_bug("Unexpected Communicate::sendMessage(PEL,...)");
00137       //CmiSyncSend(PE, size, (char *)msg);
00138       break;
00139   }
00140 }

Generated on Mon Nov 20 01:17:10 2017 for NAMD by  doxygen 1.4.7