Main Page   Namespace List   Class Hierarchy   Alphabetical List   Compound List   File List   Namespace Members   Compound Members   File Members   Related Pages  

cmd_parallel.C

Go to the documentation of this file.
00001 /***************************************************************************
00002  *cr                                                                       
00003  *cr            (C) Copyright 1995-2019 The Board of Trustees of the           
00004  *cr                        University of Illinois                       
00005  *cr                         All Rights Reserved                        
00006  *cr                                                                   
00007  ***************************************************************************/
00008 
00009 /***************************************************************************
00010  * RCS INFORMATION:
00011  *
00012  *      $RCSfile: cmd_parallel.C,v $
00013  *      $Author: johns $        $Locker:  $             $State: Exp $
00014  *      $Revision: 1.35 $       $Date: 2020/07/08 04:19:30 $
00015  *
00016  ***************************************************************************
00017  * DESCRIPTION:
00018  *   MPI-related text commands
00019  *
00020  * These commands are not logged or otherwise sent through the normal
00021  * VMD command queue as they are latency sensitive, and there would be
00022  * no obvious reason one would want to log them to files or to the console.
00023  *
00024  ***************************************************************************/
00025 
00026 #if defined(VMDMPI)
00027 #include <mpi.h>    // XXX this is a short term hack for testing only! 
00028                     // We should not be making MPI calls directly here,
00029                     // but rather calling via VMDApp, so that we can
00030                     // implement the needed MPI functionality in a dynamically
00031                     // loaded plugin, rather than being hard-compiled in...
00032 
00033 // Check to see if we have to pass the MPI_IN_PLACE flag
00034 // for in-place allgather reductions (same approach as Tachyon)
00035 #if !defined(USE_MPI_IN_PLACE)
00036 #if (MPI_VERSION >= 2) || defined(MPI_IN_PLACE)
00037 #define USE_MPI_IN_PLACE 1
00038 #endif
00039 #endif
00040 
00041 #endif
00042 
00043 #include <stdio.h>
00044 #include <string.h>
00045 #include <stdlib.h>
00046 #include <tcl.h>
00047 #include "config.h"
00048 #include "CommandQueue.h"
00049 #include "Command.h"
00050 #include "VMDApp.h"
00051 #include "Inform.h" // debugging
00052 #include "WKFThreads.h"
00053 
// Point-to-point MPI message tags used by the "parallel" text commands.
// allreduce: message carrying the byte count of the payload that follows
#define VMD_MPI_TAG_ALLREDUCE_ARGLENGTH 1
// allreduce: message carrying the NUL-terminated string data itself
#define VMD_MPI_TAG_ALLREDUCE_PAYLOAD   2
// parallel for: worker work requests and the scheduler's index replies
#define VMD_MPI_TAG_FOR_REQUEST         3
00057 
00058 
00059 
00060 //
00061 // XXX hack to provide Swift/T with access to VMD's communicator
00062 //
00063 #if defined(VMDMPI)
00064 //
00065 // XXX global variable hack that needs to go away...
00066 // MPI communicator made available to Swift/T during startup
00067 //
00068 MPI_Comm turbine_adlb_comm;
00069 #endif
00070 
00071 extern int swift_mpi_init(Tcl_Interp *interp) {
00072 #if defined(VMDMPI)
00073   if (getenv("VMDNOSWIFTCOMM") == NULL) {
00074     if (MPI_SUCCESS == MPI_Comm_dup(MPI_COMM_WORLD, &turbine_adlb_comm)) {
00075       Tcl_Obj* TURBINE_ADLB_COMM = Tcl_NewStringObj("TURBINE_ADLB_COMM", -1);
00076       // XXX this is another gross hack.  This passes the MPI communicator pointer 
00077       //     as if it were a long, to make it available through Tcl, 
00078       //     but there MUST be a better way.  This is copied from what was done in NAMD.
00079       Tcl_Obj* adlb_comm_ptr = Tcl_NewLongObj((long) &turbine_adlb_comm);
00080       Tcl_ObjSetVar2(interp, TURBINE_ADLB_COMM, NULL, adlb_comm_ptr, 0);
00081     }
00082   }
00083 #endif
00084 
00085   return 0;
00086 }
00087 
00088 
00089 //
00090 // MPI dynamic work scheduler function
00091 //
00092 #if defined(VMDMPI)
00093 
00094 typedef struct {
00095   int numnodes;
00096   wkf_tasktile_t loop;
00097   wkf_shared_iterator_t iter;
00098 } parallel_for_parms;
00099 
00100 extern "C" void *vmd_mpi_parallel_for_scheduler(void *voidparms) {
00101   parallel_for_parms *parfor = (parallel_for_parms *) voidparms;
00102 
00103   // Run the for loop management code on node zero.
00104   // Do the work on all the other nodes...
00105 #if defined(VMDTHREADS)
00106   int i;
00107   wkf_tasktile_t curtile;
00108   while (wkf_shared_iterator_next_tile(&parfor->iter, 1, &curtile) != WKF_SCHED_DONE) {
00109     i = curtile.start;
00110 #else
00111   int i;
00112   for (i=parfor->loop.start; i<parfor->loop.end; i++) {
00113 #endif
00114     int reqnode;
00115     MPI_Status rcvstat;
00116     MPI_Recv(&reqnode, 1, MPI_INT, MPI_ANY_SOURCE, VMD_MPI_TAG_FOR_REQUEST, 
00117              MPI_COMM_WORLD, &rcvstat); 
00118     MPI_Send(&i, 1, MPI_INT, reqnode, VMD_MPI_TAG_FOR_REQUEST, 
00119              MPI_COMM_WORLD);
00120   }
00121 
00122   // tell all nodes we're done with all of the work
00123   int node;
00124   for (node=1; node<parfor->numnodes; node++) {
00125     int reqnode;
00126     MPI_Status rcvstat;
00127     MPI_Recv(&reqnode, 1, MPI_INT, MPI_ANY_SOURCE, VMD_MPI_TAG_FOR_REQUEST, 
00128              MPI_COMM_WORLD, &rcvstat); 
00129 
00130     i=-1; // indicate that the for loop is completed
00131     MPI_Send(&i, 1, MPI_INT, reqnode, VMD_MPI_TAG_FOR_REQUEST, 
00132              MPI_COMM_WORLD);
00133   }
00134 
00135   return NULL;
00136 }
00137 
00138 #endif
00139 
00140 
00141 
00142 int text_cmd_parallel(ClientData cd, Tcl_Interp *interp, int argc, const char *argv[]) {
00143   VMDApp *app = (VMDApp *)cd;
00144 
00145   if(argc<2) {
00146     Tcl_SetResult(interp,
00147       (char *)
00148       "Parallel job query commands:\n"
00149       "  parallel nodename\n"
00150       "  parallel noderank\n"
00151       "  parallel nodecount\n"
00152       "Parallel collective operations (all nodes MUST participate):\n"
00153       "  parallel allgather <object>\n"
00154       "  parallel allreduce <tcl reduction proc> <object>\n"
00155       "  parallel barrier\n"
00156       "  parallel for <startcount> <endcount> <tcl callback proc> <user data>",
00157       TCL_STATIC);
00158     return TCL_ERROR;
00159   }
00160 
00161   // XXX hack to make Swift/T cooperate with VMD when using VMD's MPI
00162   // communicator
00163   if (!strcmp(argv[1], "swift_clone_communicator")) {
00164     swift_mpi_init(interp);
00165     return TCL_OK;
00166   }
00167 
00168   // return the MPI node name
00169   if (!strcmp(argv[1], "nodename")) {
00170     Tcl_Obj *tcl_result = Tcl_NewListObj(0, NULL);
00171     Tcl_ListObjAppendElement(interp, tcl_result, Tcl_NewStringObj(app->par_name(), strlen(app->par_name())));
00172     Tcl_SetObjResult(interp, tcl_result);
00173     return TCL_OK;
00174   }
00175 
00176   // return the MPI node rank
00177   if (!strcmp(argv[1], "noderank")) {
00178     Tcl_Obj *tcl_result = Tcl_NewListObj(0, NULL);
00179     Tcl_ListObjAppendElement(interp, tcl_result, Tcl_NewIntObj(app->par_rank()));
00180     Tcl_SetObjResult(interp, tcl_result);
00181     return TCL_OK;
00182   }
00183 
00184   // return the MPI node count
00185   if (!strcmp(argv[1], "nodecount")) {
00186     Tcl_Obj *tcl_result = Tcl_NewListObj(0, NULL);
00187     Tcl_ListObjAppendElement(interp, tcl_result, Tcl_NewIntObj(app->par_size()));
00188     Tcl_SetObjResult(interp, tcl_result);
00189     return TCL_OK;
00190   }
00191 
00192   // execute an MPI barrier
00193   if(!strupncmp(argv[1], "barrier", CMDLEN) && argc==2) {
00194     app->par_barrier();
00195     return TCL_OK;
00196   }
00197 
00198 
00199   // Execute a parallel for loop across all nodes
00200   //
00201   //  parallel for <startcount> <endcount> <callback proc> <user data>",
00202   //
00203   if (!strupncmp(argv[1], "for", CMDLEN)) {
00204     int isok = (argc == 6);
00205     int N = app->par_size();
00206     int start, end;
00207 
00208     if (Tcl_GetInt(interp, argv[2], &start) != TCL_OK ||
00209         Tcl_GetInt(interp, argv[3], &end) != TCL_OK) {
00210       isok = 0;
00211     }
00212 
00213     //
00214     // If there's only one node, short-circuit the parallel for
00215     //
00216     if (N == 1) {
00217       if (!isok) {
00218         Tcl_SetResult(interp, (char *) "invalid parallel for, missing parameter", TCL_STATIC);
00219         return TCL_ERROR;
00220       }
00221 
00222       // run for loop on one node...
00223       int i;
00224       for (i=start; i<=end; i++) { 
00225         char istr[128] = { 0 };
00226         sprintf(istr, "%d", i);
00227         if (Tcl_VarEval(interp, argv[4], " ", istr, " {",
00228                         argv[5], "} ", NULL) != TCL_OK) {
00229           Tcl_SetResult(interp, (char *) "error occured during parallel for", TCL_STATIC);
00230         }
00231       }
00232 
00233       return TCL_OK;
00234     }
00235 
00236 #if defined(VMDMPI)
00237     int allok = 0;
00238 
00239     // Check all node result codes before we continue with the reduction
00240     MPI_Allreduce(&isok, &allok, 1, MPI_INT, MPI_LAND, MPI_COMM_WORLD);
00241 
00242     // XXX we may want to verify that all nodes are going to call the same
00243     // reduction proc here before continuing further.
00244 
00245     if (!allok) {
00246       Tcl_SetResult(interp, (char *) "invalid parallel for, missing parameter on one or more nodes", TCL_STATIC);
00247       return TCL_ERROR;
00248     }
00249 
00250     // Run the for loop management code on node zero.
00251     // Do the work on all the other nodes...
00252     int i;
00253     if (app->par_rank() == 0) {
00254       // use multithreaded code path
00255       parallel_for_parms parfor;
00256       memset(&parfor, 0, sizeof(parfor));
00257       parfor.numnodes = N;
00258       parfor.loop.start=start;
00259       parfor.loop.end=end+1;
00260       wkf_shared_iterator_init(&parfor.iter);
00261       wkf_shared_iterator_set(&parfor.iter, &parfor.loop);
00262 
00263 #if defined(VMDTHREADS)
00264       // run the MPI scheduler in a new child thread
00265       wkf_thread_t pft;
00266       wkf_thread_create(&pft, vmd_mpi_parallel_for_scheduler, &parfor);
00267 
00268       // run the Tcl in the main thread
00269       wkf_tasktile_t curtile;
00270       while (wkf_shared_iterator_next_tile(&parfor.iter, 1, &curtile) != WKF_SCHED_DONE) {
00271         i = curtile.start;
00272         char istr[128] = { 0 };
00273         sprintf(istr, "%d", i);
00274         if (Tcl_VarEval(interp, argv[4], " ", istr, " {",
00275                         argv[5], "} ", NULL) != TCL_OK) {
00276           Tcl_SetResult(interp, (char *) "error occured during parallel for", TCL_STATIC);
00277         }
00278       }
00279 
00280       // join up with the MPI scheduler thread
00281       wkf_thread_join(pft, NULL);
00282 #else
00283       // if no threads, node zero only runs the scheduler and doesn't do work
00284       vmd_mpi_parallel_for_scheduler(&parfor);
00285 #endif
00286 
00287       wkf_shared_iterator_destroy(&parfor.iter);
00288     } else {
00289       char istr[128] = { 0 };
00290       int done=0;
00291       int mynode=app->par_rank();
00292       while (!done) {
00293         MPI_Send(&mynode, 1, MPI_INT, 0, VMD_MPI_TAG_FOR_REQUEST, 
00294                  MPI_COMM_WORLD);
00295 
00296         MPI_Status rcvstat;
00297         MPI_Recv(&i, 1, MPI_INT, MPI_ANY_SOURCE, VMD_MPI_TAG_FOR_REQUEST, 
00298                  MPI_COMM_WORLD, &rcvstat); 
00299         if (i == -1) {
00300           done = 1;
00301         } else {
00302           sprintf(istr, "%d", i);
00303           if (Tcl_VarEval(interp, argv[4], " ", istr, " {",
00304                           argv[5], "} ", NULL) != TCL_OK) {
00305             Tcl_SetResult(interp, (char *) "error occured during parallel for", TCL_STATIC);
00306           }
00307         }
00308       }
00309     }
00310 #endif
00311 
00312     return TCL_OK;
00313   }
00314 
00315 
00316   // Execute an allgather producing a Tcl list of the per-node contributions
00317   //
00318   // parallel allgather <object>
00319   //
00320   if (!strupncmp(argv[1], "allgather", CMDLEN)) {
00321     int isok = (argc == 3);
00322     int N = app->par_size();
00323 
00324     // when running on a single-node or MPI is disabled at runtime, 
00325     // bypass all MPI calls with the single-node fast path, which makes 
00326     // the code both faster and implicitly handles the case where 
00327     // MPI support is compiled and linked in, but disabled at runtime,
00328     // e.g., when running a VMD binary on a Cray login node.
00329     if (N == 1) {
00330       if (!isok) {
00331         Tcl_SetResult(interp, (char *) "invalid parallel gather, missing parameter on one or more nodes", TCL_STATIC);
00332         return TCL_ERROR;
00333       }
00334 
00335       Tcl_Obj *tcl_result = Tcl_NewListObj(0, NULL);
00336       Tcl_ListObjAppendElement(interp, tcl_result, Tcl_NewStringObj(argv[2], strlen(argv[2])));
00337       Tcl_SetObjResult(interp, tcl_result);
00338       return TCL_OK;
00339     } 
00340 #if defined(VMDMPI)
00341     else {
00342       int allok = 0;
00343       int i;
00344 
00345       // Check all node result codes before we continue with the gather
00346       MPI_Allreduce(&isok, &allok, 1, MPI_INT, MPI_LAND, MPI_COMM_WORLD);
00347 
00348       if (!allok) {
00349         Tcl_SetResult(interp, (char *) "invalid parallel gather, missing parameter on one or more nodes", TCL_STATIC);
00350         return TCL_ERROR;
00351       }
00352 
00353       // Collect parameter size data so we can allocate result buffers
00354       // before executing the gather
00355       int *szlist = new int[N];
00356       szlist[app->par_rank()] = strlen(argv[2])+1;
00357 
00358 #if defined(USE_MPI_IN_PLACE)
00359       // MPI >= 2.x implementations (e.g. NCSA/Cray Blue Waters)
00360       MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT,
00361                     &szlist[0], 1, MPI_INT, MPI_COMM_WORLD);
00362 #else
00363       // MPI 1.x
00364       MPI_Allgather(&szlist[app->par_rank()], 1, MPI_INT,
00365                     &szlist[0], 1, MPI_INT, MPI_COMM_WORLD);
00366 #endif
00367 
00368       int totalsz = 0;
00369       int *displist = new int[N];
00370       for (i=0; i<N; i++) {
00371         displist[i]=totalsz;
00372         totalsz+=szlist[i];
00373       }
00374 
00375       char *recvbuf = new char[totalsz];
00376       memset(recvbuf, 0, totalsz);
00377 
00378       // Copy this node's data into the correct array position
00379       strcpy(&recvbuf[displist[app->par_rank()]], argv[2]);
00380 
00381       // Perform the parallel gather 
00382 #if defined(USE_MPI_IN_PLACE)
00383       // MPI >= 2.x implementations (e.g. NCSA/Cray Blue Waters)
00384       MPI_Allgatherv(MPI_IN_PLACE, szlist[app->par_rank()], MPI_BYTE,
00385                      &recvbuf[0], szlist, displist, MPI_BYTE, MPI_COMM_WORLD);
00386 #else
00387       // MPI 1.x
00388       MPI_Allgatherv(&recvbuf[displist[app->par_rank()]], szlist[app->par_rank()], MPI_BYTE,
00389                      &recvbuf[0], szlist, displist, MPI_BYTE, MPI_COMM_WORLD);
00390 #endif
00391 
00392       // Build Tcl result from the array of results
00393       Tcl_Obj *tcl_result = Tcl_NewListObj(0, NULL);
00394       for (i=0; i<N; i++) {
00395         Tcl_ListObjAppendElement(interp, tcl_result, Tcl_NewStringObj(&recvbuf[displist[i]], szlist[i]-1));
00396       }
00397       Tcl_SetObjResult(interp, tcl_result);
00398 
00399       delete [] recvbuf;
00400       delete [] displist;
00401       delete [] szlist;
00402       return TCL_OK;
00403     }
00404 #endif
00405   }
00406 
00407 
00408   //
00409   // Execute an All-Reduce across all of the nodes.
00410   // The user must provide a Tcl proc that performs the appropriate reduction
00411   // operation for a pair of data items, resulting in a single item.
00412   // Since the user may pass floating point data or perform reductions 
00413   // that give very slightly different answers depending on the order of
00414   // operations, the architecture or the host, or whether reductions on 
00415   // a given host are occuring on the CPU or on a heterogeneous accelerator 
00416   // or GPU of some kind, we must ensure that all nodes get a bit-identical
00417   // result.  When heterogeneous accelerators are involved, we can really 
00418   // only guarantee this by implementing the All-Reduce with a 
00419   // Reduce-then-Broadcast approach, where the reduction collapses the
00420   // result down to node zero, which then does a broadcast to all peers. 
00421   //
00422   // parallel allreduce <tcl reduction proc> <object>
00423   //
00424   if (!strupncmp(argv[1], "allreduce", CMDLEN)) {
00425     int isok = (argc == 4);
00426     int N = app->par_size();
00427 
00428     //
00429     // If there's only one node, short-circuit the full parallel reduction
00430     //
00431     if (N == 1) {
00432       if (!isok) {
00433         Tcl_SetResult(interp, (char *) "invalid parallel reduction, missing parameter", TCL_STATIC);
00434         return TCL_ERROR;
00435       }
00436 
00437       // return our result, no other reduction is necessary
00438       Tcl_SetObjResult(interp, Tcl_NewStringObj(argv[3], strlen(argv[3])));
00439       return TCL_OK;
00440     }
00441 
00442 #if 1 && defined(VMDMPI)
00443     //
00444     // All-Reduce implementation based on a ring reduction followed by a 
00445     // broadcast from node zero.  This implementation gaurantees strict
00446     // ordering and will properly handle the case where one or more nodes
00447     // perform their reduction with slightly differing floating point 
00448     // rounding than others (e.g. using GPUs, heterogeneous nodes, etc),
00449     // and it works with any number of nodes.  While NOT latency-optimal,
00450     // this implementation is close to bandwidth-optimal which is helpful
00451     // for workstation clusters on non-switched networks or networks with
00452     // switches that cannot operate in a fully non-blocking manner.
00453     //
00454     int allok = 0;
00455 
00456     // Check all node result codes before we continue with the reduction
00457     MPI_Allreduce(&isok, &allok, 1, MPI_INT, MPI_LAND, MPI_COMM_WORLD);
00458 
00459     // XXX we may want to verify that all nodes are going to call the same
00460     // reduction proc here before continuing further.
00461 
00462     if (!allok) {
00463       Tcl_SetResult(interp, (char *) "invalid parallel reduction, missing parameter on one or more nodes", TCL_STATIC);
00464       return TCL_ERROR;
00465     }
00466 
00467     // copy incoming data into initial "result" object
00468     Tcl_Obj *resultobj = Tcl_NewStringObj((const char *) argv[3], strlen(argv[3])+1);
00469 
00470     // A ring-based all-reduce implementation which should be 
00471     // close to bandwidth-optimal, at the cost of additional latency.
00472     int src=app->par_rank(); // src node is this node
00473     int Ldest = (N + src + 1) % N; // compute left peer
00474     int Rdest = (N + src - 1) % N; // compute right peer
00475     MPI_Status status;
00476 
00477     if (src != 0) {
00478       int recvsz = 0;
00479 
00480       // Post blocking receive for data size
00481       MPI_Recv(&recvsz, 1, MPI_INT, Ldest,
00482                VMD_MPI_TAG_ALLREDUCE_ARGLENGTH, MPI_COMM_WORLD, &status);
00483 
00484       // Allocate or resize receive buffer 
00485       char * recvbuf = (char *) malloc(recvsz);
00486 
00487       // Post non-blocking receive for data
00488       MPI_Recv(recvbuf, recvsz, MPI_BYTE, Ldest,
00489                VMD_MPI_TAG_ALLREDUCE_PAYLOAD, MPI_COMM_WORLD, &status);
00490 
00491       // Perform reduction
00492       // Perform the reduction operation on our existing and incoming data.
00493       // We build a Tcl command string with the user-defined proc, this 
00494       // node's previous resultand, and the incoming data, and evaluate it.
00495       if (Tcl_VarEval(interp, argv[2], " ", 
00496                       Tcl_GetString(resultobj), " ", 
00497                       recvbuf, NULL) != TCL_OK) {
00498         printf("Error occured during reduction!\n");    
00499       }
00500 
00501       // Prep for next reduction step.  Set result object to result of
00502       // the latest communication/reduction phase.
00503       resultobj = Tcl_GetObjResult(interp);
00504 
00505       // Free the receive buffer
00506       free(recvbuf);
00507     } 
00508   
00509     //
00510     // All nodes
00511     //  
00512     char *sendbuf = Tcl_GetString(resultobj);
00513     int sendsz = strlen(sendbuf)+1;
00514 
00515     // Post blocking send for data size
00516     MPI_Send(&sendsz, 1, MPI_INT, Rdest,
00517              VMD_MPI_TAG_ALLREDUCE_ARGLENGTH, MPI_COMM_WORLD);
00518 
00519     // Post blocking send for data
00520     MPI_Send(sendbuf, sendsz, MPI_BYTE, Rdest,
00521              VMD_MPI_TAG_ALLREDUCE_PAYLOAD, MPI_COMM_WORLD);
00522 
00523     if (src == 0) {
00524       int recvsz = 0;
00525 
00526       // Post blocking receive for data size
00527       MPI_Recv(&recvsz, 1, MPI_INT, Ldest,
00528                VMD_MPI_TAG_ALLREDUCE_ARGLENGTH, MPI_COMM_WORLD, &status);
00529 
00530       // Allocate or resize receive buffer 
00531       char * recvbuf = (char *) malloc(recvsz);
00532 
00533       // Post non-blocking receive for data
00534       MPI_Recv(recvbuf, recvsz, MPI_BYTE, Ldest,
00535                VMD_MPI_TAG_ALLREDUCE_PAYLOAD, MPI_COMM_WORLD, &status);
00536 
00537       // Perform reduction
00538       // Perform the reduction operation on our existing and incoming data.
00539       // We build a Tcl command string with the user-defined proc, this 
00540       // node's previous result and the incoming data, and evaluate it.
00541       if (Tcl_VarEval(interp, argv[2], " ", 
00542                       Tcl_GetString(resultobj), " ", 
00543                       recvbuf, NULL) != TCL_OK) {
00544         printf("Error occured during reduction!\n");    
00545       }
00546 
00547       // Prep for next reduction step.  Set result object to result of
00548       // the latest communication/reduction phase.
00549       resultobj = Tcl_GetObjResult(interp);
00550 
00551       // Free the receive buffer
00552       free(recvbuf);
00553     } 
00554 
00555     //
00556     // Broadcast final result from root to peers
00557     //
00558     if (src == 0) {
00559       // update send buffer for root node before broadcast
00560       sendbuf = Tcl_GetString(resultobj);
00561       sendsz = strlen(sendbuf)+1;
00562       MPI_Bcast(&sendsz, 1, MPI_INT, 0, MPI_COMM_WORLD);
00563       MPI_Bcast(sendbuf, sendsz, MPI_BYTE, 0, MPI_COMM_WORLD);
00564     } else {
00565       int recvsz = 0;
00566       MPI_Bcast(&recvsz, 1, MPI_INT, 0, MPI_COMM_WORLD);
00567 
00568       // Allocate or resize receive buffer 
00569       char * recvbuf = (char *) malloc(recvsz);
00570 
00571       MPI_Bcast(recvbuf, recvsz, MPI_BYTE, 0, MPI_COMM_WORLD);
00572 
00573       // Set the final Tcl result if necessary
00574       Tcl_SetObjResult(interp, Tcl_NewStringObj(recvbuf, recvsz-1));
00575 
00576       // Free the receive buffer
00577       free(recvbuf);
00578     }
00579 
00580     return TCL_OK;
00581 
00582 #elif defined(VMDMPI)
00583 
00584     //
00585     // Power-of-two-only hypercube/butterfly/recursive doubling 
00586     // All-Reduce implementation.  This implementation can't be used
00587     // in the case that we have either a non-power-of-two node count or
00588     // in the case where we have heterogeneous processing units that may
00589     // yield different floating point rounding.  For now we leave this
00590     // implementation in the code for performance comparisons until we work
00591     // out the changes necessary to make it closer to bandwidth-optimal,
00592     // heterogeneous-safe, and non-power-of-two capable.
00593     //
00594     int allok = 0;
00595     int i;
00596 
00597     // Check all node result codes before we continue with the reduction
00598     MPI_Allreduce(&isok, &allok, 1, MPI_INT, MPI_LAND, MPI_COMM_WORLD);
00599 
00600     // XXX we may want to verify that all nodes are going to call the same
00601     // reduction proc here before continuing further.
00602 
00603     if (!allok) {
00604       Tcl_SetResult(interp, (char *) "invalid parallel reduction, missing parameter on one or more nodes", TCL_STATIC);
00605       return TCL_ERROR;
00606     }
00607 
00608     // Calculate number of reduction phases required
00609     int log2N;
00610     for (log2N=0; N>1; N>>=1) {
00611       log2N++;
00612 
00613       // XXX bail out of we don't have a power-of-two node count, 
00614       //     at least until we implement 3-2 reduction phases
00615       if ((N & 1) && (N > 1)) {
00616         Tcl_SetResult(interp, (char *) "parallel allreduce only allowed for even power-of-two node count", TCL_STATIC);
00617         return TCL_ERROR;
00618       }
00619     }
00620     N = app->par_size();
00621 
00622     // copy incoming data into initial "result" object
00623     Tcl_Obj *resultobj = Tcl_NewStringObj((const char *) argv[3], strlen(argv[3])+1);
00624 
00625     // An all-reduce tree with hypercube connectivity with 
00626     // log2(N) communication/reduction phases.  At each phase, we compute
00627     // the peer/destination node we will communicate with using an XOR of
00628     // our node ID with the current hypercube dimension.  If we have an
00629     // incomplete hypercube topology (e.g. non-power-of-two node count), 
00630     // we have to do special 3-2 communication rounds (not implemented yet).
00631     // The current implementation requires that all existing nodes 
00632     // participate, and that they contribute a valid data item.
00633     // If we wish to support reductions where a node may not contribute,
00634     // we would need to handle that similarly to a peer node that doesn't 
00635     // exist, but we would likely determine this during the parameter length
00636     // exchange step.
00637     int src=app->par_rank(); // src node is this node
00638     for (i=0; i<log2N; i++) {
00639       int mask = 1 << i;     // generate bitmask to use in the XOR
00640       int dest = src ^ mask; // XOR src node with bitmask to find dest node
00641       Tcl_Obj *oldresultobj = resultobj; // track old result 
00642 
00643       // Check to make sure dest node exists for non-power-of-two 
00644       // node counts (an incomplete hypercube).  If not, skip to the next
00645       // communication/reduction phase.
00646       if (dest < N) {
00647         char *sendbuf = Tcl_GetString(oldresultobj);
00648         int sendsz = strlen(sendbuf)+1;
00649         int recvsz = 0;
00650         MPI_Request handle;
00651         MPI_Status status;
00652 
00653         //
00654         // Exchange required receive buffer size for data exchange with peer
00655         //
00656 
00657         // Post non-blocking receive for data size
00658         MPI_Irecv(&recvsz, 1, MPI_INT, dest,
00659                   VMD_MPI_TAG_ALLREDUCE_ARGLENGTH, MPI_COMM_WORLD, &handle);
00660 
00661         // Post blocking send for data size
00662         MPI_Send(&sendsz, 1, MPI_INT, dest,
00663                  VMD_MPI_TAG_ALLREDUCE_ARGLENGTH, MPI_COMM_WORLD);
00664 
00665         // Wait for non-blocking receive of data size to complete
00666         MPI_Wait(&handle, &status); 
00667 
00668 // printf("src[%d], dest[%d], value '%s', recvsz: %d\n", src, dest, sendbuf, recvsz);
00669 
00670         // Allocate or resize receive buffer 
00671         char * recvbuf = (char *) malloc(recvsz);
00672 
00673         //
00674         // Exchange the data payload
00675         // 
00676 
00677         // Post non-blocking receive for data
00678         MPI_Irecv(recvbuf, recvsz, MPI_BYTE, dest,
00679                   VMD_MPI_TAG_ALLREDUCE_PAYLOAD, MPI_COMM_WORLD, &handle);
00680         
00681         // Post blocking send for data
00682         MPI_Send(sendbuf, sendsz, MPI_BYTE, dest,
00683                  VMD_MPI_TAG_ALLREDUCE_PAYLOAD, MPI_COMM_WORLD);
00684 
00685         // Wait for receive of data
00686         MPI_Wait(&handle, &status); 
00687 
00688         // Perform the reduction operation on our existing and incoming data.
00689         // We build a Tcl command string with the user-defined proc, this 
00690         // node's previous result and the incoming data, and evaluate it.
00691         if (Tcl_VarEval(interp, argv[2], " ", 
00692                         sendbuf, " ", recvbuf, NULL) != TCL_OK) {
00693           printf("Error occured during reduction!\n");    
00694         }
00695 
00696         // Free the receive buffer
00697         free(recvbuf);
00698 
00699         // Prep for next reduction step.  Set result object to result of
00700         // the latest communication/reduction phase.
00701         resultobj = Tcl_GetObjResult(interp);
00702       }
00703     }
00704 
00705     // Set the final Tcl result if necessary
00706     Tcl_SetObjResult(interp, resultobj);
00707 
00708     return TCL_OK;
00709 #endif
00710   }
00711 
00712   Tcl_SetResult(interp, (char *) "invalid parallel subcommand.", TCL_STATIC);
00713 
00714   return TCL_ERROR;
00715 }
00716 

Generated on Wed Dec 4 02:43:03 2024 for VMD (current) by doxygen1.2.14 written by Dimitri van Heesch, © 1997-2002