22 #include "LdbCoordinator.decl.h" 30 #define MIN_DEBUG_LEVEL 3 41 #include "ComputeMgr.decl.h" 45 #if CONVERSE_VERSION_ELAN 46 extern "C" void enableBlockingReceives();
47 extern "C" void disableBlockingReceives();
56 #ifndef LB_MANAGER_VERSION 72 if ( msg->
to != CkMyPe() ) {
73 CProxy_LdbCoordinator ldbProxy(thisgroup);
74 ldbProxy[CkMyPe()].RecvMigrate(msg);
82 CkPrintf(
"I'm supposed to set stats\n");
87 CkPrintf(
"I'm supposed to query load\n");
93 #if CONVERSE_VERSION_ELAN 113 CkCallback cb(CkIndex_LdbCoordinator::nodeDone(NULL), 0, thisgroup);
114 contribute(0, NULL, CkReduction::random, cb);
119 if (CkpvAccess(LdbCoordinator_instance) == NULL) {
120 CkpvAccess(LdbCoordinator_instance) =
this;
122 NAMD_bug(
"LdbCoordinator instanced twice on same node!");
147 #ifndef LB_MANAGER_VERSION 158 #ifdef LB_MANAGER_VERSION 204 CkPrintf(
"LDB: Central LB being created...\n");
207 CkPrintf(
"LDB: Hybrid LB being created...\n");
219 int lastLdbStep =
simParams->lastLdbStep;
220 int stepsPerCycle =
simParams->stepsPerCycle;
258 NAMD_die(
"Disaggreement in patchMap data.\n");
267 #
if (defined(NAMD_CUDA) || defined(NAMD_HIP) || defined(NAMD_MIC))
268 #if defined(NAMD_MIC) 276 #
if (defined(NAMD_CUDA) || defined(NAMD_HIP)) && defined(BONDED_CUDA)
343 NAMD_bug(
"LdbCoordinator found too many local patches!");
360 #
if (defined(NAMD_CUDA) || defined(NAMD_HIP) || defined(NAMD_MIC))
361 #if defined(NAMD_MIC) 369 #
if (defined(NAMD_CUDA) || defined(NAMD_HIP)) && defined(BONDED_CUDA)
401 if ( ! c )
NAMD_bug(
"LdbCoordinator::initialize() null compute pointer");
406 #
if (defined(NAMD_CUDA) || defined(NAMD_HIP)) && defined(BONDED_CUDA)
435 if ( ! c )
NAMD_bug(
"LdbCoordinator::initialize() null compute pointer");
450 if ( ! c )
NAMD_bug(
"LdbCoordinator::initialize() null compute pointer 2");
561 #if 0 //replaced by traceBarrier at Controller and Sequencer 562 if (traceAvailable()) {
563 static int specialTracing = 0;
564 if (
ldbCycleNum == 1 && traceIsOn() == 0) specialTracing = 1;
565 if (specialTracing) {
584 int freq =
simParams->multigratorPressureFreq;
587 if ((step % freq) != 0) dstep = freq - (step % freq);
591 if (step==0) numPressureCycles--;
613 CmiAssert(
id >=0 &&
id <
nPatches);
618 DebugM(10,
"::patchLoad() Unexpected patch reporting in\n");
636 iout <<
"LDB: ============= START OF LOAD BALANCING ============== " << CmiWallTimer() <<
"\n" <<
endi;
637 DebugM(3,
"Controller reached load balance barrier.\n");
641 CProxy_LdbCoordinator(thisgroup).barrier();
652 NAMD_bug(
"Load balancer received wrong number of events.\n");
661 iout <<
"LDB: ============== END OF LOAD BALANCING =============== " << CmiWallTimer() <<
"\n" <<
endi;
675 CProxy_ComputeMgr cm(CkpvAccess(BOCclass_group).computeMgr);
686 if ( m->
to != CkMyPe() ) {
689 CProxy_LdbCoordinator ldbProxy(thisgroup);
690 ldbProxy[m->
to].ExpectMigrate(m);
698 if ( m->
from != CkMyPe() ) {
708 DebugM(3,
"updateComputesReady()\n");
710 CProxy_LdbCoordinator(thisgroup).resume();
711 CkStartQD(CkIndex_LdbCoordinator::resumeReady((CkQdMsg*)0),&thishandle);
727 iout <<
"LDB: =============== DONE WITH MIGRATION ================ " << CmiWallTimer() <<
"\n" <<
endi;
728 DebugM(3,
"resumeReady()\n");
731 CProxy_LdbCoordinator(thisgroup).resume2();
738 #if CONVERSE_VERSION_ELAN 774 for (
int i = 0; i < numNeighbors; ++i ) {
776 if ( proxyNode != myNode ) {
778 for ( j = 0; j < nProxyNodes; ++j ) {
779 if ( neighborNodes[j] == proxyNode )
break;
781 if ( j == nProxyNodes ) {
782 neighborNodes[nProxyNodes] = proxyNode;
795 CkPrintf(
"%d:Patch report:\n",CkMyPe());
803 curLoc += sprintf(curLoc,
"%5d: %5d ",i,
patchNAtoms[i]);
806 if (((j % 4) == 0) && j)
809 CkPrintf(
"[%d]%s\n",CkMyPe(),outputBuf);
814 CkPrintf(
"%d:Compute report:\n",CkMyPe());
828 fprintf(fp,
"%4d ",nProxyNodes);
830 for(
int i=0;i<nProxyNodes;i++)
831 fprintf(fp,
"%4d ",neighborNodes[i]);
835 CProxy_LdbCoordinator(thisgroup)[0].collectLoads(msg);
840 if ( collPes == 0 ) {
842 initTotalProxies = 0;
843 finalTotalProxies = 0;
844 initMaxPeProxies = 0;
845 finalMaxPeProxies = 0;
846 initMaxPatchProxies = 0;
847 finalMaxPatchProxies = 0;
859 #define COLL_MAX(F) if ( msg->F > F ) F = msg->F; 860 #define COLL_AVG(F) F += msg->F * (double) numPes / (double) CkNumPes(); 861 #define COLL_SUM(F) F += msg->F; 880 if ( collPes == CkNumPes() ) {
882 iout <<
"LDB: TIME " << initTime <<
" LOAD: AVG " << initAvgPeLoad
883 <<
" MAX " << initMaxPeLoad <<
" PROXIES: TOTAL " << initTotalProxies <<
" MAXPE " <<
884 initMaxPeProxies <<
" MAXPATCH " << initMaxPatchProxies <<
" " <<
"None" 885 <<
" MEM: " << initMemory <<
" MB\n";
886 if ( reverted )
iout <<
"LDB: Reverting to original mapping on " << reverted <<
" balancers\n";
887 iout <<
"LDB: TIME " << finalTime <<
" LOAD: AVG " << finalAvgPeLoad
888 <<
" MAX " << finalMaxPeLoad <<
" PROXIES: TOTAL " << finalTotalProxies <<
" MAXPE " <<
889 finalMaxPeProxies <<
" MAXPATCH " << finalMaxPatchProxies <<
" " << msg->
strategyName 890 <<
" MEM: " << finalMemory <<
" MB\n";
898 #include "LdbCoordinator.def.h"
int requiredProxies(PatchID id, int [])
void sendCollectLoads(CollectLoadsMsg *)
void LdbCoordinator_initproc()
represents bonded compute
Controller * controllerThread
void collectLoads(CollectLoadsMsg *)
void resumeReady(CkQdMsg *msg)
static PatchMap * Object()
Sequencer ** sequencerThreads
SimParameters * simParameters
int nStatsMessagesExpected
LDObjHandle * patchHandles
void updateComputesReady()
void AtSyncBarrierReached(void)
void createLoadBalancer()
std::ostream & endi(std::ostream &s)
void Migrate(LDObjHandle handle, int dest)
void printRequiredProxies(PatchID id, FILE *fp)
HomePatch * homePatch(PatchID pid)
represents nonbonded or self compute
void awakenSequencers(void)
void patchLoad(PatchID id, int nAtoms, int timestep)
void ResumeFromSync(void)
void initialize(PatchMap *pmap, ComputeMap *cmap, int reinit=0)
int numPatches(void) const
LdbMigrateMsg * migrateMsgs
void CreateNamdHybridLB()
void NAMD_bug(const char *err_msg)
ComputeType type(ComputeID cid)
void rebalance(Sequencer *seq, PatchID id)
const int & LdbIdField(const LdbId &id, const int index)
void NAMD_die(const char *err_msg)
static LdbCoordinator * Object()
static void staticQueryEstLoadFn(LDOMHandle h)
void ExpectMigrate(LdbMigrateMsg *)
void nodeDone(CkReductionMsg *)
#define LDBAL_CENTRALIZED
static void staticReceiveAtSync(void *data)
int basenode(int pid) const
int downstreamNeighbors(int pid, PatchID *neighbor_ids)
Compute * compute(ComputeID cid)
static ComputeMap * Object()
void printLocalLdbReport(void)
computeInfo * computeArray
int nStatsMessagesReceived
int numPids(ComputeID cid)
static void staticMigrateFn(LDObjHandle handle, int dest)
int pid(ComputeID cid, int i)
LDBarrierClient ldBarrierHandle
static void staticResumeFromSync(void *data)
static void staticStatsFn(LDOMHandle h, int state)
void updateComputes(int, CkGroupID)
processorInfo * processorArray
void ExecuteMigrations(void)
void RecvMigrate(LdbMigrateMsg *)