12 #define MIN_DEBUG_LEVEL 2 51 msgBuf->
PE = CmiMyPe();
68 for (
size_t i=0; i < msg->
len; i++ ) {
69 checksum += (
unsigned char) msg->
data[i];
73 " of length " << msg->
len <<
74 " with checksum " << ((
int)checksum) <<
75 " vs " << ((
int)(msg->
checksum)) <<
"\n");
76 NAMD_bug(
"MStream checksums do not agree!");
81 MIStream *MIStream::Get(
char *buf,
size_t len)
85 if ( early && (early->
index < currentIndex) ) {
87 " from Pe(" << msg->
PE <<
")" <<
88 " on stack while waiting for " << currentIndex <<
89 " from Pe(" << PE <<
").\n");
90 NAMD_bug(
"MIStream::Get - duplicate message on stack!");
92 if ( early && (early->
index == currentIndex) ) {
93 DebugM(2,
"Popping message " << currentIndex <<
" from stack.\n");
98 DebugM(1,
"Receiving message.\n");
102 while ( msg->
index != currentIndex ) {
103 if ( msg->
index < currentIndex ) {
105 " from Pe(" << msg->
PE <<
")" <<
106 " received while waiting for " << currentIndex <<
107 " from Pe(" << PE <<
").\n");
108 NAMD_bug(
"MIStream::Get - duplicate message received!");
110 DebugM(2,
"Pushing message " << msg->
index <<
" on stack.\n");
111 if ( (! early) || (early->
index > msg->
index) ) {
122 DebugM(1,
"Receiving message again.\n");
129 if(currentPos+len <= msg->len) {
130 memcpy(buf, &(msg->
data[currentPos]), len);
134 size_t b = msg->
len-currentPos;
135 memcpy(buf, &(msg->
data[currentPos]), b);
140 if(currentPos == msg->
len) {
148 MOStream *MOStream::Put(
char *buf,
size_t len)
151 if(msgBuf->
len + len <= bufLen) {
152 memcpy(&(msgBuf->
data[msgBuf->
len]), buf, len);
156 size_t b = bufLen - msgBuf->
len;
157 memcpy(&(msgBuf->
data[msgBuf->
len]), buf, b);
158 msgBuf->
len = bufLen;
159 if ( msgBuf->
index && ! ((msgBuf->
index) % 100) ) {
160 DebugM(3,
"Sending message " << msgBuf->
index <<
".\n");
163 for (
size_t i=0; i < msgBuf->
len; i++ ) {
178 if ( msgBuf->
len == 0 )
return;
179 if ( msgBuf->
index && ! ((msgBuf->
index) % 100) ) {
180 DebugM(3,
"Sending message " << msgBuf->
index <<
".\n");
183 for (
size_t i=0; i < msgBuf->
len; i++ ) {
void * getMessage(int PE, int tag)
char header[CmiMsgHeaderSizeBytes]
MIStream(Communicate *c, int pe, int tag)
void NAMD_bug(const char *err_msg)
MOStream(Communicate *c, int pe, int tag, size_t bufSize)
static int checkSum(StreamMessage *msg)
void sendMessage(int PE, void *msg, int size)