Main Page   Namespace List   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members   File Members   Related Pages  

cmd_parallel.C

Go to the documentation of this file.
00001 /***************************************************************************
00002  *cr                                                                       
00003  *cr            (C) Copyright 1995-2011 The Board of Trustees of the           
00004  *cr                        University of Illinois                       
00005  *cr                         All Rights Reserved                        
00006  *cr                                                                   
00007  ***************************************************************************/
00008 
00009 /***************************************************************************
00010  * RCS INFORMATION:
00011  *
00012  *      $RCSfile: cmd_parallel.C,v $
00013  *      $Author: johns $        $Locker:  $             $State: Exp $
00014  *      $Revision: 1.27 $       $Date: 2012/03/20 15:43:24 $
00015  *
00016  ***************************************************************************
00017  * DESCRIPTION:
00018  *   MPI-related text commands
00019  *
00020  * These commands are not logged or otherwise sent through the normal
00021  * VMD command queue as they are latency sensitive, and there would be
00022  * no obvious reason one would want to log them to files or to the console.
00023  *
00024  ***************************************************************************/
00025 
00026 #if defined(VMDMPI)
00027 #include <mpi.h>    // XXX this is a short term hack for testing only! 
00028                     // We should not be making MPI calls directly here,
00029                     // but rather calling via VMDApp, so that we can
00030                     // implement the needed MPI functionality in a dynamically
00031                     // loaded plugin, rather than being hard-compiled in...
00032 #endif
00033 
00034 #include <stdio.h>
00035 #include <string.h>
00036 #include <stdlib.h>
00037 #include <tcl.h>
00038 #include "config.h"
00039 #include "CommandQueue.h"
00040 #include "Command.h"
00041 #include "VMDApp.h"
00042 #include "Inform.h" // debugging
00043 #include "WKFThreads.h"
00044 
00045 #define VMD_MPI_TAG_ALLREDUCE_ARGLENGTH 1
00046 #define VMD_MPI_TAG_ALLREDUCE_PAYLOAD   2
00047 #define VMD_MPI_TAG_FOR_REQUEST         3
00048 
00049 
00050 //
00051 // MPI dynamic work scheduler function
00052 //
00053 #if defined(VMDMPI)
00054 
00055 typedef struct {
00056   int numnodes;
00057   wkf_tasktile_t loop;
00058   wkf_shared_iterator_t iter;
00059 } parallel_for_parms;
00060 
00061 extern "C" void *vmd_mpi_parallel_for_scheduler(void *voidparms) {
00062   parallel_for_parms *parfor = (parallel_for_parms *) voidparms;
00063 
00064   // Run the for loop management code on node zero.
00065   // Do the work on all the other nodes...
00066 #if defined(VMDTHREADS)
00067   int i;
00068   wkf_tasktile_t curtile;
00069   while (wkf_shared_iterator_next_tile(&parfor->iter, 1, &curtile) != WKF_SCHED_DONE) {
00070     i = curtile.start;
00071 #else
00072   int i;
00073   for (i=parfor->loop.start; i<parfor->loop.end; i++) {
00074 #endif
00075     int reqnode;
00076     MPI_Status rcvstat;
00077     MPI_Recv(&reqnode, 1, MPI_INT, MPI_ANY_SOURCE, VMD_MPI_TAG_FOR_REQUEST, 
00078              MPI_COMM_WORLD, &rcvstat); 
00079     MPI_Send(&i, 1, MPI_INT, reqnode, VMD_MPI_TAG_FOR_REQUEST, 
00080              MPI_COMM_WORLD);
00081   }
00082 
00083   // tell all nodes we're done with all of the work
00084   int node;
00085   for (node=1; node<parfor->numnodes; node++) {
00086     int reqnode;
00087     MPI_Status rcvstat;
00088     MPI_Recv(&reqnode, 1, MPI_INT, MPI_ANY_SOURCE, VMD_MPI_TAG_FOR_REQUEST, 
00089              MPI_COMM_WORLD, &rcvstat); 
00090 
00091     i=-1; // indicate that the for loop is completed
00092     MPI_Send(&i, 1, MPI_INT, reqnode, VMD_MPI_TAG_FOR_REQUEST, 
00093              MPI_COMM_WORLD);
00094   }
00095 
00096   return NULL;
00097 }
00098 
00099 #endif
00100 
00101 
00102 
00103 int text_cmd_parallel(ClientData cd, Tcl_Interp *interp, int argc, const char *argv[]) {
00104   VMDApp *app = (VMDApp *)cd;
00105 
00106   if(argc<2) {
00107     Tcl_SetResult(interp,
00108       (char *)
00109       "Parallel job query commands:\n"
00110       "  parallel nodename\n"
00111       "  parallel noderank\n"
00112       "  parallel nodecount\n"
00113       "Parallel collective operations (all nodes MUST participate):\n"
00114       "  parallel allgather <object>\n"
00115       "  parallel allreduce <tcl reduction proc> <object>\n"
00116       "  parallel barrier\n"
00117       "  parallel for <startcount> <endcount> <tcl callback proc> <user data>",
00118       TCL_STATIC);
00119     return TCL_ERROR;
00120   }
00121 
00122   // return the MPI node name
00123   if (!strcmp(argv[1], "nodename")) {
00124     Tcl_Obj *tcl_result = Tcl_NewListObj(0, NULL);
00125     Tcl_ListObjAppendElement(interp, tcl_result, Tcl_NewStringObj(app->par_name(), strlen(app->par_name())));
00126     Tcl_SetObjResult(interp, tcl_result);
00127     return TCL_OK;
00128   }
00129 
00130   // return the MPI node rank
00131   if (!strcmp(argv[1], "noderank")) {
00132     Tcl_Obj *tcl_result = Tcl_NewListObj(0, NULL);
00133     Tcl_ListObjAppendElement(interp, tcl_result, Tcl_NewIntObj(app->par_rank()));
00134     Tcl_SetObjResult(interp, tcl_result);
00135     return TCL_OK;
00136   }
00137 
00138   // return the MPI node count
00139   if (!strcmp(argv[1], "nodecount")) {
00140     Tcl_Obj *tcl_result = Tcl_NewListObj(0, NULL);
00141     Tcl_ListObjAppendElement(interp, tcl_result, Tcl_NewIntObj(app->par_size()));
00142     Tcl_SetObjResult(interp, tcl_result);
00143     return TCL_OK;
00144   }
00145 
00146   // execute an MPI barrier
00147   if(!strupncmp(argv[1], "barrier", CMDLEN) && argc==2) {
00148     app->par_barrier();
00149     return TCL_OK;
00150   }
00151 
00152   // Execute a parallel for loop across all nodes
00153   //
00154   //  parallel for <startcount> <endcount> <callback proc> <user data>",
00155   //
00156   if(!strupncmp(argv[1], "for", CMDLEN)) {
00157     int isok = (argc == 6);
00158     int N = app->par_size();
00159     int start, end;
00160 
00161     if (Tcl_GetInt(interp, argv[2], &start) != TCL_OK ||
00162         Tcl_GetInt(interp, argv[3], &end) != TCL_OK) {
00163       isok = 0;
00164     }
00165 
00166     //
00167     // If there's only one node, short-circuit the parallel for
00168     //
00169     if (N == 1) {
00170       if (!isok) {
00171         Tcl_SetResult(interp, (char *) "invalid parallel for, missing parameter", TCL_STATIC);
00172         return TCL_ERROR;
00173       }
00174 
00175       // run for loop on one node...
00176       int i;
00177       for (i=start; i<=end; i++) { 
00178         char istr[128];
00179         sprintf(istr, "%d", i);
00180         if (Tcl_VarEval(interp, argv[4], " ", istr, " {",
00181                         argv[5], "} ", NULL) != TCL_OK) {
00182           Tcl_SetResult(interp, (char *) "error occured during parallel for", TCL_STATIC);
00183         }
00184       }
00185 
00186       return TCL_OK;
00187     }
00188 
00189 #if 1 && defined(VMDMPI)
00190     int allok = 0;
00191 
00192     // Check all node result codes before we continue with the reduction
00193     MPI_Allreduce(&isok, &allok, 1, MPI_INT, MPI_LAND, MPI_COMM_WORLD);
00194 
00195     // XXX we may want to verify that all nodes are going to call the same
00196     // reduction proc here before continuing further.
00197 
00198     if (!allok) {
00199       Tcl_SetResult(interp, (char *) "invalid parallel for, missing parameter on one or more nodes", TCL_STATIC);
00200       return TCL_ERROR;
00201     }
00202 
00203     // Run the for loop management code on node zero.
00204     // Do the work on all the other nodes...
00205     int i;
00206     if (app->par_rank() == 0) {
00207       // use multithreaded code path
00208 #if 1
00209       parallel_for_parms parfor;
00210       memset(&parfor, 0, sizeof(parfor));
00211       parfor.numnodes = N;
00212       parfor.loop.start=start;
00213       parfor.loop.end=end+1;
00214       wkf_shared_iterator_init(&parfor.iter);
00215       wkf_shared_iterator_set(&parfor.iter, &parfor.loop);
00216 
00217 #if defined(VMDTHREADS)
00218       // run the MPI scheduler in a new child thread
00219       wkf_thread_t pft;
00220       wkf_thread_create(&pft, vmd_mpi_parallel_for_scheduler, &parfor);
00221 
00222       // run the Tcl in the main thread
00223       wkf_tasktile_t curtile;
00224       while (wkf_shared_iterator_next_tile(&parfor.iter, 1, &curtile) != WKF_SCHED_DONE) {
00225         i = curtile.start;
00226         char istr[128];
00227         sprintf(istr, "%d", i);
00228         if (Tcl_VarEval(interp, argv[4], " ", istr, " {",
00229                         argv[5], "} ", NULL) != TCL_OK) {
00230           Tcl_SetResult(interp, (char *) "error occured during parallel for", TCL_STATIC);
00231         }
00232       }
00233 
00234       // join up with the MPI scheduler thread
00235       wkf_thread_join(pft, NULL);
00236 #else
00237       vmd_mpi_parallel_for_scheduler(&parfor);
00238 #endif
00239 
00240       wkf_shared_iterator_destroy(&parfor.iter);
00241 
00242 #else
00243       for (i=start; i<=end; i++) {
00244         int reqnode;
00245         MPI_Status rcvstat;
00246         MPI_Recv(&reqnode, 1, MPI_INT, MPI_ANY_SOURCE, VMD_MPI_TAG_FOR_REQUEST, 
00247                  MPI_COMM_WORLD, &rcvstat); 
00248         MPI_Send(&i, 1, MPI_INT, reqnode, VMD_MPI_TAG_FOR_REQUEST, 
00249                  MPI_COMM_WORLD);
00250       }
00251 
00252       // tell all nodes we're done with all of the work
00253       int node;
00254       for (node=1; node<N; node++) {
00255         int reqnode;
00256         MPI_Status rcvstat;
00257         MPI_Recv(&reqnode, 1, MPI_INT, MPI_ANY_SOURCE, VMD_MPI_TAG_FOR_REQUEST, 
00258                  MPI_COMM_WORLD, &rcvstat); 
00259 
00260         i=-1; // indicate that the for loop is completed
00261         MPI_Send(&i, 1, MPI_INT, reqnode, VMD_MPI_TAG_FOR_REQUEST, 
00262                  MPI_COMM_WORLD);
00263       }
00264 #endif
00265     } else {
00266       char istr[128];
00267       int done=0;
00268       int mynode=app->par_rank();
00269       while (!done) {
00270         MPI_Send(&mynode, 1, MPI_INT, 0, VMD_MPI_TAG_FOR_REQUEST, 
00271                  MPI_COMM_WORLD);
00272 
00273         MPI_Status rcvstat;
00274         MPI_Recv(&i, 1, MPI_INT, MPI_ANY_SOURCE, VMD_MPI_TAG_FOR_REQUEST, 
00275                  MPI_COMM_WORLD, &rcvstat); 
00276         if (i == -1) {
00277           done = 1;
00278         } else {
00279           sprintf(istr, "%d", i);
00280           if (Tcl_VarEval(interp, argv[4], " ", istr, " {",
00281                           argv[5], "} ", NULL) != TCL_OK) {
00282             Tcl_SetResult(interp, (char *) "error occured during parallel for", TCL_STATIC);
00283           }
00284         }
00285       }
00286     }
00287 #endif
00288 
00289     return TCL_OK;
00290   }
00291 
00292   // Execute an allgather producing a Tcl list of the per-node contributions
00293   //
00294   // parallel allgather <object>
00295   //
00296   if(!strupncmp(argv[1], "allgather", CMDLEN)) {
00297     int isok = (argc == 3);
00298 
00299 #if defined(VMDMPI)
00300     int allok = 0;
00301     int i;
00302 
00303     // Check all node result codes before we continue with the gather
00304     MPI_Allreduce(&isok, &allok, 1, MPI_INT, MPI_LAND, MPI_COMM_WORLD);
00305 
00306     if (!allok) {
00307       Tcl_SetResult(interp, (char *) "invalid parallel gather, missing parameter on one or more nodes", TCL_STATIC);
00308       return TCL_ERROR;
00309     }
00310 
00311     // Collect parameter size data so we can allocate result buffers
00312     // before executing the gather
00313     int *szlist = new int[app->par_size()];
00314     szlist[app->par_rank()] = strlen(argv[2])+1;
00315 
00316 #if defined(USE_MPI_IN_PLACE)
00317     // MPI >= 2.x implementations (e.g. NCSA/Cray Blue Waters)
00318     MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT,
00319                   &szlist[0], 1, MPI_INT, MPI_COMM_WORLD);
00320 #else
00321     // MPI 1.x
00322     MPI_Allgather(&szlist[app->par_rank()], 1, MPI_INT,
00323                   &szlist[0], 1, MPI_INT, MPI_COMM_WORLD);
00324 #endif
00325 
00326     int totalsz = 0;
00327     int *displist = new int[app->par_size()];
00328     for (i=0; i<app->par_size(); i++) {
00329       displist[i]=totalsz;
00330       totalsz+=szlist[i];
00331     }
00332 
00333     char *recvbuf = new char[totalsz];
00334     memset(recvbuf, 0, totalsz);
00335 
00336     // Copy this node's data into the correct array position
00337     strcpy(&recvbuf[displist[app->par_rank()]], argv[2]);
00338 
00339     // Perform the parallel gather 
00340 #if defined(USE_MPI_IN_PLACE)
00341     // MPI >= 2.x implementations (e.g. NCSA/Cray Blue Waters)
00342     MPI_Allgatherv(MPI_IN_PLACE, szlist[app->par_rank()], MPI_BYTE,
00343                    &recvbuf[0], szlist, displist, MPI_BYTE, MPI_COMM_WORLD);
00344 #else
00345     // MPI 1.x
00346     MPI_Allgatherv(&recvbuf[displist[app->par_rank()]], szlist[app->par_rank()], MPI_BYTE,
00347                    &recvbuf[0], szlist, displist, MPI_BYTE, MPI_COMM_WORLD);
00348 #endif
00349 
00350     // Build Tcl result from the array of results
00351     Tcl_Obj *tcl_result = Tcl_NewListObj(0, NULL);
00352     for (i=0; i<app->par_size(); i++) {
00353       Tcl_ListObjAppendElement(interp, tcl_result, Tcl_NewStringObj(&recvbuf[displist[i]], szlist[i]-1));
00354     }
00355     Tcl_SetObjResult(interp, tcl_result);
00356 
00357     delete [] recvbuf;
00358     delete [] displist;
00359     delete [] szlist;
00360     return TCL_OK;
00361 #else
00362     if (!isok) {
00363       Tcl_SetResult(interp, (char *) "invalid parallel gather, missing parameter on one or more nodes", TCL_STATIC);
00364       return TCL_ERROR;
00365     }
00366 
00367     Tcl_Obj *tcl_result = Tcl_NewListObj(0, NULL);
00368     Tcl_ListObjAppendElement(interp, tcl_result, Tcl_NewStringObj(argv[2], strlen(argv[2])));
00369     Tcl_SetObjResult(interp, tcl_result);
00370     return TCL_OK;
00371 #endif
00372   }
00373 
00374 
00375   //
00376   // Execute an All-Reduce across all of the nodes.
00377   // The user must provide a Tcl proc that performs the appropriate reduction
00378   // operation for a pair of data items, resulting in a single item.
00379   // Since the user may pass floating point data or perform reductions 
00380   // that give very slightly different answers depending on the order of
00381   // operations, the architecture or the host, or whether reductions on 
00382   // a given host are occuring on the CPU or on a heterogeneous accelerator 
00383   // or GPU of some kind, we must ensure that all nodes get a bit-identical
00384   // result.  When heterogeneous accelerators are involved, we can really 
00385   // only guarantee this by implementing the All-Reduce with a 
00386   // Reduce-then-Broadcast approach, where the reduction collapses the
00387   // result down to node zero, which then does a broadcast to all peers. 
00388   //
00389   // parallel allreduce <tcl reduction proc> <object>
00390   //
00391   if(!strupncmp(argv[1], "allreduce", CMDLEN)) {
00392     int isok = (argc == 4);
00393     int N = app->par_size();
00394 
00395     //
00396     // If there's only one node, short-circuit the full parallel reduction
00397     //
00398     if (N == 1) {
00399       if (!isok) {
00400         Tcl_SetResult(interp, (char *) "invalid parallel reduction, missing parameter", TCL_STATIC);
00401         return TCL_ERROR;
00402       }
00403 
00404       // return our result, no other reduction is necessary
00405       Tcl_SetObjResult(interp, Tcl_NewStringObj(argv[3], strlen(argv[3])));
00406       return TCL_OK;
00407     }
00408 
00409 #if 1 && defined(VMDMPI)
00410     //
00411     // All-Reduce implementation based on a ring reduction followed by a 
00412     // broadcast from node zero.  This implementation gaurantees strict
00413     // ordering and will properly handle the case where one or more nodes
00414     // perform their reduction with slightly differing floating point 
00415     // rounding than others (e.g. using GPUs, heterogeneous nodes, etc),
00416     // and it works with any number of nodes.  While NOT latency-optimal,
00417     // this implementation is close to bandwidth-optimal which is helpful
00418     // for workstation clusters on non-switched networks or networks with
00419     // switches that cannot operate in a fully non-blocking manner.
00420     //
00421     int allok = 0;
00422 
00423     // Check all node result codes before we continue with the reduction
00424     MPI_Allreduce(&isok, &allok, 1, MPI_INT, MPI_LAND, MPI_COMM_WORLD);
00425 
00426     // XXX we may want to verify that all nodes are going to call the same
00427     // reduction proc here before continuing further.
00428 
00429     if (!allok) {
00430       Tcl_SetResult(interp, (char *) "invalid parallel reduction, missing parameter on one or more nodes", TCL_STATIC);
00431       return TCL_ERROR;
00432     }
00433 
00434     // copy incoming data into initial "result" object
00435     Tcl_Obj *resultobj = Tcl_NewStringObj((const char *) argv[3], strlen(argv[3])+1);
00436 
00437     // A ring-based all-reduce implementation which should be 
00438     // close to bandwidth-optimal, at the cost of additional latency.
00439     int src=app->par_rank(); // src node is this node
00440     int Ldest = (N + src + 1) % N; // compute left peer
00441     int Rdest = (N + src - 1) % N; // compute right peer
00442     MPI_Status status;
00443 
00444     if (src != 0) {
00445       int recvsz = 0;
00446 
00447       // Post blocking receive for data size
00448       MPI_Recv(&recvsz, 1, MPI_INT, Ldest,
00449                VMD_MPI_TAG_ALLREDUCE_ARGLENGTH, MPI_COMM_WORLD, &status);
00450 
00451       // Allocate or resize receive buffer 
00452       char * recvbuf = (char *) malloc(recvsz);
00453 
00454       // Post non-blocking receive for data
00455       MPI_Recv(recvbuf, recvsz, MPI_BYTE, Ldest,
00456                VMD_MPI_TAG_ALLREDUCE_PAYLOAD, MPI_COMM_WORLD, &status);
00457 
00458       // Perform reduction
00459       // Perform the reduction operation on our existing and incoming data.
00460       // We build a Tcl command string with the user-defined proc, this 
00461       // node's previous resultand, and the incoming data, and evaluate it.
00462       if (Tcl_VarEval(interp, argv[2], " ", 
00463                       Tcl_GetString(resultobj), " ", 
00464                       recvbuf, NULL) != TCL_OK) {
00465         printf("Error occured during reduction!\n");    
00466       }
00467 
00468       // Prep for next reduction step.  Set result object to result of
00469       // the latest communication/reduction phase.
00470       resultobj = Tcl_GetObjResult(interp);
00471 
00472       // Free the receive buffer
00473       free(recvbuf);
00474     } 
00475   
00476     //
00477     // All nodes
00478     //  
00479     char *sendbuf = Tcl_GetString(resultobj);
00480     int sendsz = strlen(sendbuf)+1;
00481 
00482     // Post blocking send for data size
00483     MPI_Send(&sendsz, 1, MPI_INT, Rdest,
00484              VMD_MPI_TAG_ALLREDUCE_ARGLENGTH, MPI_COMM_WORLD);
00485 
00486     // Post blocking send for data
00487     MPI_Send(sendbuf, sendsz, MPI_BYTE, Rdest,
00488              VMD_MPI_TAG_ALLREDUCE_PAYLOAD, MPI_COMM_WORLD);
00489 
00490     if (src == 0) {
00491       int recvsz = 0;
00492 
00493       // Post blocking receive for data size
00494       MPI_Recv(&recvsz, 1, MPI_INT, Ldest,
00495                VMD_MPI_TAG_ALLREDUCE_ARGLENGTH, MPI_COMM_WORLD, &status);
00496 
00497       // Allocate or resize receive buffer 
00498       char * recvbuf = (char *) malloc(recvsz);
00499 
00500       // Post non-blocking receive for data
00501       MPI_Recv(recvbuf, recvsz, MPI_BYTE, Ldest,
00502                VMD_MPI_TAG_ALLREDUCE_PAYLOAD, MPI_COMM_WORLD, &status);
00503 
00504       // Perform reduction
00505       // Perform the reduction operation on our existing and incoming data.
00506       // We build a Tcl command string with the user-defined proc, this 
00507       // node's previous result and the incoming data, and evaluate it.
00508       if (Tcl_VarEval(interp, argv[2], " ", 
00509                       Tcl_GetString(resultobj), " ", 
00510                       recvbuf, NULL) != TCL_OK) {
00511         printf("Error occured during reduction!\n");    
00512       }
00513 
00514       // Prep for next reduction step.  Set result object to result of
00515       // the latest communication/reduction phase.
00516       resultobj = Tcl_GetObjResult(interp);
00517 
00518       // Free the receive buffer
00519       free(recvbuf);
00520     } 
00521 
00522     //
00523     // Broadcast final result from root to peers
00524     //
00525     if (src == 0) {
00526       // update send buffer for root node before broadcast
00527       sendbuf = Tcl_GetString(resultobj);
00528       sendsz = strlen(sendbuf)+1;
00529       MPI_Bcast(&sendsz, 1, MPI_INT, 0, MPI_COMM_WORLD);
00530       MPI_Bcast(sendbuf, sendsz, MPI_BYTE, 0, MPI_COMM_WORLD);
00531     } else {
00532       int recvsz = 0;
00533       MPI_Bcast(&recvsz, 1, MPI_INT, 0, MPI_COMM_WORLD);
00534 
00535       // Allocate or resize receive buffer 
00536       char * recvbuf = (char *) malloc(recvsz);
00537 
00538       MPI_Bcast(recvbuf, recvsz, MPI_BYTE, 0, MPI_COMM_WORLD);
00539 
00540       // Set the final Tcl result if necessary
00541       Tcl_SetObjResult(interp, Tcl_NewStringObj(recvbuf, recvsz-1));
00542 
00543       // Free the receive buffer
00544       free(recvbuf);
00545     }
00546 
00547     return TCL_OK;
00548 
00549 #elif defined(VMDMPI)
00550 
00551     //
00552     // Power-of-two-only hypercube/butterfly/recursive doubling 
00553     // All-Reduce implementation.  This implementation can't be used
00554     // in the case that we have either a non-power-of-two node count or
00555     // in the case where we have heterogeneous processing units that may
00556     // yield different floating point rounding.  For now we leave this
00557     // implementation in the code for performance comparisons until we work
00558     // out the changes necessary to make it closer to bandwidth-optimal,
00559     // heterogeneous-safe, and non-power-of-two capable.
00560     //
00561     int allok = 0;
00562     int i;
00563 
00564     // Check all node result codes before we continue with the reduction
00565     MPI_Allreduce(&isok, &allok, 1, MPI_INT, MPI_LAND, MPI_COMM_WORLD);
00566 
00567     // XXX we may want to verify that all nodes are going to call the same
00568     // reduction proc here before continuing further.
00569 
00570     if (!allok) {
00571       Tcl_SetResult(interp, (char *) "invalid parallel reduction, missing parameter on one or more nodes", TCL_STATIC);
00572       return TCL_ERROR;
00573     }
00574 
00575     // Calculate number of reduction phases required
00576     int log2N;
00577     for (log2N=0; N>1; N>>=1) {
00578       log2N++;
00579 
00580       // XXX bail out of we don't have a power-of-two node count, 
00581       //     at least until we implement 3-2 reduction phases
00582       if ((N & 1) && (N > 1)) {
00583         Tcl_SetResult(interp, (char *) "parallel allreduce only allowed for even power-of-two node count", TCL_STATIC);
00584         return TCL_ERROR;
00585       }
00586     }
00587     N = app->par_size();
00588 
00589     // copy incoming data into initial "result" object
00590     Tcl_Obj *resultobj = Tcl_NewStringObj((const char *) argv[3], strlen(argv[3])+1);
00591 
00592     // An all-reduce tree with hypercube connectivity with 
00593     // log2(N) communication/reduction phases.  At each phase, we compute
00594     // the peer/destination node we will communicate with using an XOR of
00595     // our node ID with the current hypercube dimension.  If we have an
00596     // incomplete hypercube topology (e.g. non-power-of-two node count), 
00597     // we have to do special 3-2 communication rounds (not implemented yet).
00598     // The current implementation requires that all existing nodes 
00599     // participate, and that they contribute a valid data item.
00600     // If we wish to support reductions where a node may not contribute,
00601     // we would need to handle that similarly to a peer node that doesn't 
00602     // exist, but we would likely determine this during the parameter length
00603     // exchange step.
00604     int src=app->par_rank(); // src node is this node
00605     for (i=0; i<log2N; i++) {
00606       int mask = 1 << i;     // generate bitmask to use in the XOR
00607       int dest = src ^ mask; // XOR src node with bitmask to find dest node
00608       Tcl_Obj *oldresultobj = resultobj; // track old result 
00609 
00610       // Check to make sure dest node exists for non-power-of-two 
00611       // node counts (an incomplete hypercube).  If not, skip to the next
00612       // communication/reduction phase.
00613       if (dest < N) {
00614         char *sendbuf = Tcl_GetString(oldresultobj);
00615         int sendsz = strlen(sendbuf)+1;
00616         int recvsz = 0;
00617         MPI_Request handle;
00618         MPI_Status status;
00619 
00620         //
00621         // Exchange required receive buffer size for data exchange with peer
00622         //
00623 
00624         // Post non-blocking receive for data size
00625         MPI_Irecv(&recvsz, 1, MPI_INT, dest,
00626                   VMD_MPI_TAG_ALLREDUCE_ARGLENGTH, MPI_COMM_WORLD, &handle);
00627 
00628         // Post blocking send for data size
00629         MPI_Send(&sendsz, 1, MPI_INT, dest,
00630                  VMD_MPI_TAG_ALLREDUCE_ARGLENGTH, MPI_COMM_WORLD);
00631 
00632         // Wait for non-blocking receive of data size to complete
00633         MPI_Wait(&handle, &status); 
00634 
00635 // printf("src[%d], dest[%d], value '%s', recvsz: %d\n", src, dest, sendbuf, recvsz);
00636 
00637         // Allocate or resize receive buffer 
00638         char * recvbuf = (char *) malloc(recvsz);
00639 
00640         //
00641         // Exchange the data payload
00642         // 
00643 
00644         // Post non-blocking receive for data
00645         MPI_Irecv(recvbuf, recvsz, MPI_BYTE, dest,
00646                   VMD_MPI_TAG_ALLREDUCE_PAYLOAD, MPI_COMM_WORLD, &handle);
00647         
00648         // Post blocking send for data
00649         MPI_Send(sendbuf, sendsz, MPI_BYTE, dest,
00650                  VMD_MPI_TAG_ALLREDUCE_PAYLOAD, MPI_COMM_WORLD);
00651 
00652         // Wait for receive of data
00653         MPI_Wait(&handle, &status); 
00654 
00655         // Perform the reduction operation on our existing and incoming data.
00656         // We build a Tcl command string with the user-defined proc, this 
00657         // node's previous result and the incoming data, and evaluate it.
00658         if (Tcl_VarEval(interp, argv[2], " ", 
00659                         sendbuf, " ", recvbuf, NULL) != TCL_OK) {
00660           printf("Error occured during reduction!\n");    
00661         }
00662 
00663         // Free the receive buffer
00664         free(recvbuf);
00665 
00666         // Prep for next reduction step.  Set result object to result of
00667         // the latest communication/reduction phase.
00668         resultobj = Tcl_GetObjResult(interp);
00669       }
00670     }
00671 
00672     // Set the final Tcl result if necessary
00673     Tcl_SetObjResult(interp, resultobj);
00674 
00675     return TCL_OK;
00676 #endif
00677   }
00678 
00679   Tcl_SetResult(interp, (char *) "invalid parallel subcommand.", TCL_STATIC);
00680 
00681   return TCL_ERROR;
00682 }
00683 

Generated on Wed May 23 01:49:41 2012 for VMD (current) by doxygen1.2.14 written by Dimitri van Heesch, © 1997-2002