MStream.C

Go to the documentation of this file.
00001 
00007 #include <string.h>
00008 #include "Communicate.h"
00009 #include "MStream.h"
00010 #include "converse.h"
00011 
00012 #define MIN_DEBUG_LEVEL 2
00013 //#define DEBUGM
00014 #include "Debug.h"
00015 
00016 struct StreamMessage {
00017   char header[CmiMsgHeaderSizeBytes];
00018   int PE;
00019   int tag;
00020   size_t len; // sizeof the data
00021   unsigned int index; // index of packet in stream
00022   unsigned int checksum;
00023   StreamMessage *next; // for linked list of early packets
00024   char data[1];
00025 };
00026 
00027 MIStream::MIStream(Communicate *c, int p, int t)
00028 {
00029   cobj = c;
00030   PE = p;
00031   tag = t;
00032   msg = (StreamMessage *) 0;
00033   early = (StreamMessage *) 0;
00034   currentIndex = 0;
00035   checksum = 0;
00036 }
00037 
00038 MIStream::~MIStream()
00039 {
00040   if(msg!=0)
00041     CmiFree(msg);
00042 }
00043 
00044 MOStream::MOStream(Communicate *c, int p, int t, size_t size)
00045 {
00046   cobj = c;
00047   PE = p;
00048   tag = t;
00049   bufLen = size;
00050   msgBuf = (StreamMessage *)CmiAlloc(sizeof(StreamMessage)+size);
00051   msgBuf->PE = CmiMyPe();
00052   msgBuf->tag = tag;
00053   msgBuf->len = 0;
00054   msgBuf->index = 0;
00055   msgBuf->next = (StreamMessage *)0;
00056   msgBuf->checksum = 0;
00057 }
00058 
00059 MOStream::~MOStream()
00060 {
00061   if(msgBuf != 0)
00062     CmiFree(msgBuf);
00063 }
00064 
00065 static int checkSum(StreamMessage *msg)
00066 {
00067   int checksum = 0;
00068   for ( size_t i=0; i < msg->len; i++ ) {
00069     checksum += (unsigned char) msg->data[i];
00070   }
00071   if ( checksum != msg->checksum ) {
00072     DebugM(5,"Error on " << msg->tag << ":" << msg->index <<
00073           " of length " << msg->len <<
00074           " with checksum " << ((int)checksum) <<
00075           " vs " << ((int)(msg->checksum)) <<"\n");
00076     NAMD_bug("MStream checksums do not agree!");
00077   }
00078   return 1;
00079 }
00080 
00081 MIStream *MIStream::Get(char *buf, size_t len)
00082 {
00083   while(len) {
00084     if(msg==0) {
00085       if ( early && (early->index < currentIndex) ) {
00086           DebugM(3,"Duplicate message " << early->index <<
00087             " from Pe(" << msg->PE << ")" <<
00088             " on stack while waiting for " << currentIndex <<
00089             " from Pe(" << PE << ").\n");
00090           NAMD_bug("MIStream::Get - duplicate message on stack!");
00091       }
00092       if ( early && (early->index == currentIndex) ) {
00093         DebugM(2,"Popping message " << currentIndex << " from stack.\n");
00094         msg = early;
00095         early = early->next;
00096         msg->next = (StreamMessage *)0;
00097       } else {
00098         DebugM(1,"Receiving message.\n");
00099         msg = (StreamMessage *) cobj->getMessage(PE, tag);
00100         checkSum(msg);
00101       }
00102       while ( msg->index != currentIndex ) {
00103         if ( msg->index < currentIndex ) {
00104           DebugM(3,"Duplicate message " << msg->index <<
00105             " from Pe(" << msg->PE << ")" <<
00106             " received while waiting for " << currentIndex <<
00107             " from Pe(" << PE << ").\n");
00108           NAMD_bug("MIStream::Get - duplicate message received!");
00109         }
00110         DebugM(2,"Pushing message " << msg->index << " on stack.\n");
00111         if ( (! early) || (early->index > msg->index) ) {
00112           msg->next = early;
00113           early = msg;
00114         } else {
00115           StreamMessage *cur = early;
00116           while ( cur->next && (cur->next->index < msg->index) ) {
00117             cur = cur->next;
00118           }
00119           msg->next = cur->next;
00120           cur->next = msg;
00121         }
00122         DebugM(1,"Receiving message again.\n");
00123         msg = (StreamMessage *) cobj->getMessage(PE, tag);
00124         checkSum(msg);
00125       } 
00126       currentPos = 0;
00127       currentIndex += 1;
00128     }  // end of if (msg==0)
00129     if(currentPos+len <= msg->len) {
00130       memcpy(buf, &(msg->data[currentPos]), len);
00131       currentPos += len;
00132       len = 0;
00133     } else {
00134       size_t b = msg->len-currentPos;
00135       memcpy(buf, &(msg->data[currentPos]), b);
00136       len -= b;
00137       buf += b;
00138       currentPos += b;
00139     }
00140     if(currentPos == msg->len) {
00141       CmiFree(msg);
00142       msg = 0;
00143     }
00144   }
00145   return this;
00146 }
00147 
00148 MOStream *MOStream::Put(char *buf, size_t len)
00149 {
00150   while(len) {
00151     if(msgBuf->len + len <= bufLen) {
00152       memcpy(&(msgBuf->data[msgBuf->len]), buf, len);
00153       msgBuf->len += len;
00154       len = 0;
00155     } else {
00156       size_t b = bufLen - msgBuf->len;
00157       memcpy(&(msgBuf->data[msgBuf->len]), buf, b);
00158       msgBuf->len = bufLen;
00159       if ( msgBuf->index && ! ((msgBuf->index) % 100) ) {
00160         DebugM(3,"Sending message " << msgBuf->index << ".\n");
00161       }
00162       msgBuf->checksum = 0;
00163       for ( size_t i=0; i < msgBuf->len; i++ ) {
00164         msgBuf->checksum += (unsigned char) msgBuf->data[i];
00165       }
00166       cobj->sendMessage(PE, (void *)msgBuf, bufLen+sizeof(StreamMessage)-1);
00167       msgBuf->len = 0;
00168       msgBuf->index += 1;
00169       len -= b;
00170       buf += b;
00171     }
00172   }
00173   return this;
00174 }
00175 
00176 void MOStream::end(void)
00177 {
00178   if ( msgBuf->len == 0 ) return; // don't send empty message
00179   if ( msgBuf->index && ! ((msgBuf->index) % 100) ) {
00180     DebugM(3,"Sending message " << msgBuf->index << ".\n");
00181   }
00182   msgBuf->checksum = 0;
00183   for ( size_t i=0; i < msgBuf->len; i++ ) {
00184     msgBuf->checksum += (unsigned char) msgBuf->data[i];
00185   }
00186   cobj->sendMessage(PE,(void*)msgBuf,msgBuf->len+sizeof(StreamMessage)-1);
00187   msgBuf->len = 0;
00188   msgBuf->index += 1;
00189 }
00190 

Generated on Thu Sep 21 01:17:13 2017 for NAMD by  doxygen 1.4.7