00001
00007 #include "UniqueSet.h"
00008 #include "UniqueSetIter.h"
00009 #include "BroadcastMgr.decl.h"
00010 #include "BroadcastMgr.h"
00011 #include "BroadcastClient.h"
00012 #include "BroadcastObject.h"
00013 #include "ProcessorPrivate.h"
00014 #define MIN_DEBUG_LEVEL 3
00015
00016 #include "Debug.h"
00017
00018 BroadcastMgr::~BroadcastMgr(void) {
00019 UniqueSetIter<BOID> boidIter(boid);
00020 for (boidIter = boidIter.begin(); boidIter != boidIter.end(); boidIter++) {
00021 delete boidIter->broadcastSet;
00022 if (boidIter->taggedMsg) {
00023 delete boidIter->taggedMsg;
00024 }
00025 }
00026 }
00027
00028
00029 int
00030 BroadcastMgr::getbuf(BroadcastClient &b, int tag, void *msg) {
00031 int rval = -1;
00032 TaggedMsg *tm;
00033 BOID* boidTmp = boid.find(BOID(b.id));
00034 if (!boidTmp) {
00035 return(-2);
00036 }
00037 if ( (tm = (boidTmp->taggedMsg->find(TaggedMsg(tag)))) ) {
00038 rval = tm->msgSize;
00039 memcpy(msg, tm->msg, tm->msgSize);
00040 if (!--(tm->counter)) {
00041 (boid.find(BOID(b.id)))->taggedMsg->del(TaggedMsg(tag));
00042 }
00043 }
00044 return(rval);
00045 }
00046
00047
00048 void
00049 BroadcastMgr::send(BroadcastClient &b, int tag, void *buf, size_t size) {
00050 BroadcastMsg* msg = new BroadcastMsg;
00051 memcpy((void*)(msg->msg),buf,size);
00052 msg->size = (int)size;
00053 msg->tag = tag;
00054 msg->id = b.id;
00055 msg->node = CkMyPe();
00056 CProxy_BroadcastMgr(thisgroup).recvBroadcast(msg);
00057 }
00058
00059 void
00060 BroadcastMgr::subscribe(BroadcastClient &bc) {
00061 BOID *b;
00062 if (!(b = boid.find(BOID(bc.id)))) {
00063 boid.add(BOID(bc.id));
00064 b = boid.find(BOID(bc.id));
00065 b->broadcastSet = new UniqueSet<BroadcastClientElem>;
00066 b->taggedMsg = new UniqueSet<TaggedMsg>;
00067 }
00068 b->broadcastSet->add(BroadcastClientElem(&bc));
00069 }
00070
00071 void
00072 BroadcastMgr::unsubscribe(BroadcastClient &bc) {
00073 BOID *b;
00074 if ( (b = boid.find(BOID(bc.id))) ) {
00075 b->broadcastSet->del(BroadcastClientElem(&bc));
00076 if (!b->broadcastSet->size()) {
00077 delete b->broadcastSet;
00078 b->broadcastSet = 0;
00079 delete b->taggedMsg;
00080 b->taggedMsg = 0;
00081 }
00082 }
00083 }
00084
00085 void
00086 BroadcastMgr::recvBroadcast(BroadcastMsg *msg) {
00087 BOID *b;
00088 int counter;
00089
00090 if ( (b = boid.find(BOID(msg->id))) ) {
00091
00092 counter = b->broadcastSet->size();
00093 if (msg->node == CkMyPe()) counter--;
00094 if ( counter < 0 ) NAMD_bug("BroadcastMgr::recvBroadcast counter < 0");
00095 else if ( counter > 0 ) {
00096 b->taggedMsg->add(TaggedMsg(msg->tag,msg->size,counter,msg->msg));
00097
00098
00099 UniqueSetIter<BroadcastClientElem> bcIter(*(b->broadcastSet));
00100 for (bcIter = bcIter.begin(); bcIter != bcIter.end(); bcIter++) {
00101 bcIter->broadcastClient->awaken(msg->id, msg->tag);
00102 }
00103 }
00104 }
00105 delete msg;
00106 }
00107
00108 #include "BroadcastMgr.def.h"
00109