00001
00007 #include <string.h>
00008 #include <stdlib.h>
00009 #include "Communicate.h"
00010 #include "MStream.h"
00011 #include "charm++.h"
00012
// Message table: CsmHandler stores incoming messages here and
// Communicate::getMessage retrieves them by (PE, tag).
00013 CkpvStaticDeclare(CmmTable, CsmMessages);
// Acknowledgement counter: bumped by CsmAckHandler, compared against
// nchildren and reset to 0 by the send/forward paths in Communicate.
00014 CkpvStaticDeclare(int, CsmAcks);
00015
00016 static void CsmHandler(void *msg)
00017 {
00018 if ( CmiMyRank() ) NAMD_bug("Communicate CsmHandler on non-rank-zero pe");
00019
00020 int *m = (int *) ((char *)msg+CmiMsgHeaderSizeBytes);
00021
00022 CmmPut(CkpvAccess(CsmMessages), 2, m, msg);
00023 }
00024
00025 static void CsmAckHandler(void *msg)
00026 {
00027 if ( CmiMyRank() ) NAMD_bug("Communicate CsmAckHandler on non-rank-zero pe");
00028 CmiFree(msg);
00029 CkpvAccess(CsmAcks) += 1;
00030 }
00031
00032 Communicate::Communicate(void)
00033 {
00034 CkpvInitialize(CmmTable, CsmMessages);
00035 CsmHandlerIndex = CmiRegisterHandler((CmiHandler) CsmHandler);
00036 CsmAckHandlerIndex = CmiRegisterHandler((CmiHandler) CsmAckHandler);
00037 CkpvAccess(CsmMessages) = CmmNew();
00038 if ( CmiMyNode() * 2 + 2 < CmiNumNodes() ) nchildren = 2;
00039 else if ( CmiMyNode() * 2 + 1 < CmiNumNodes() ) nchildren = 1;
00040 else nchildren = 0;
00041 CkpvInitialize(int, CsmAcks);
00042 CkpvAccess(CsmAcks) = nchildren;
00043 }
00044
00045
00046 Communicate::~Communicate(void)
00047 {
00048
00049 }
00050
00051 MIStream *Communicate::newInputStream(int PE, int tag)
00052 {
00053 MIStream *st = new MIStream(this, PE, tag);
00054 return st;
00055 }
00056
00057 MOStream *Communicate::newOutputStream(int PE, int tag, unsigned int bufSize)
00058 {
00059 MOStream *st = new MOStream(this, PE, tag, bufSize);
00060 return st;
00061 }
00062
00063 void *Communicate::getMessage(int PE, int tag)
00064 {
00065 if ( CmiMyRank() ) NAMD_bug("Communicate::getMessage called on non-rank-zero Pe\n");
00066
00067 int itag[2], rtag[2];
00068 void *msg;
00069
00070 itag[0] = (PE==(-1)) ? (CmmWildCard) : PE;
00071 itag[1] = (tag==(-1)) ? (CmmWildCard) : tag;
00072 while((msg=CmmGet(CkpvAccess(CsmMessages),2,itag,rtag))==0) {
00073 CmiDeliverMsgs(0);
00074 }
00075
00076 char *ackmsg = (char *) CmiAlloc(CmiMsgHeaderSizeBytes);
00077 CmiSetHandler(ackmsg, CsmAckHandlerIndex);
00078 CmiSyncSend(CmiNodeFirst((CmiMyNode()-1)/2), CmiMsgHeaderSizeBytes, ackmsg);
00079
00080 while ( CkpvAccess(CsmAcks) < nchildren ) {
00081 CmiDeliverMsgs(0);
00082 }
00083 CkpvAccess(CsmAcks) = 0;
00084
00085 int size = SIZEFIELD(msg);
00086 for ( int i = 2; i >= 1; --i ) {
00087 int node = CmiMyNode() * 2 + i;
00088 if ( node < CmiNumNodes() ) {
00089 CmiSyncSend(CmiNodeFirst(node),size,(char*)msg);
00090 }
00091 }
00092
00093 return msg;
00094 }
00095
00096 void Communicate::sendMessage(int PE, void *msg, int size)
00097 {
00098 if ( CmiMyPe() ) NAMD_bug("Communicate::sendMessage not from Pe 0");
00099
00100 while ( CkpvAccess(CsmAcks) < nchildren ) {
00101 CmiDeliverMsgs(0);
00102 }
00103 CkpvAccess(CsmAcks) = 0;
00104
00105 CmiSetHandler(msg, CsmHandlerIndex);
00106 switch(PE) {
00107 case ALL:
00108 NAMD_bug("Unexpected Communicate::sendMessage(ALL,...)");
00109
00110 break;
00111 case ALLBUTME:
00112
00113 if ( CmiNumNodes() > 2 ) {
00114 CmiSyncSend(CmiNodeFirst(2),size,(char*)msg);
00115 }
00116 if ( CmiNumNodes() > 1 ) {
00117 CmiSyncSend(CmiNodeFirst(1),size,(char*)msg);
00118 }
00119 break;
00120 default:
00121 NAMD_bug("Unexpected Communicate::sendMessage(PEL,...)");
00122
00123 break;
00124 }
00125 }