00001
00007 #include <string.h>
00008 #include "Communicate.h"
00009 #include "MStream.h"
00010 #include "converse.h"
00011
00012 #define MIN_DEBUG_LEVEL 2
00013
00014 #include "Debug.h"
00015
00016 struct StreamMessage {
00017 char header[CmiMsgHeaderSizeBytes];
00018 int PE;
00019 int tag;
00020 size_t len;
00021 unsigned int index;
00022 unsigned int checksum;
00023 StreamMessage *next;
00024 char data[1];
00025 };
00026
00027 MIStream::MIStream(Communicate *c, int p, int t)
00028 {
00029 cobj = c;
00030 PE = p;
00031 tag = t;
00032 msg = (StreamMessage *) 0;
00033 early = (StreamMessage *) 0;
00034 currentIndex = 0;
00035 checksum = 0;
00036 }
00037
00038 MIStream::~MIStream()
00039 {
00040 if(msg!=0)
00041 CmiFree(msg);
00042 }
00043
00044 MOStream::MOStream(Communicate *c, int p, int t, size_t size)
00045 {
00046 cobj = c;
00047 PE = p;
00048 tag = t;
00049 bufLen = size;
00050 msgBuf = (StreamMessage *)CmiAlloc(sizeof(StreamMessage)+size);
00051 msgBuf->PE = CmiMyPe();
00052 msgBuf->tag = tag;
00053 msgBuf->len = 0;
00054 msgBuf->index = 0;
00055 msgBuf->next = (StreamMessage *)0;
00056 msgBuf->checksum = 0;
00057 }
00058
00059 MOStream::~MOStream()
00060 {
00061 if(msgBuf != 0)
00062 CmiFree(msgBuf);
00063 }
00064
00065 static int checkSum(StreamMessage *msg)
00066 {
00067 int checksum = 0;
00068 for ( size_t i=0; i < msg->len; i++ ) {
00069 checksum += (unsigned char) msg->data[i];
00070 }
00071 if ( checksum != msg->checksum ) {
00072 DebugM(5,"Error on " << msg->tag << ":" << msg->index <<
00073 " of length " << msg->len <<
00074 " with checksum " << ((int)checksum) <<
00075 " vs " << ((int)(msg->checksum)) <<"\n");
00076 NAMD_bug("MStream checksums do not agree!");
00077 }
00078 return 1;
00079 }
00080
00081 MIStream *MIStream::Get(char *buf, size_t len)
00082 {
00083 while(len) {
00084 if(msg==0) {
00085 if ( early && (early->index < currentIndex) ) {
00086 DebugM(3,"Duplicate message " << early->index <<
00087 " from Pe(" << msg->PE << ")" <<
00088 " on stack while waiting for " << currentIndex <<
00089 " from Pe(" << PE << ").\n");
00090 NAMD_bug("MIStream::Get - duplicate message on stack!");
00091 }
00092 if ( early && (early->index == currentIndex) ) {
00093 DebugM(2,"Popping message " << currentIndex << " from stack.\n");
00094 msg = early;
00095 early = early->next;
00096 msg->next = (StreamMessage *)0;
00097 } else {
00098 DebugM(1,"Receiving message.\n");
00099 msg = (StreamMessage *) cobj->getMessage(PE, tag);
00100 checkSum(msg);
00101 }
00102 while ( msg->index != currentIndex ) {
00103 if ( msg->index < currentIndex ) {
00104 DebugM(3,"Duplicate message " << msg->index <<
00105 " from Pe(" << msg->PE << ")" <<
00106 " received while waiting for " << currentIndex <<
00107 " from Pe(" << PE << ").\n");
00108 NAMD_bug("MIStream::Get - duplicate message received!");
00109 }
00110 DebugM(2,"Pushing message " << msg->index << " on stack.\n");
00111 if ( (! early) || (early->index > msg->index) ) {
00112 msg->next = early;
00113 early = msg;
00114 } else {
00115 StreamMessage *cur = early;
00116 while ( cur->next && (cur->next->index < msg->index) ) {
00117 cur = cur->next;
00118 }
00119 msg->next = cur->next;
00120 cur->next = msg;
00121 }
00122 DebugM(1,"Receiving message again.\n");
00123 msg = (StreamMessage *) cobj->getMessage(PE, tag);
00124 checkSum(msg);
00125 }
00126 currentPos = 0;
00127 currentIndex += 1;
00128 }
00129 if(currentPos+len <= msg->len) {
00130 memcpy(buf, &(msg->data[currentPos]), len);
00131 currentPos += len;
00132 len = 0;
00133 } else {
00134 size_t b = msg->len-currentPos;
00135 memcpy(buf, &(msg->data[currentPos]), b);
00136 len -= b;
00137 buf += b;
00138 currentPos += b;
00139 }
00140 if(currentPos == msg->len) {
00141 CmiFree(msg);
00142 msg = 0;
00143 }
00144 }
00145 return this;
00146 }
00147
00148 MOStream *MOStream::Put(char *buf, size_t len)
00149 {
00150 while(len) {
00151 if(msgBuf->len + len <= bufLen) {
00152 memcpy(&(msgBuf->data[msgBuf->len]), buf, len);
00153 msgBuf->len += len;
00154 len = 0;
00155 } else {
00156 size_t b = bufLen - msgBuf->len;
00157 memcpy(&(msgBuf->data[msgBuf->len]), buf, b);
00158 msgBuf->len = bufLen;
00159 if ( msgBuf->index && ! ((msgBuf->index) % 100) ) {
00160 DebugM(3,"Sending message " << msgBuf->index << ".\n");
00161 }
00162 msgBuf->checksum = 0;
00163 for ( size_t i=0; i < msgBuf->len; i++ ) {
00164 msgBuf->checksum += (unsigned char) msgBuf->data[i];
00165 }
00166 cobj->sendMessage(PE, (void *)msgBuf, bufLen+sizeof(StreamMessage)-1);
00167 msgBuf->len = 0;
00168 msgBuf->index += 1;
00169 len -= b;
00170 buf += b;
00171 }
00172 }
00173 return this;
00174 }
00175
00176 void MOStream::end(void)
00177 {
00178 if ( msgBuf->len == 0 ) return;
00179 if ( msgBuf->index && ! ((msgBuf->index) % 100) ) {
00180 DebugM(3,"Sending message " << msgBuf->index << ".\n");
00181 }
00182 msgBuf->checksum = 0;
00183 for ( size_t i=0; i < msgBuf->len; i++ ) {
00184 msgBuf->checksum += (unsigned char) msgBuf->data[i];
00185 }
00186 cobj->sendMessage(PE,(void*)msgBuf,msgBuf->len+sizeof(StreamMessage)-1);
00187 msgBuf->len = 0;
00188 msgBuf->index += 1;
00189 }
00190