NAMD
ReductionMgr.C
Go to the documentation of this file.
1 
7 /*
8  The order of execution is expected to be:
9  0. instantiate object
10  ------------------ (processing barrier)
11  (mode 0) 1. register() and subscribe()
12  ------------------
13  (mode 1) 2. submit() and require()
14  ------------------
15  (mode 2) 3. unregister() and unsubscribe()
16  ------------------ (processing barrier)
17  4. destroy object
18  Doing this out-of-order will cause errors.
19 
20  Assumes that *only* one thread will require() a specific sequence's data.
21 */
22 
23 #include <stdlib.h>
24 #include <stdio.h>
25 
26 #include "InfoStream.h"
27 #include "PatchMap.h" // for patchMap
28 
29 #include "Node.h"
30 #include "SimParameters.h"
31 
32 #include "ReductionMgr.decl.h"
33 #include "ReductionMgr.h"
34 
35 // #define DEBUGM
36 #define MIN_DEBUG_LEVEL 4
37 #include "Debug.h"
38 #include <atomic>
39 
40 // Used to register and unregister reductions to downstream nodes
// Message exchanged when a reduction set is registered or unregistered.
// getSet() and delSet() fill one in and send it to the parent PE's
// remoteRegister() / remoteUnregister() entry methods, which delete it.
// NOTE(review): reductionSetID and sourceNode are assigned on this message
// elsewhere in the file, but their declarations are not visible in this
// listing - confirm against the real source.
41 class ReductionRegisterMsg : public CMessage_ReductionRegisterMsg {
42 public:
// Size of the reduction set being registered; getSet() copies its `size`
// argument here and remoteRegister() passes it on to getSet() on the parent.
44  int dataSize;
46 };
47 
48 // Used to send reduction data to downstream nodes
// Message carrying one completed sequence of reduction data to the parent PE.
// mergeAndDeliver() allocates and fills it; remoteSubmit() folds its data
// into the local set and deletes it.
// NOTE(review): reductionSetID, sourceNode, sequenceNumber and data are used
// on this message elsewhere in the file, but their declarations are not
// visible in this listing - confirm against the real source.
49 class ReductionSubmitMsg : public CMessage_ReductionSubmitMsg {
50 public:
// Number of entries in the message's data array; set from the sending set's
// dataSize and checked against the receiving set's dataSize in remoteSubmit().
54  int dataSize;
56 };
57 
// Construct the bookkeeping for one reduction set.
//   setID       - identifier stored in reductionSetID
//   size        - number of values per sequence, stored in dataSize
//   numChildren - number of child PEs; sizes addToRemoteSequenceNumber
// NOTE(review): several statements of this constructor are absent from this
// listing (the line numbers skip), so the comments below cover only what is
// visible - confirm against the real source.
58 ReductionSet::ReductionSet(int setID, int size, int numChildren) {
// The two built-in sets must not be given an explicit size (-1 means "unset").
59  if ( setID == REDUCTIONS_BASIC || setID == REDUCTIONS_AMD ) {
60  if ( size != -1 ) {
61  NAMD_bug("ReductionSet size specified for REDUCTIONS_BASIC or REDUCTIONS_AMD.");
62  }
64  }
// By this point a concrete size is required.
65  if ( size == -1 ) NAMD_bug("ReductionSet size not specified.");
66  dataSize = size;
67  reductionSetID = setID;
// Start with an empty list of per-sequence data and no thread waiting.
70  dataQueue = 0;
72  threadIsWaiting = 0;
// One offset per child; written in remoteRegister() and added to the child's
// sequence numbers in remoteSubmit().
73  addToRemoteSequenceNumber = new int[numChildren];
74 }
75 
77 
78  ReductionSetData *current = dataQueue;
79 
80  while ( current ) {
81  ReductionSetData *next = current->next;
82  delete current;
83  current = next;
84  }
85  delete [] addToRemoteSequenceNumber;
86 }
87 
88 // possibly create and return data for a particular seqNum
90 
91  ReductionSetData **current = &dataQueue;
92 
93  while ( *current ) {
94  if ( (*current)->sequenceNumber == seqNum ) return *current;
95  current = &((*current)->next);
96  }
97 
98 //iout << "seq " << seqNum << " created on " << CkMyPe() << "\n" << endi;
99  *current = new ReductionSetData(seqNum, dataSize);
100  return *current;
101 }
102 
103 // possibly delete data for a particular seqNum
105 
106  ReductionSetData **current = &dataQueue;
107 
108  while ( *current ) {
109  if ( (*current)->sequenceNumber == seqNum ) break;
110  current = &((*current)->next);
111  }
112 
113  if ( ! *current ) { NAMD_die("ReductionSet::removeData on missing seqNum"); }
114 
115  ReductionSetData *toremove = *current;
116  *current = (*current)->next;
117  return toremove;
118 }
119 
120 void ReductionMgr::buildSpanTree(const int pe,
121  const int max_intranode_children,
122  const int max_internode_children,
123  int* parent,
124  int* num_children,
125  int** children)
126 {
127  // If pe is a first-node, children are same-node pes and perhaps some
128  // other first-nodes, and parents are other first-nodes. If pe is not a
129  // first-node, build the spanning tree among the children, and the parent
130  // is the corresponding first-node
131 
132  // No matter what, build list of PEs on my node first
133  const int num_pes = CkNumPes();
134  const int num_node_pes = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(pe));
135  int *node_pes = new int[num_node_pes];
136  int pe_index = -1;
137  const int first_pe = CmiGetFirstPeOnPhysicalNode(CmiPhysicalNodeID(pe));
138  int num_nodes = 0;
139  int *node_ids = new int[num_pes];
140  int first_pe_index = -1;
141  int my_parent_index;
142 
143  // Make sure PE 0 is a first-node
144  if (pe == 0 && first_pe != pe) {
145  NAMD_die("PE 0 is not the first physical node. This shouldn't happen");
146  }
147  // Get all the PEs on my node, and also build the list of all first-nodes
148  int i;
149  int node_pe_count=0;
150  for (i = 0; i < num_pes; i++) {
151  // Save first-nodes
152  if (CmiGetFirstPeOnPhysicalNode(CmiPhysicalNodeID(i)) == i) {
153  node_ids[num_nodes] = i;
154  if (i == first_pe)
155  first_pe_index = num_nodes;
156  num_nodes++;
157  }
158 
159  // Also, find pes on my node
160  const int i1 = (i + first_pe) % num_pes;
161  if (CmiPeOnSamePhysicalNode(first_pe,i1)) {
162  if ( node_pe_count == num_node_pes )
163  NAMD_bug("ReductionMgr::buildSpanTree found inconsistent physical node data from Charm++ runtime");
164  node_pes[node_pe_count] = i1;
165  if (pe == i1)
166  pe_index = node_pe_count;
167  node_pe_count++;
168  }
169  }
170  if ( pe_index < 0 || first_pe_index < 0 )
171  NAMD_bug("ReductionMgr::buildSpanTree found inconsistent physical node data from Charm++ runtime");
172 
173  // Any PE might have children on the same node, plus, if its a first-node,
174  // it may have several children on other nodes
175 
176  int first_loc_child_index = pe_index * max_intranode_children + 1;
177  int last_loc_child_index
178  = first_loc_child_index + max_intranode_children - 1;
179  if (first_loc_child_index > num_node_pes) {
180  first_loc_child_index = num_node_pes;
181  last_loc_child_index = num_node_pes;
182  } else {
183  if (last_loc_child_index >= num_node_pes)
184  last_loc_child_index = num_node_pes-1;
185  }
186 // CkPrintf("Local [%d] firstpe %d max %d num %d firstloc %d lastloc %d\n",
187 // pe,pe_index,max_intranode_children,num_node_pes,
188 // first_loc_child_index,last_loc_child_index);
189 
190  int first_rem_child_index = num_nodes;
191  int last_rem_child_index = num_nodes;
192  int rem_children=0;
193  int *rem_child_index = new int[max_internode_children];
194 
195  if (first_pe != pe) {
196  // I'm not a first_pe, so I have no more children, and my parent
197  // is someone else on my node
198  my_parent_index = (pe_index-1)/max_intranode_children;
199  *parent = node_pes[my_parent_index];
200  } else {
201  // I am a first_pe, so I may have additional children
202  // on other nodes, and my parent will be on another node
203 
204  int range_begin = 0;
205  int range_end = num_nodes;
206 
207  if (pe == 0) {
208  my_parent_index = -1;
209  *parent = -1;
210  } else {
211  my_parent_index = 0;
212  while ( first_pe_index != range_begin ) {
213  my_parent_index = range_begin;
214  ++range_begin;
215  for ( int i = 0; i < max_internode_children; ++i ) {
216  int split = range_begin + ( range_end - range_begin ) / ( max_internode_children - i );
217  if ( first_pe_index < split ) { range_end = split; break; }
218  else { range_begin = split; }
219  }
220  }
221  *parent = node_ids[my_parent_index];
222  }
223 
224  // now we know parent and need only repeat calculation of children
225  int prev_child_index = range_begin;
226  ++range_begin;
227  for ( int i = 0; i < max_internode_children; ++i ) {
228  if ( range_begin >= range_end ) break;
229  if ( range_begin > prev_child_index ) {
230  rem_child_index[rem_children++] = prev_child_index = range_begin;
231  }
232  range_begin += ( range_end - range_begin ) / ( max_internode_children - i );
233  }
234  }
235 
236  *num_children = 0;
237  //CkPrintf("TREE pe %d my_parent %d %d\n",pe,my_parent_index,*parent);
238 
239  int loc_children=0;
240  if (first_loc_child_index != num_node_pes) {
241  loc_children = last_loc_child_index - first_loc_child_index + 1;
242  *num_children += loc_children;
243 // CkPrintf("TREE pe %d %d local children\n",pe,loc_children);
244 // } else {
245 // CkPrintf("TREE pe %d No local children\n",pe);
246  }
247 
248  if (rem_children) {
249  *num_children += rem_children;
250 // CkPrintf("TREE pe %d %d rem children\n",pe,rem_children);
251 // } else {
252 // CkPrintf("TREE pe %d No rem children\n",pe);
253  }
254  if (*num_children == 0)
255  *children = 0;
256  else {
257  *children = new int[*num_children];
258 // CkPrintf("TREE pe %d children %d\n",pe,*num_children);
259  int k;
260  int child=0;
261  if (loc_children > 0) {
262  for(k=first_loc_child_index; k <= last_loc_child_index; k++) {
263 // CkPrintf("TREE pe %d loc child[%d,%d] %d\n",pe,child,k,node_pes[k]);
264  (*children)[child++]=node_pes[k];
265  }
266  }
267  if (rem_children > 0) {
268  for(k=0; k < rem_children; k++) {
269 // CkPrintf("TREE pe %d rem child[%d,%d] %d\n",pe,child,k,node_ids[rem_child_index[k]]);
270  (*children)[child++]=node_ids[rem_child_index[k]];
271  }
272  }
273  }
274  delete [] rem_child_index;
275  delete [] node_ids;
276  delete [] node_pes;
277 }
278 
279 // constructor
281  if (CkpvAccess(ReductionMgr_instance) == 0) {
282  CkpvAccess(ReductionMgr_instance) = this;
283  } else {
284  DebugM(1, "ReductionMgr::ReductionMgr() - another instance exists!\n");
285  }
286 
288  &myParent,&numChildren,&children);
289 
290 // CkPrintf("TREE [%d] parent %d %d children\n",
291 // CkMyPe(),myParent,numChildren);
292 // if (numChildren > 0) {
293 // for(int i=0; i < numChildren; i++) {
294 // CkPrintf("TREE [%d] child %d %d\n",CkMyPe(),i,children[i]);
295 // }
296 // }
297 
298  // fill in the spanning tree fields
299 #if 0 // Old spanning tree
300  if (CkMyPe() == 0) {
301  myParent = -1;
302  } else {
303  myParent = (CkMyPe()-1)/REDUCTION_MAX_CHILDREN;
304  }
305  firstChild = CkMyPe()*REDUCTION_MAX_CHILDREN + 1;
306  if (firstChild > CkNumPes()) firstChild = CkNumPes();
307  lastChild = firstChild + REDUCTION_MAX_CHILDREN;
308  if (lastChild > CkNumPes()) lastChild = CkNumPes();
309 #endif
310 
311  // initialize data
312  for(int i=0; i<REDUCTION_MAX_SET_ID; i++) {
313  reductionSets[i] = 0;
314  }
315 
316  DebugM(1,"ReductionMgr() instantiated.\n");
317 }
318 
319 // destructor
321  if (children != 0)
322  delete [] children;
323  for(int i=0; i<REDUCTION_MAX_SET_ID; i++) {
324  delete reductionSets[i];
325  }
326 
327 }
328 
329 // possibly create and return reduction set
// Return the ReductionSet stored for setID, creating it on first use.
//   setID - index into reductionSets
//   size  - expected data size; must be -1 for REDUCTIONS_BASIC and
//           REDUCTIONS_AMD when the set already exists, and must equal the
//           existing set's dataSize for every other set
// A size conflict is reported through NAMD_bug.
330 ReductionSet* ReductionMgr::getSet(int setID, int size) {
331  if ( reductionSets[setID] == 0 ) {
332  reductionSets[setID] = new ReductionSet(setID,size,numChildren);
// A newly created set on a non-root PE is announced to the parent PE.
// NOTE(review): the statement that declares/allocates `msg` is absent from
// this listing - confirm against the real source.
333  if ( ! isRoot() ) {
335  msg->reductionSetID = setID;
336  msg->dataSize = size;
337  msg->sourceNode = CkMyPe();
338  CProxy_ReductionMgr reductionProxy(thisgroup);
339  reductionProxy[myParent].remoteRegister(msg);
340  }
// Set already exists: only validate the requested size.
341  } else if ( setID == REDUCTIONS_BASIC || setID == REDUCTIONS_AMD ) {
342  if ( size != -1 ) NAMD_bug("ReductionMgr::getSet size set");
343  } else if ( size < 0 || reductionSets[setID]->dataSize != size ) {
344  NAMD_bug("ReductionMgr::getSet size mismatch");
345  }
346  return reductionSets[setID];
347 }
348 
349 // possibly delete reduction set
350 void ReductionMgr::delSet(int setID) {
351  ReductionSet *set = reductionSets[setID];
352  if ( set && ! set->submitsRegistered & ! set->requireRegistered ) {
353  if ( ! isRoot() ) {
355  msg->reductionSetID = setID;
356  msg->sourceNode = CkMyPe();
357  CProxy_ReductionMgr reductionProxy(thisgroup);
358  reductionProxy[myParent].remoteUnregister(msg);
359  }
360  delete set;
361  reductionSets[setID] = 0;
362  }
363 }
364 
365 // register local submit
367  ReductionSet *set = getSet(setID, size);
368  ReductionSetData *data = set->getData(set->nextSequenceNumber);
369  if ( data->submitsRecorded ) {
370  NAMD_die("ReductionMgr::willSubmit called while reductions outstanding!");
371  }
372 
373  set->submitsRegistered++;
374 
375  SubmitReduction *handle = new SubmitReduction;
376  handle->reductionSetID = setID;
377  handle->sequenceNumber = set->nextSequenceNumber;
378  handle->master = this;
379  handle->data = data->data;
380 
381  return handle;
382 }
383 
384 // unregister local submit
385 void ReductionMgr::remove(SubmitReduction* handle) {
386  int setID = handle->reductionSetID;
387  ReductionSet *set = reductionSets[setID];
388  if ( set->getData(set->nextSequenceNumber)->submitsRecorded ) {
389  NAMD_die("SubmitReduction deleted while reductions outstanding!");
390  }
391 
392  set->submitsRegistered--;
393 
394  delSet(setID);
395 }
396 
397 // local submit
398 void ReductionMgr::submit(SubmitReduction* handle) {
399  int setID = handle->reductionSetID;
400  int seqNum = handle->sequenceNumber;
401  ReductionSet *set = reductionSets[setID];
402  ReductionSetData *data = set->getData(seqNum);
403 
404  data->submitsRecorded++;
405  if ( data->submitsRecorded == set->submitsRegistered ) {
406  mergeAndDeliver(set,seqNum);
407  }
408 
409  handle->sequenceNumber = ++seqNum;
410  handle->data = set->getData(seqNum)->data;
411 }
412 
413 // register submit from child
415 
416  int setID = msg->reductionSetID;
417  int size = msg->dataSize;
418  ReductionSet *set = getSet(setID,size);
419  if ( set->getData(set->nextSequenceNumber)->submitsRecorded ) {
420  NAMD_die("ReductionMgr::remoteRegister called while reductions outstanding on parent!");
421  }
422 
423  set->submitsRegistered++;
424  set->addToRemoteSequenceNumber[childIndex(msg->sourceNode)]
425  = set->nextSequenceNumber;
426 // CkPrintf("[%d] reduction register received from node[%d] %d\n",
427 // CkMyPe(),childIndex(msg->sourceNode),msg->sourceNode);
428 
429  delete msg;
430 }
431 
432 // unregister submit from child
434 
435  int setID = msg->reductionSetID;
436  ReductionSet *set = reductionSets[setID];
437  if ( set->getData(set->nextSequenceNumber)->submitsRecorded ) {
438  NAMD_die("SubmitReduction deleted while reductions outstanding on parent!");
439  }
440 
441  set->submitsRegistered--;
442 
443  delSet(setID);
444  delete msg;
445 }
446 
447 // data submitted from child
449  int setID = msg->reductionSetID;
450  ReductionSet *set = reductionSets[setID];
451  int seqNum = msg->sequenceNumber
452  + set->addToRemoteSequenceNumber[childIndex(msg->sourceNode)];
453 
454 //iout << "seq " << seqNum << " from " << msg->sourceNode << " received on " << CkMyPe() << "\n" << endi;
455  int size = msg->dataSize;
456  if ( size != set->dataSize ) {
457  NAMD_bug("ReductionMgr::remoteSubmit data sizes do not match.");
458  }
459 
460  BigReal *newData = msg->data;
461  ReductionSetData *data = set->getData(seqNum);
462  BigReal *curData = data->data;
463 #ifdef ARCH_POWERPC
464 #pragma disjoint (*curData, *newData)
465 #pragma unroll(4)
466 #endif
467  if ( setID == REDUCTIONS_MINIMIZER ) {
468  for ( int i = 0; i < size; ++i ) {
469  if ( newData[i] > curData[i] ) {
470  curData[i] = newData[i];
471  }
472  }
473  } else {
474  for ( int i = 0; i < size; ++i ) {
475  curData[i] += newData[i];
476  }
477  }
478 // CkPrintf("[%d] reduction Submit received from node[%d] %d\n",
479 // CkMyPe(),childIndex(msg->sourceNode),msg->sourceNode);
480  delete msg;
481 
482  data->submitsRecorded++;
483  if ( data->submitsRecorded == set->submitsRegistered ) {
484  mergeAndDeliver(set,seqNum);
485  }
486 }
487 
488 // common code for submission and delivery
489 void ReductionMgr::mergeAndDeliver(ReductionSet *set, int seqNum) {
490 
491 //iout << "seq " << seqNum << " complete on " << CkMyPe() << "\n" << endi;
492 
493  set->nextSequenceNumber++; // should match all clients
494 
495  ReductionSetData *data = set->getData(seqNum);
496  if ( data->submitsRecorded != set->submitsRegistered ) {
497  NAMD_bug("ReductionMgr::mergeAndDeliver not ready to deliver.");
498  }
499 
500  if ( isRoot() ) {
501  if ( set->requireRegistered ) {
502  if ( set->threadIsWaiting && set->waitingForSequenceNumber == seqNum) {
503  // awaken the thread so it can take the data
504  CthAwaken(set->waitingThread);
505  }
506  } else {
507  NAMD_die("ReductionSet::deliver will never deliver data");
508  }
509  } else {
510  // send data to parent
511  ReductionSubmitMsg *msg = new(set->dataSize) ReductionSubmitMsg;
512  msg->reductionSetID = set->reductionSetID;
513  msg->sourceNode = CkMyPe();
514  msg->sequenceNumber = seqNum;
515  msg->dataSize = set->dataSize;
516  for ( int i = 0; i < msg->dataSize; ++i ) {
517  msg->data[i] = data->data[i];
518  }
519  CProxy_ReductionMgr reductionProxy(thisgroup);
520  reductionProxy[myParent].remoteSubmit(msg);
521  delete set->removeData(seqNum);
522  }
523 
524 }
525 
526 // register require
528  ReductionSet *set = getSet(setID,size);
529  set->requireRegistered++;
530  if ( set->getData(set->nextSequenceNumber)->submitsRecorded ) {
531  NAMD_die("ReductionMgr::willRequire called while reductions outstanding!");
532  }
533 
534  RequireReduction *handle = new RequireReduction;
535  handle->reductionSetID = setID;
536  handle->sequenceNumber = set->nextSequenceNumber;
537  handle->master = this;
538 
539  return handle;
540 }
541 
542 // unregister require
543 void ReductionMgr::remove(RequireReduction* handle) {
544  int setID = handle->reductionSetID;
545  ReductionSet *set = reductionSets[setID];
546  if ( set->getData(set->nextSequenceNumber)->submitsRecorded ) {
547  NAMD_die("RequireReduction deleted while reductions outstanding!");
548  }
549 
550  set->requireRegistered--;
551 
552  delSet(setID);
553 }
554 
555 // require the data from a thread
556 void ReductionMgr::require(RequireReduction* handle) {
557  int setID = handle->reductionSetID;
558  ReductionSet *set = reductionSets[setID];
559  int seqNum = handle->sequenceNumber;
560  ReductionSetData *data = set->getData(seqNum);
561  if ( data->submitsRecorded < set->submitsRegistered ) {
562  set->threadIsWaiting = 1;
563  set->waitingForSequenceNumber = seqNum;
564  set->waitingThread = CthSelf();
565 //iout << "seq " << seqNum << " waiting\n" << endi;
566  CthSuspend();
567  }
568  set->threadIsWaiting = 0;
569 
570 //iout << "seq " << seqNum << " consumed\n" << endi;
571  delete handle->currentData;
572  handle->currentData = set->removeData(seqNum);
573  handle->data = handle->currentData->data;
574  handle->sequenceNumber = ++seqNum;
575 }
576 
577 //these will return void for now
579  reducedValue = 0.0;
580  valueLock = CmiCreateLock();
581 }
582 
584  CmiDestroyLock(this->valueLock);
585 }
586 
587 // atomic updates to reducedValue
588 double ReductionValue::operator+=(double rvalue){
589  //reducedValue.fetch_add(rvalue, std::memory_order_relaxed);
590  CmiLock(valueLock);
591  reducedValue += rvalue;
592  CmiUnlock(valueLock);
593  return reducedValue;
594 }
595 
596 
597 double ReductionValue::operator+=(int rvalue){
598  //reducedValue.fetch_add(rvalue, std::memory_order_relaxed);
599  CmiLock(valueLock);
600  reducedValue += rvalue;
601  CmiUnlock(valueLock);
602  return reducedValue;
603 }
604 
605 double ReductionValue::operator+=(unsigned int rvalue){
606  //reducedValue.fetch_add(rvalue, std::memory_order_relaxed);
607  CmiLock(valueLock);
608  reducedValue += rvalue;
609  CmiUnlock(valueLock);
610  return reducedValue;
611 }
612 
613 ReductionValue::operator int(){
614  return (int)reducedValue;
615 }
616 
617 ReductionValue::operator double(){
618  return reducedValue;
619 }
620 
622  //memset(set, 0, sizeof(ReductionValue)*REDUCTION_MAX_RESERVED);
623  for(int i = 0; i < REDUCTION_MAX_RESERVED; i++){
624  this->set[i].reducedValue = 0.0;
625  }
626 
627 }
628 
630  return set[index];
631 }
632 
634  return set[index];
635 }
636 
638  // this memset is going to eff everything up i think
639  //memset(set, 0, sizeof(ReductionValue)*REDUCTION_MAX_RESERVED);
640 #pragma unroll
641  for(int i = 0; i < REDUCTION_MAX_RESERVED; i++){
642  this->set[i].reducedValue = 0.0;
643  }
644 }
645 
646 
648 #pragma unroll
649  for(int i = 0; i < REDUCTION_MAX_RESERVED; i++){
650  this->set[i].reducedValue = other->set[i].reducedValue;
651  }
652 }
653 
655 
656 }
657 
658 
659 #include "ReductionMgr.def.h"
660 // nothing should be placed below here
661 
void buildSpanTree(const int pe, const int max_intranode_children, const int max_internode_children, int *parent, int *num_children, int **children)
Definition: ReductionMgr.C:120
int nextSequenceNumber
Definition: ReductionMgr.h:217
ReductionValue & operator[](int index)
Definition: ReductionMgr.C:629
ReductionSetData * next
Definition: ReductionMgr.h:200
int * addToRemoteSequenceNumber
Definition: ReductionMgr.h:229
ReductionSetData * getData(int seqNum)
Definition: ReductionMgr.C:89
#define DebugM(x, y)
Definition: Debug.h:75
ReductionSet(int setID, int size, int numChildren)
Definition: ReductionMgr.C:58
void remoteRegister(ReductionRegisterMsg *msg)
Definition: ReductionMgr.C:414
SubmitReduction * willSubmit(int setID, int size=-1)
Definition: ReductionMgr.C:366
friend class SubmitReduction
Definition: ReductionMgr.h:237
double operator+=(double other)
Definition: ReductionMgr.C:588
#define REDUCTION_MAX_CHILDREN
Definition: ReductionMgr.h:187
CmiNodeLock valueLock
Definition: ReductionMgr.h:352
static Units next(Units u)
Definition: ParseOptions.C:48
void NAMD_bug(const char *err_msg)
Definition: common.C:195
ReductionValue & item(int index)
Definition: ReductionMgr.C:633
int requireRegistered
Definition: ReductionMgr.h:223
ReductionSetData * removeData(int seqNum)
Definition: ReductionMgr.C:104
int submitsRegistered
Definition: ReductionMgr.h:218
void NAMD_die(const char *err_msg)
Definition: common.C:147
std::vector< std::string > split(const std::string &text, std::string delimiter)
Definition: MoleculeQM.C:74
void remoteSubmit(ReductionSubmitMsg *msg)
Definition: ReductionMgr.C:448
void setVal(const NodeReduction *other)
Definition: ReductionMgr.C:647
RequireReduction * willRequire(int setID, int size=-1)
Definition: ReductionMgr.C:527
friend class RequireReduction
Definition: ReductionMgr.h:238
void remoteUnregister(ReductionRegisterMsg *msg)
Definition: ReductionMgr.C:433
double reducedValue
Definition: ReductionMgr.h:351
double BigReal
Definition: common.h:123
ReductionSetData * dataQueue
Definition: ReductionMgr.h:220