NAMD
ReductionMgr.C
Go to the documentation of this file.
1 
7 /*
8  The order of execution is expected to be:
9  0. instantiate object
10  ------------------ (processing barrier)
11  (mode 0) 1. register() and subscribe()
12  ------------------
13  (mode 1) 2. submit() and require()
14  ------------------
15  (mode 2) 3. unregister() and unsubscribe()
16  ------------------ (processing barrier)
17  4. destroy object
18  Doing this out-of-order will cause errors.
19 
20  Assumes that *only* one thread will require() a specific sequence's data.
21 */
22 
23 #include <stdlib.h>
24 #include <stdio.h>
25 
26 #include "InfoStream.h"
27 #include "PatchMap.h" // for patchMap
28 
29 #include "Node.h"
30 #include "SimParameters.h"
31 
32 #include "ReductionMgr.decl.h"
33 #include "ReductionMgr.h"
34 
35 // #define DEBUGM
36 #define MIN_DEBUG_LEVEL 4
37 #include "Debug.h"
38 
39 // Used to register and unregister reductions to downstream nodes
40 class ReductionRegisterMsg : public CMessage_ReductionRegisterMsg {
41 public:
43  int dataSize;
45 };
46 
47 // Used to send reduction data to downstream nodes
48 class ReductionSubmitMsg : public CMessage_ReductionSubmitMsg {
49 public:
53  int dataSize;
55 };
56 
57 ReductionSet::ReductionSet(int setID, int size, int numChildren) {
58  if ( setID == REDUCTIONS_BASIC || setID == REDUCTIONS_AMD ) {
59  if ( size != -1 ) {
60  NAMD_bug("ReductionSet size specified for REDUCTIONS_BASIC or REDUCTIONS_AMD.");
61  }
63  }
64  if ( size == -1 ) NAMD_bug("ReductionSet size not specified.");
65  dataSize = size;
66  reductionSetID = setID;
69  dataQueue = 0;
71  threadIsWaiting = 0;
72  addToRemoteSequenceNumber = new int[numChildren];
73 }
74 
76 
77  ReductionSetData *current = dataQueue;
78 
79  while ( current ) {
80  ReductionSetData *next = current->next;
81  delete current;
82  current = next;
83  }
84  delete [] addToRemoteSequenceNumber;
85 }
86 
87 // possibly create and return data for a particular seqNum
89 
90  ReductionSetData **current = &dataQueue;
91 
92  while ( *current ) {
93  if ( (*current)->sequenceNumber == seqNum ) return *current;
94  current = &((*current)->next);
95  }
96 
97 //iout << "seq " << seqNum << " created on " << CkMyPe() << "\n" << endi;
98  *current = new ReductionSetData(seqNum, dataSize);
99  return *current;
100 }
101 
102 // possibly delete data for a particular seqNum
104 
105  ReductionSetData **current = &dataQueue;
106 
107  while ( *current ) {
108  if ( (*current)->sequenceNumber == seqNum ) break;
109  current = &((*current)->next);
110  }
111 
112  if ( ! *current ) { NAMD_die("ReductionSet::removeData on missing seqNum"); }
113 
114  ReductionSetData *toremove = *current;
115  *current = (*current)->next;
116  return toremove;
117 }
118 
119 void ReductionMgr::buildSpanTree(const int pe,
120  const int max_intranode_children,
121  const int max_internode_children,
122  int* parent,
123  int* num_children,
124  int** children)
125 {
126  // If pe is a first-node, children are same-node pes and perhaps some
127  // other first-nodes, and parents are other first-nodes. If pe is not a
128  // first-node, build the spanning tree among the children, and the parent
129  // is the corresponding first-node
130 
131  // No matter what, build list of PEs on my node first
132  const int num_pes = CkNumPes();
133  const int num_node_pes = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(pe));
134  int *node_pes = new int[num_node_pes];
135  int pe_index = -1;
136  const int first_pe = CmiGetFirstPeOnPhysicalNode(CmiPhysicalNodeID(pe));
137  int num_nodes = 0;
138  int *node_ids = new int[num_pes];
139  int first_pe_index = -1;
140  int my_parent_index;
141 
142  // Make sure PE 0 is a first-node
143  if (pe == 0 && first_pe != pe) {
144  NAMD_die("PE 0 is not the first physical node. This shouldn't happen");
145  }
146  // Get all the PEs on my node, and also build the list of all first-nodes
147  int i;
148  int node_pe_count=0;
149  for (i = 0; i < num_pes; i++) {
150  // Save first-nodes
151  if (CmiGetFirstPeOnPhysicalNode(CmiPhysicalNodeID(i)) == i) {
152  node_ids[num_nodes] = i;
153  if (i == first_pe)
154  first_pe_index = num_nodes;
155  num_nodes++;
156  }
157 
158  // Also, find pes on my node
159  const int i1 = (i + first_pe) % num_pes;
160  if (CmiPeOnSamePhysicalNode(first_pe,i1)) {
161  if ( node_pe_count == num_node_pes )
162  NAMD_bug("ReductionMgr::buildSpanTree found inconsistent physical node data from Charm++ runtime");
163  node_pes[node_pe_count] = i1;
164  if (pe == i1)
165  pe_index = node_pe_count;
166  node_pe_count++;
167  }
168  }
169  if ( pe_index < 0 || first_pe_index < 0 )
170  NAMD_bug("ReductionMgr::buildSpanTree found inconsistent physical node data from Charm++ runtime");
171 
172  // Any PE might have children on the same node, plus, if its a first-node,
173  // it may have several children on other nodes
174 
175  int first_loc_child_index = pe_index * max_intranode_children + 1;
176  int last_loc_child_index
177  = first_loc_child_index + max_intranode_children - 1;
178  if (first_loc_child_index > num_node_pes) {
179  first_loc_child_index = num_node_pes;
180  last_loc_child_index = num_node_pes;
181  } else {
182  if (last_loc_child_index >= num_node_pes)
183  last_loc_child_index = num_node_pes-1;
184  }
185 // CkPrintf("Local [%d] firstpe %d max %d num %d firstloc %d lastloc %d\n",
186 // pe,pe_index,max_intranode_children,num_node_pes,
187 // first_loc_child_index,last_loc_child_index);
188 
189  int first_rem_child_index = num_nodes;
190  int last_rem_child_index = num_nodes;
191  int rem_children=0;
192  int *rem_child_index = new int[max_internode_children];
193 
194  if (first_pe != pe) {
195  // I'm not a first_pe, so I have no more children, and my parent
196  // is someone else on my node
197  my_parent_index = (pe_index-1)/max_intranode_children;
198  *parent = node_pes[my_parent_index];
199  } else {
200  // I am a first_pe, so I may have additional children
201  // on other nodes, and my parent will be on another node
202 
203  int range_begin = 0;
204  int range_end = num_nodes;
205 
206  if (pe == 0) {
207  my_parent_index = -1;
208  *parent = -1;
209  } else {
210  my_parent_index = 0;
211  while ( first_pe_index != range_begin ) {
212  my_parent_index = range_begin;
213  ++range_begin;
214  for ( int i = 0; i < max_internode_children; ++i ) {
215  int split = range_begin + ( range_end - range_begin ) / ( max_internode_children - i );
216  if ( first_pe_index < split ) { range_end = split; break; }
217  else { range_begin = split; }
218  }
219  }
220  *parent = node_ids[my_parent_index];
221  }
222 
223  // now we know parent and need only repeat calculation of children
224  int prev_child_index = range_begin;
225  ++range_begin;
226  for ( int i = 0; i < max_internode_children; ++i ) {
227  if ( range_begin >= range_end ) break;
228  if ( range_begin > prev_child_index ) {
229  rem_child_index[rem_children++] = prev_child_index = range_begin;
230  }
231  range_begin += ( range_end - range_begin ) / ( max_internode_children - i );
232  }
233  }
234 
235  *num_children = 0;
236  //CkPrintf("TREE pe %d my_parent %d %d\n",pe,my_parent_index,*parent);
237 
238  int loc_children=0;
239  if (first_loc_child_index != num_node_pes) {
240  loc_children = last_loc_child_index - first_loc_child_index + 1;
241  *num_children += loc_children;
242 // CkPrintf("TREE pe %d %d local children\n",pe,loc_children);
243 // } else {
244 // CkPrintf("TREE pe %d No local children\n",pe);
245  }
246 
247  if (rem_children) {
248  *num_children += rem_children;
249 // CkPrintf("TREE pe %d %d rem children\n",pe,rem_children);
250 // } else {
251 // CkPrintf("TREE pe %d No rem children\n",pe);
252  }
253  if (*num_children == 0)
254  *children = 0;
255  else {
256  *children = new int[*num_children];
257 // CkPrintf("TREE pe %d children %d\n",pe,*num_children);
258  int k;
259  int child=0;
260  if (loc_children > 0) {
261  for(k=first_loc_child_index; k <= last_loc_child_index; k++) {
262 // CkPrintf("TREE pe %d loc child[%d,%d] %d\n",pe,child,k,node_pes[k]);
263  (*children)[child++]=node_pes[k];
264  }
265  }
266  if (rem_children > 0) {
267  for(k=0; k < rem_children; k++) {
268 // CkPrintf("TREE pe %d rem child[%d,%d] %d\n",pe,child,k,node_ids[rem_child_index[k]]);
269  (*children)[child++]=node_ids[rem_child_index[k]];
270  }
271  }
272  }
273  delete [] rem_child_index;
274  delete [] node_ids;
275  delete [] node_pes;
276 }
277 
278 // constructor
280  if (CkpvAccess(ReductionMgr_instance) == 0) {
281  CkpvAccess(ReductionMgr_instance) = this;
282  } else {
283  DebugM(1, "ReductionMgr::ReductionMgr() - another instance exists!\n");
284  }
285 
287  &myParent,&numChildren,&children);
288 
289 // CkPrintf("TREE [%d] parent %d %d children\n",
290 // CkMyPe(),myParent,numChildren);
291 // if (numChildren > 0) {
292 // for(int i=0; i < numChildren; i++) {
293 // CkPrintf("TREE [%d] child %d %d\n",CkMyPe(),i,children[i]);
294 // }
295 // }
296 
297  // fill in the spanning tree fields
298 #if 0 // Old spanning tree
299  if (CkMyPe() == 0) {
300  myParent = -1;
301  } else {
302  myParent = (CkMyPe()-1)/REDUCTION_MAX_CHILDREN;
303  }
304  firstChild = CkMyPe()*REDUCTION_MAX_CHILDREN + 1;
305  if (firstChild > CkNumPes()) firstChild = CkNumPes();
306  lastChild = firstChild + REDUCTION_MAX_CHILDREN;
307  if (lastChild > CkNumPes()) lastChild = CkNumPes();
308 #endif
309 
310  // initialize data
311  for(int i=0; i<REDUCTION_MAX_SET_ID; i++) {
312  reductionSets[i] = 0;
313  }
314 
315  DebugM(1,"ReductionMgr() instantiated.\n");
316 }
317 
318 // destructor
320  if (children != 0)
321  delete [] children;
322  for(int i=0; i<REDUCTION_MAX_SET_ID; i++) {
323  delete reductionSets[i];
324  }
325 
326 }
327 
328 // possibly create and return reduction set
329 ReductionSet* ReductionMgr::getSet(int setID, int size) {
330  if ( reductionSets[setID] == 0 ) {
331  reductionSets[setID] = new ReductionSet(setID,size,numChildren);
332  if ( ! isRoot() ) {
334  msg->reductionSetID = setID;
335  msg->dataSize = size;
336  msg->sourceNode = CkMyPe();
337  CProxy_ReductionMgr reductionProxy(thisgroup);
338  reductionProxy[myParent].remoteRegister(msg);
339  }
340  } else if ( setID == REDUCTIONS_BASIC || setID == REDUCTIONS_AMD ) {
341  if ( size != -1 ) NAMD_bug("ReductionMgr::getSet size set");
342  } else if ( size < 0 || reductionSets[setID]->dataSize != size ) {
343  NAMD_bug("ReductionMgr::getSet size mismatch");
344  }
345  return reductionSets[setID];
346 }
347 
348 // possibly delete reduction set
349 void ReductionMgr::delSet(int setID) {
350  ReductionSet *set = reductionSets[setID];
351  if ( set && ! set->submitsRegistered & ! set->requireRegistered ) {
352  if ( ! isRoot() ) {
354  msg->reductionSetID = setID;
355  msg->sourceNode = CkMyPe();
356  CProxy_ReductionMgr reductionProxy(thisgroup);
357  reductionProxy[myParent].remoteUnregister(msg);
358  }
359  delete set;
360  reductionSets[setID] = 0;
361  }
362 }
363 
364 // register local submit
366  ReductionSet *set = getSet(setID, size);
367  ReductionSetData *data = set->getData(set->nextSequenceNumber);
368  if ( data->submitsRecorded ) {
369  NAMD_die("ReductionMgr::willSubmit called while reductions outstanding!");
370  }
371 
372  set->submitsRegistered++;
373 
374  SubmitReduction *handle = new SubmitReduction;
375  handle->reductionSetID = setID;
376  handle->sequenceNumber = set->nextSequenceNumber;
377  handle->master = this;
378  handle->data = data->data;
379 
380  return handle;
381 }
382 
383 // unregister local submit
384 void ReductionMgr::remove(SubmitReduction* handle) {
385  int setID = handle->reductionSetID;
386  ReductionSet *set = reductionSets[setID];
387  if ( set->getData(set->nextSequenceNumber)->submitsRecorded ) {
388  NAMD_die("SubmitReduction deleted while reductions outstanding!");
389  }
390 
391  set->submitsRegistered--;
392 
393  delSet(setID);
394 }
395 
396 // local submit
397 void ReductionMgr::submit(SubmitReduction* handle) {
398  int setID = handle->reductionSetID;
399  int seqNum = handle->sequenceNumber;
400  ReductionSet *set = reductionSets[setID];
401  ReductionSetData *data = set->getData(seqNum);
402 
403  data->submitsRecorded++;
404  if ( data->submitsRecorded == set->submitsRegistered ) {
405  mergeAndDeliver(set,seqNum);
406  }
407 
408  handle->sequenceNumber = ++seqNum;
409  handle->data = set->getData(seqNum)->data;
410 }
411 
412 // register submit from child
414 
415  int setID = msg->reductionSetID;
416  int size = msg->dataSize;
417  ReductionSet *set = getSet(setID,size);
418  if ( set->getData(set->nextSequenceNumber)->submitsRecorded ) {
419  NAMD_die("ReductionMgr::remoteRegister called while reductions outstanding on parent!");
420  }
421 
422  set->submitsRegistered++;
423  set->addToRemoteSequenceNumber[childIndex(msg->sourceNode)]
424  = set->nextSequenceNumber;
425 // CkPrintf("[%d] reduction register received from node[%d] %d\n",
426 // CkMyPe(),childIndex(msg->sourceNode),msg->sourceNode);
427 
428  delete msg;
429 }
430 
431 // unregister submit from child
433 
434  int setID = msg->reductionSetID;
435  ReductionSet *set = reductionSets[setID];
436  if ( set->getData(set->nextSequenceNumber)->submitsRecorded ) {
437  NAMD_die("SubmitReduction deleted while reductions outstanding on parent!");
438  }
439 
440  set->submitsRegistered--;
441 
442  delSet(setID);
443  delete msg;
444 }
445 
446 // data submitted from child
448  int setID = msg->reductionSetID;
449  ReductionSet *set = reductionSets[setID];
450  int seqNum = msg->sequenceNumber
451  + set->addToRemoteSequenceNumber[childIndex(msg->sourceNode)];
452 
453 //iout << "seq " << seqNum << " from " << msg->sourceNode << " received on " << CkMyPe() << "\n" << endi;
454  int size = msg->dataSize;
455  if ( size != set->dataSize ) {
456  NAMD_bug("ReductionMgr::remoteSubmit data sizes do not match.");
457  }
458 
459  BigReal *newData = msg->data;
460  ReductionSetData *data = set->getData(seqNum);
461  BigReal *curData = data->data;
462 #ifdef ARCH_POWERPC
463 #pragma disjoint (*curData, *newData)
464 #pragma unroll(4)
465 #endif
466  if ( setID == REDUCTIONS_MINIMIZER ) {
467  for ( int i = 0; i < size; ++i ) {
468  if ( newData[i] > curData[i] ) {
469  curData[i] = newData[i];
470  }
471  }
472  } else {
473  for ( int i = 0; i < size; ++i ) {
474  curData[i] += newData[i];
475  }
476  }
477 // CkPrintf("[%d] reduction Submit received from node[%d] %d\n",
478 // CkMyPe(),childIndex(msg->sourceNode),msg->sourceNode);
479  delete msg;
480 
481  data->submitsRecorded++;
482  if ( data->submitsRecorded == set->submitsRegistered ) {
483  mergeAndDeliver(set,seqNum);
484  }
485 }
486 
487 // common code for submission and delivery
488 void ReductionMgr::mergeAndDeliver(ReductionSet *set, int seqNum) {
489 
490 //iout << "seq " << seqNum << " complete on " << CkMyPe() << "\n" << endi;
491 
492  set->nextSequenceNumber++; // should match all clients
493 
494  ReductionSetData *data = set->getData(seqNum);
495  if ( data->submitsRecorded != set->submitsRegistered ) {
496  NAMD_bug("ReductionMgr::mergeAndDeliver not ready to deliver.");
497  }
498 
499  if ( isRoot() ) {
500  if ( set->requireRegistered ) {
501  if ( set->threadIsWaiting && set->waitingForSequenceNumber == seqNum) {
502  // awaken the thread so it can take the data
503  CthAwaken(set->waitingThread);
504  }
505  } else {
506  NAMD_die("ReductionSet::deliver will never deliver data");
507  }
508  } else {
509  // send data to parent
511  msg->reductionSetID = set->reductionSetID;
512  msg->sourceNode = CkMyPe();
513  msg->sequenceNumber = seqNum;
514  msg->dataSize = set->dataSize;
515  for ( int i = 0; i < msg->dataSize; ++i ) {
516  msg->data[i] = data->data[i];
517  }
518  CProxy_ReductionMgr reductionProxy(thisgroup);
519  reductionProxy[myParent].remoteSubmit(msg);
520  delete set->removeData(seqNum);
521  }
522 
523 }
524 
525 // register require
527  ReductionSet *set = getSet(setID,size);
528  set->requireRegistered++;
529  if ( set->getData(set->nextSequenceNumber)->submitsRecorded ) {
530  NAMD_die("ReductionMgr::willRequire called while reductions outstanding!");
531  }
532 
533  RequireReduction *handle = new RequireReduction;
534  handle->reductionSetID = setID;
535  handle->sequenceNumber = set->nextSequenceNumber;
536  handle->master = this;
537 
538  return handle;
539 }
540 
541 // unregister require
542 void ReductionMgr::remove(RequireReduction* handle) {
543  int setID = handle->reductionSetID;
544  ReductionSet *set = reductionSets[setID];
545  if ( set->getData(set->nextSequenceNumber)->submitsRecorded ) {
546  NAMD_die("RequireReduction deleted while reductions outstanding!");
547  }
548 
549  set->requireRegistered--;
550 
551  delSet(setID);
552 }
553 
554 // require the data from a thread
555 void ReductionMgr::require(RequireReduction* handle) {
556  int setID = handle->reductionSetID;
557  ReductionSet *set = reductionSets[setID];
558  int seqNum = handle->sequenceNumber;
559  ReductionSetData *data = set->getData(seqNum);
560  if ( data->submitsRecorded < set->submitsRegistered ) {
561  set->threadIsWaiting = 1;
562  set->waitingForSequenceNumber = seqNum;
563  set->waitingThread = CthSelf();
564 //iout << "seq " << seqNum << " waiting\n" << endi;
565  CthSuspend();
566  }
567  set->threadIsWaiting = 0;
568 
569 //iout << "seq " << seqNum << " consumed\n" << endi;
570  delete handle->currentData;
571  handle->currentData = set->removeData(seqNum);
572  handle->data = handle->currentData->data;
573  handle->sequenceNumber = ++seqNum;
574 }
575 
576 
577 #include "ReductionMgr.def.h"
578 // nothing should be placed below here
579 
void buildSpanTree(const int pe, const int max_intranode_children, const int max_internode_children, int *parent, int *num_children, int **children)
Definition: ReductionMgr.C:119
int nextSequenceNumber
Definition: ReductionMgr.h:216
ReductionSetData * next
Definition: ReductionMgr.h:199
int * addToRemoteSequenceNumber
Definition: ReductionMgr.h:228
ReductionSetData * getData(int seqNum)
Definition: ReductionMgr.C:88
#define DebugM(x, y)
Definition: Debug.h:59
ReductionSet(int setID, int size, int numChildren)
Definition: ReductionMgr.C:57
void remoteRegister(ReductionRegisterMsg *msg)
Definition: ReductionMgr.C:413
SubmitReduction * willSubmit(int setID, int size=-1)
Definition: ReductionMgr.C:365
friend class SubmitReduction
Definition: ReductionMgr.h:236
#define REDUCTION_MAX_CHILDREN
Definition: ReductionMgr.h:186
static Units next(Units u)
Definition: ParseOptions.C:48
void NAMD_bug(const char *err_msg)
Definition: common.C:129
CthThread waitingThread
Definition: ReductionMgr.h:225
int waitingForSequenceNumber
Definition: ReductionMgr.h:224
int requireRegistered
Definition: ReductionMgr.h:222
ReductionSetData * removeData(int seqNum)
Definition: ReductionMgr.C:103
int submitsRegistered
Definition: ReductionMgr.h:217
void NAMD_die(const char *err_msg)
Definition: common.C:85
std::vector< std::string > split(const std::string &text, std::string delimiter)
Definition: MoleculeQM.C:73
void remoteSubmit(ReductionSubmitMsg *msg)
Definition: ReductionMgr.C:447
RequireReduction * willRequire(int setID, int size=-1)
Definition: ReductionMgr.C:526
friend class RequireReduction
Definition: ReductionMgr.h:237
void remoteUnregister(ReductionRegisterMsg *msg)
Definition: ReductionMgr.C:432
double BigReal
Definition: common.h:114
ReductionSetData * dataQueue
Definition: ReductionMgr.h:219