NAMD
MStream.C
Go to the documentation of this file.
1 
7 #include <string.h>
8 #include "Communicate.h"
9 #include "MStream.h"
10 #include "converse.h"
11 
12 #define MIN_DEBUG_LEVEL 2
13 //#define DEBUGM
14 #include "Debug.h"
15 
16 struct StreamMessage {
17  char header[CmiMsgHeaderSizeBytes];
18  int PE;
19  int tag;
20  size_t len; // sizeof the data
21  unsigned int index; // index of packet in stream
22  unsigned int checksum;
23  StreamMessage *next; // for linked list of early packets
24  char data[1];
25 };
26 
28 {
29  cobj = c;
30  PE = p;
31  tag = t;
32  msg = (StreamMessage *) 0;
33  early = (StreamMessage *) 0;
34  currentIndex = 0;
35  checksum = 0;
36 }
37 
39 {
40  if(msg!=0)
41  CmiFree(msg);
42 }
43 
44 MOStream::MOStream(Communicate *c, int p, int t, size_t size)
45 {
46  cobj = c;
47  PE = p;
48  tag = t;
49  bufLen = size;
50  msgBuf = (StreamMessage *)CmiAlloc(sizeof(StreamMessage)+size);
51  msgBuf->PE = CmiMyPe();
52  msgBuf->tag = tag;
53  msgBuf->len = 0;
54  msgBuf->index = 0;
55  msgBuf->next = (StreamMessage *)0;
56  msgBuf->checksum = 0;
57 }
58 
60 {
61  if(msgBuf != 0)
62  CmiFree(msgBuf);
63 }
64 
65 static int checkSum(StreamMessage *msg)
66 {
67  int checksum = 0;
68  for ( size_t i=0; i < msg->len; i++ ) {
69  checksum += (unsigned char) msg->data[i];
70  }
71  if ( checksum != msg->checksum ) {
72  DebugM(5,"Error on " << msg->tag << ":" << msg->index <<
73  " of length " << msg->len <<
74  " with checksum " << ((int)checksum) <<
75  " vs " << ((int)(msg->checksum)) <<"\n");
76  NAMD_bug("MStream checksums do not agree!");
77  }
78  return 1;
79 }
80 
81 MIStream *MIStream::Get(char *buf, size_t len)
82 {
83  while(len) {
84  if(msg==0) {
85  if ( early && (early->index < currentIndex) ) {
86  DebugM(3,"Duplicate message " << early->index <<
87  " from Pe(" << msg->PE << ")" <<
88  " on stack while waiting for " << currentIndex <<
89  " from Pe(" << PE << ").\n");
90  NAMD_bug("MIStream::Get - duplicate message on stack!");
91  }
92  if ( early && (early->index == currentIndex) ) {
93  DebugM(2,"Popping message " << currentIndex << " from stack.\n");
94  msg = early;
95  early = early->next;
96  msg->next = (StreamMessage *)0;
97  } else {
98  DebugM(1,"Receiving message.\n");
99  msg = (StreamMessage *) cobj->getMessage(PE, tag);
100  checkSum(msg);
101  }
102  while ( msg->index != currentIndex ) {
103  if ( msg->index < currentIndex ) {
104  DebugM(3,"Duplicate message " << msg->index <<
105  " from Pe(" << msg->PE << ")" <<
106  " received while waiting for " << currentIndex <<
107  " from Pe(" << PE << ").\n");
108  NAMD_bug("MIStream::Get - duplicate message received!");
109  }
110  DebugM(2,"Pushing message " << msg->index << " on stack.\n");
111  if ( (! early) || (early->index > msg->index) ) {
112  msg->next = early;
113  early = msg;
114  } else {
115  StreamMessage *cur = early;
116  while ( cur->next && (cur->next->index < msg->index) ) {
117  cur = cur->next;
118  }
119  msg->next = cur->next;
120  cur->next = msg;
121  }
122  DebugM(1,"Receiving message again.\n");
123  msg = (StreamMessage *) cobj->getMessage(PE, tag);
124  checkSum(msg);
125  }
126  currentPos = 0;
127  currentIndex += 1;
128  } // end of if (msg==0)
129  if(currentPos+len <= msg->len) {
130  memcpy(buf, &(msg->data[currentPos]), len);
131  currentPos += len;
132  len = 0;
133  } else {
134  size_t b = msg->len-currentPos;
135  memcpy(buf, &(msg->data[currentPos]), b);
136  len -= b;
137  buf += b;
138  currentPos += b;
139  }
140  if(currentPos == msg->len) {
141  CmiFree(msg);
142  msg = 0;
143  }
144  }
145  return this;
146 }
147 
148 MOStream *MOStream::Put(char *buf, size_t len)
149 {
150  while(len) {
151  if(msgBuf->len + len <= bufLen) {
152  memcpy(&(msgBuf->data[msgBuf->len]), buf, len);
153  msgBuf->len += len;
154  len = 0;
155  } else {
156  size_t b = bufLen - msgBuf->len;
157  memcpy(&(msgBuf->data[msgBuf->len]), buf, b);
158  msgBuf->len = bufLen;
159  if ( msgBuf->index && ! ((msgBuf->index) % 100) ) {
160  DebugM(3,"Sending message " << msgBuf->index << ".\n");
161  }
162  msgBuf->checksum = 0;
163  for ( size_t i=0; i < msgBuf->len; i++ ) {
164  msgBuf->checksum += (unsigned char) msgBuf->data[i];
165  }
166  cobj->sendMessage(PE, (void *)msgBuf, bufLen+sizeof(StreamMessage)-1);
167  msgBuf->len = 0;
168  msgBuf->index += 1;
169  len -= b;
170  buf += b;
171  }
172  }
173  return this;
174 }
175 
176 void MOStream::end(void)
177 {
178  if ( msgBuf->len == 0 ) return; // don't send empty message
179  if ( msgBuf->index && ! ((msgBuf->index) % 100) ) {
180  DebugM(3,"Sending message " << msgBuf->index << ".\n");
181  }
182  msgBuf->checksum = 0;
183  for ( size_t i=0; i < msgBuf->len; i++ ) {
184  msgBuf->checksum += (unsigned char) msgBuf->data[i];
185  }
186  cobj->sendMessage(PE,(void*)msgBuf,msgBuf->len+sizeof(StreamMessage)-1);
187  msgBuf->len = 0;
188  msgBuf->index += 1;
189 }
190 
~MIStream()
Definition: MStream.C:38
unsigned int checksum
Definition: MStream.C:22
void end(void)
Definition: MStream.C:176
void * getMessage(int PE, int tag)
Definition: Communicate.C:83
#define DebugM(x, y)
Definition: Debug.h:59
StreamMessage * next
Definition: MStream.C:23
char header[CmiMsgHeaderSizeBytes]
Definition: MStream.C:17
MIStream(Communicate *c, int pe, int tag)
Definition: MStream.C:27
void NAMD_bug(const char *err_msg)
Definition: common.C:129
MOStream(Communicate *c, int pe, int tag, size_t bufSize)
Definition: MStream.C:44
size_t len
Definition: MStream.C:20
~MOStream()
Definition: MStream.C:59
static int checkSum(StreamMessage *msg)
Definition: MStream.C:65
void sendMessage(int PE, void *msg, int size)
Definition: Communicate.C:113
char data[1]
Definition: MStream.C:24
unsigned int index
Definition: MStream.C:21