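/*
 * Tcl "parallel" text command: node queries (nodename, noderank,
 * nodecount) and collective operations (allgather, allreduce, barrier,
 * dynamically scheduled "parallel for"), implemented with MPI when VMD
 * is compiled with VMDMPI support.
 */
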
#if defined(VMDMPI)
#include <mpi.h>

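// Use MPI_IN_PLACE collectives when the MPI implementation supports them
// (MPI-2 or later), avoiding a separate send buffer.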
#if !defined(USE_MPI_IN_PLACE)
#if (MPI_VERSION >= 2) || defined(MPI_IN_PLACE)
#define USE_MPI_IN_PLACE 1
#endif
#endif

#endif

#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <tcl.h>
#include "config.h"
#include "CommandQueue.h"
#include "Command.h"
#include "VMDApp.h"
#include "Inform.h"
#include "WKFThreads.h"

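// Message tags used to pair up the point-to-point transfers that implement
// the "allreduce" and "for" subcommands below.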
#define VMD_MPI_TAG_ALLREDUCE_ARGLENGTH 1
#define VMD_MPI_TAG_ALLREDUCE_PAYLOAD   2
#define VMD_MPI_TAG_FOR_REQUEST         3

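// Communicator duplicated from MPI_COMM_WORLD, presumably consumed by the
// Swift/T (Turbine/ADLB) runtime; swift_mpi_init() publishes its address
// to Tcl.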
#if defined(VMDMPI)
MPI_Comm turbine_adlb_comm;
#endif

extern int swift_mpi_init(Tcl_Interp *interp) {
#if defined(VMDMPI)
  if (getenv("VMDNOSWIFTCOMM") == NULL) {
    if (MPI_SUCCESS == MPI_Comm_dup(MPI_COMM_WORLD, &turbine_adlb_comm)) {
      Tcl_Obj* TURBINE_ADLB_COMM = Tcl_NewStringObj("TURBINE_ADLB_COMM", -1);

      // Store the address of the duplicated communicator in the Tcl
      // variable TURBINE_ADLB_COMM so the Swift/T runtime can find it.
      Tcl_Obj* adlb_comm_ptr = Tcl_NewLongObj((long) &turbine_adlb_comm);
      Tcl_ObjSetVar2(interp, TURBINE_ADLB_COMM, NULL, adlb_comm_ptr, 0);
    }
  }
#endif

  return 0;
}

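// MPI-only helpers for the dynamically scheduled "parallel for" subcommand.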
#if defined(VMDMPI)

typedef struct {
  int numnodes;
  wkf_tasktile_t loop;
  wkf_shared_iterator_t iter;
} parallel_for_parms;

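// Work scheduler for "parallel for", run on rank 0 (in a helper thread when
// VMDTHREADS is enabled): each worker sends its rank as a work request and
// gets the next loop index back; a reply of -1 signals that the loop is done.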
extern "C" void *vmd_mpi_parallel_for_scheduler(void *voidparms) {
  parallel_for_parms *parfor = (parallel_for_parms *) voidparms;

  // Hand out one loop index per incoming work request until the loop
  // (or the shared iterator, in the threaded build) is exhausted.
#if defined(VMDTHREADS)
  int i;
  wkf_tasktile_t curtile;
  while (wkf_shared_iterator_next_tile(&parfor->iter, 1, &curtile) != WKF_SCHED_DONE) {
    i = curtile.start;
#else
  int i;
  for (i=parfor->loop.start; i<parfor->loop.end; i++) {
#endif
    int reqnode;
    MPI_Status rcvstat;
    MPI_Recv(&reqnode, 1, MPI_INT, MPI_ANY_SOURCE, VMD_MPI_TAG_FOR_REQUEST,
             MPI_COMM_WORLD, &rcvstat);
    MPI_Send(&i, 1, MPI_INT, reqnode, VMD_MPI_TAG_FOR_REQUEST,
             MPI_COMM_WORLD);
  }

  // All work has been handed out; answer one final request from each
  // worker with -1 so the workers exit their request loops.
  int node;
  for (node=1; node<parfor->numnodes; node++) {
    int reqnode;
    MPI_Status rcvstat;
    MPI_Recv(&reqnode, 1, MPI_INT, MPI_ANY_SOURCE, VMD_MPI_TAG_FOR_REQUEST,
             MPI_COMM_WORLD, &rcvstat);

    i=-1; // loop-completion marker
    MPI_Send(&i, 1, MPI_INT, reqnode, VMD_MPI_TAG_FOR_REQUEST,
             MPI_COMM_WORLD);
  }

  return NULL;
}

#endif

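// Tcl callback for the "parallel" text command.  Node queries work in any
// build; the collective subcommands use MPI when compiled with VMDMPI and
// otherwise only support the trivial single-node case.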
int text_cmd_parallel(ClientData cd, Tcl_Interp *interp, int argc, const char *argv[]) {
  VMDApp *app = (VMDApp *)cd;

  if (argc < 2) {
    Tcl_SetResult(interp,
      (char *)
      "Parallel job query commands:\n"
      "  parallel nodename\n"
      "  parallel noderank\n"
      "  parallel nodecount\n"
      "Parallel collective operations (all nodes MUST participate):\n"
      "  parallel allgather <object>\n"
      "  parallel allreduce <tcl reduction proc> <object>\n"
      "  parallel barrier\n"
      "  parallel for <startcount> <endcount> <tcl callback proc> <user data>",
      TCL_STATIC);
    return TCL_ERROR;
  }

  // Duplicate MPI_COMM_WORLD for the Swift/T runtime (see swift_mpi_init()).
  if (!strcmp(argv[1], "swift_clone_communicator")) {
    swift_mpi_init(interp);
    return TCL_OK;
  }

  if (!strcmp(argv[1], "nodename")) {
    Tcl_Obj *tcl_result = Tcl_NewListObj(0, NULL);
    Tcl_ListObjAppendElement(interp, tcl_result, Tcl_NewStringObj(app->par_name(), strlen(app->par_name())));
    Tcl_SetObjResult(interp, tcl_result);
    return TCL_OK;
  }

  if (!strcmp(argv[1], "noderank")) {
    Tcl_Obj *tcl_result = Tcl_NewListObj(0, NULL);
    Tcl_ListObjAppendElement(interp, tcl_result, Tcl_NewIntObj(app->par_rank()));
    Tcl_SetObjResult(interp, tcl_result);
    return TCL_OK;
  }

  if (!strcmp(argv[1], "nodecount")) {
    Tcl_Obj *tcl_result = Tcl_NewListObj(0, NULL);
    Tcl_ListObjAppendElement(interp, tcl_result, Tcl_NewIntObj(app->par_size()));
    Tcl_SetObjResult(interp, tcl_result);
    return TCL_OK;
  }

  if (!strupncmp(argv[1], "barrier", CMDLEN) && argc==2) {
    app->par_barrier();
    return TCL_OK;
  }

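  // "parallel for <start> <end> <proc> <userdata>": dynamically scheduled
  // loop; each participating node repeatedly obtains a loop index from the
  // rank 0 scheduler and invokes the Tcl callback proc with that index and
  // the user data string.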
  if (!strupncmp(argv[1], "for", CMDLEN)) {
    int isok = (argc == 6);
    int N = app->par_size();
    int start, end;

    if (Tcl_GetInt(interp, argv[2], &start) != TCL_OK ||
        Tcl_GetInt(interp, argv[3], &end) != TCL_OK) {
      isok = 0;
    }

    // Single-node case: no scheduling is needed, just run the whole loop
    // locally.
    if (N == 1) {
      if (!isok) {
        Tcl_SetResult(interp, (char *) "invalid parallel for, missing parameter", TCL_STATIC);
        return TCL_ERROR;
      }

      int i;
      for (i=start; i<=end; i++) {
        char istr[128] = { 0 };
        sprintf(istr, "%d", i);
        if (Tcl_VarEval(interp, argv[4], " ", istr, " {",
                        argv[5], "} ", NULL) != TCL_OK) {
          Tcl_SetResult(interp, (char *) "error occurred during parallel for", TCL_STATIC);
        }
      }

      return TCL_OK;
    }

#if defined(VMDMPI)
    int allok = 0;

    // Check that all nodes got a valid parameter set before starting.
    MPI_Allreduce(&isok, &allok, 1, MPI_INT, MPI_LAND, MPI_COMM_WORLD);

    if (!allok) {
      Tcl_SetResult(interp, (char *) "invalid parallel for, missing parameter on one or more nodes", TCL_STATIC);
      return TCL_ERROR;
    }

    // Rank 0 runs the scheduler; all other ranks request indices until
    // they receive the -1 completion marker.
    int i;
    if (app->par_rank() == 0) {
      parallel_for_parms parfor;
      memset(&parfor, 0, sizeof(parfor));
      parfor.numnodes = N;
      parfor.loop.start=start;
      parfor.loop.end=end+1;
      wkf_shared_iterator_init(&parfor.iter);
      wkf_shared_iterator_set(&parfor.iter, &parfor.loop);

#if defined(VMDTHREADS)
      // Run the scheduler in a helper thread so rank 0 can do work too.
      wkf_thread_t pft;
      wkf_thread_create(&pft, vmd_mpi_parallel_for_scheduler, &parfor);

      // Rank 0 consumes work items from the shared iterator itself.
      wkf_tasktile_t curtile;
      while (wkf_shared_iterator_next_tile(&parfor.iter, 1, &curtile) != WKF_SCHED_DONE) {
        i = curtile.start;
        char istr[128] = { 0 };
        sprintf(istr, "%d", i);
        if (Tcl_VarEval(interp, argv[4], " ", istr, " {",
                        argv[5], "} ", NULL) != TCL_OK) {
          Tcl_SetResult(interp, (char *) "error occurred during parallel for", TCL_STATIC);
        }
      }

      // Wait for the scheduler thread to finish sending completion markers.
      wkf_thread_join(pft, NULL);
#else
      // Non-threaded build: rank 0 only schedules and does no work itself.
      vmd_mpi_parallel_for_scheduler(&parfor);
#endif

      wkf_shared_iterator_destroy(&parfor.iter);
    } else {
      // Worker ranks: request an index, run the callback, repeat until the
      // scheduler replies with -1.
      char istr[128] = { 0 };
      int done=0;
      int mynode=app->par_rank();
      while (!done) {
        MPI_Send(&mynode, 1, MPI_INT, 0, VMD_MPI_TAG_FOR_REQUEST,
                 MPI_COMM_WORLD);

        MPI_Status rcvstat;
        MPI_Recv(&i, 1, MPI_INT, MPI_ANY_SOURCE, VMD_MPI_TAG_FOR_REQUEST,
                 MPI_COMM_WORLD, &rcvstat);
        if (i == -1) {
          done = 1;
        } else {
          sprintf(istr, "%d", i);
          if (Tcl_VarEval(interp, argv[4], " ", istr, " {",
                          argv[5], "} ", NULL) != TCL_OK) {
            Tcl_SetResult(interp, (char *) "error occurred during parallel for", TCL_STATIC);
          }
        }
      }
    }
#endif

    return TCL_OK;
  }
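  // "parallel allgather <object>": every node contributes one string and
  // every node gets back a Tcl list of all contributed strings, ordered by
  // node rank.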
  if (!strupncmp(argv[1], "allgather", CMDLEN)) {
    int isok = (argc == 3);
    int N = app->par_size();

    // Single-node case: the gathered list contains only our own value.
    if (N == 1) {
      if (!isok) {
        Tcl_SetResult(interp, (char *) "invalid parallel gather, missing parameter on one or more nodes", TCL_STATIC);
        return TCL_ERROR;
      }

      Tcl_Obj *tcl_result = Tcl_NewListObj(0, NULL);
      Tcl_ListObjAppendElement(interp, tcl_result, Tcl_NewStringObj(argv[2], strlen(argv[2])));
      Tcl_SetObjResult(interp, tcl_result);
      return TCL_OK;
    }
#if defined(VMDMPI)
    else {
      int allok = 0;
      int i;

      // Check that all nodes got a valid parameter set before starting.
      MPI_Allreduce(&isok, &allok, 1, MPI_INT, MPI_LAND, MPI_COMM_WORLD);

      if (!allok) {
        Tcl_SetResult(interp, (char *) "invalid parallel gather, missing parameter on one or more nodes", TCL_STATIC);
        return TCL_ERROR;
      }

      // Gather the per-node string lengths (including the terminating NUL)
      // so each rank can compute displacements for the variable-size gather.
      int *szlist = new int[N];
      szlist[app->par_rank()] = strlen(argv[2])+1;

#if defined(USE_MPI_IN_PLACE)
      // MPI-2 and later: gather directly into the result array.
      MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT,
                    &szlist[0], 1, MPI_INT, MPI_COMM_WORLD);
#else
      // MPI-1: send our own entry explicitly.
      MPI_Allgather(&szlist[app->par_rank()], 1, MPI_INT,
                    &szlist[0], 1, MPI_INT, MPI_COMM_WORLD);
#endif

      int totalsz = 0;
      int *displist = new int[N];
      for (i=0; i<N; i++) {
        displist[i]=totalsz;
        totalsz+=szlist[i];
      }

      char *recvbuf = new char[totalsz];
      memset(recvbuf, 0, totalsz);

      // Copy our own contribution into its slot of the receive buffer.
      strcpy(&recvbuf[displist[app->par_rank()]], argv[2]);

      // Gather the variable-length strings from all nodes.
#if defined(USE_MPI_IN_PLACE)
      // MPI-2 and later: gather directly into the result array.
      MPI_Allgatherv(MPI_IN_PLACE, szlist[app->par_rank()], MPI_BYTE,
                     &recvbuf[0], szlist, displist, MPI_BYTE, MPI_COMM_WORLD);
#else
      // MPI-1: send our own entry explicitly.
      MPI_Allgatherv(&recvbuf[displist[app->par_rank()]], szlist[app->par_rank()], MPI_BYTE,
                     &recvbuf[0], szlist, displist, MPI_BYTE, MPI_COMM_WORLD);
#endif

      // Build a Tcl list of the gathered strings, dropping each trailing NUL.
      Tcl_Obj *tcl_result = Tcl_NewListObj(0, NULL);
      for (i=0; i<N; i++) {
        Tcl_ListObjAppendElement(interp, tcl_result, Tcl_NewStringObj(&recvbuf[displist[i]], szlist[i]-1));
      }
      Tcl_SetObjResult(interp, tcl_result);

      delete [] recvbuf;
      delete [] displist;
      delete [] szlist;
      return TCL_OK;
    }
#endif
  }
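  // "parallel allreduce <tcl reduction proc> <object>": combine one value
  // per node using a user-supplied Tcl proc that is called with two values
  // and returns their combination; every node ends up with the same
  // reduced value.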
  if (!strupncmp(argv[1], "allreduce", CMDLEN)) {
    int isok = (argc == 4);
    int N = app->par_size();

    // Single-node case: the reduction of a single value is the value itself.
    if (N == 1) {
      if (!isok) {
        Tcl_SetResult(interp, (char *) "invalid parallel reduction, missing parameter", TCL_STATIC);
        return TCL_ERROR;
      }

      Tcl_SetObjResult(interp, Tcl_NewStringObj(argv[3], strlen(argv[3])));
      return TCL_OK;
    }

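    // The branch below passes the partial result around the nodes in a ring
    // and broadcasts the final value from node 0; it works for any node
    // count.  The "#elif" branch further down is a recursive-doubling
    // variant that requires a power-of-two node count.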
#if 1 && defined(VMDMPI)
    int allok = 0;

    // Check that all nodes got a valid parameter set before starting.
    MPI_Allreduce(&isok, &allok, 1, MPI_INT, MPI_LAND, MPI_COMM_WORLD);

    if (!allok) {
      Tcl_SetResult(interp, (char *) "invalid parallel reduction, missing parameter on one or more nodes", TCL_STATIC);
      return TCL_ERROR;
    }

    // Copy the incoming value into a Tcl string object to seed the reduction.
    Tcl_Obj *resultobj = Tcl_NewStringObj((const char *) argv[3], strlen(argv[3])+1);

    // Serial ring pass: node 0 seeds the ring by sending its value to node
    // N-1; every other node receives the running partial result from node
    // (rank+1) mod N, combines it with its own value via the Tcl reduction
    // proc, and forwards the result to node (rank-1) mod N, so the combined
    // value arrives back at node 0.
    int src=app->par_rank();
    int Ldest = (N + src + 1) % N; // neighbor we receive from
    int Rdest = (N + src - 1) % N; // neighbor we send to
    MPI_Status status;

    if (src != 0) {
      int recvsz = 0;

      // Receive the length of the neighbor's partial result string.
      MPI_Recv(&recvsz, 1, MPI_INT, Ldest,
               VMD_MPI_TAG_ALLREDUCE_ARGLENGTH, MPI_COMM_WORLD, &status);

      // Allocate a receive buffer of the announced size.
      char * recvbuf = (char *) malloc(recvsz);

      // Receive the partial result string itself.
      MPI_Recv(recvbuf, recvsz, MPI_BYTE, Ldest,
               VMD_MPI_TAG_ALLREDUCE_PAYLOAD, MPI_COMM_WORLD, &status);

      // Combine our value with the received partial result by calling the
      // user-supplied Tcl reduction proc with both strings as arguments.
      if (Tcl_VarEval(interp, argv[2], " ",
                      Tcl_GetString(resultobj), " ",
                      recvbuf, NULL) != TCL_OK) {
        printf("Error occurred during reduction!\n");
      }

      // The combined value is left in the interpreter result.
      resultobj = Tcl_GetObjResult(interp);

      free(recvbuf);
    }

    // Forward the current partial result (node 0 forwards its initial value)
    // to the next node in the ring: length first, then the string payload
    // including its terminating NUL.
    char *sendbuf = Tcl_GetString(resultobj);
    int sendsz = strlen(sendbuf)+1;

    MPI_Send(&sendsz, 1, MPI_INT, Rdest,
             VMD_MPI_TAG_ALLREDUCE_ARGLENGTH, MPI_COMM_WORLD);

    MPI_Send(sendbuf, sendsz, MPI_BYTE, Rdest,
             VMD_MPI_TAG_ALLREDUCE_PAYLOAD, MPI_COMM_WORLD);

    if (src == 0) {
      int recvsz = 0;

      // Node 0 receives the length of the accumulated result string.
      MPI_Recv(&recvsz, 1, MPI_INT, Ldest,
               VMD_MPI_TAG_ALLREDUCE_ARGLENGTH, MPI_COMM_WORLD, &status);

      // Allocate a receive buffer of the announced size.
      char * recvbuf = (char *) malloc(recvsz);

      // Receive the accumulated result string itself.
      MPI_Recv(recvbuf, recvsz, MPI_BYTE, Ldest,
               VMD_MPI_TAG_ALLREDUCE_PAYLOAD, MPI_COMM_WORLD, &status);

      // Apply the user-supplied Tcl reduction proc one final time on node 0.
      if (Tcl_VarEval(interp, argv[2], " ",
                      Tcl_GetString(resultobj), " ",
                      recvbuf, NULL) != TCL_OK) {
        printf("Error occurred during reduction!\n");
      }

      // The combined value is left in the interpreter result.
      resultobj = Tcl_GetObjResult(interp);

      free(recvbuf);
    }

    // Broadcast the final reduced value from node 0 so that every node
    // returns the same result string.
    if (src == 0) {
      sendbuf = Tcl_GetString(resultobj);
      sendsz = strlen(sendbuf)+1;
      MPI_Bcast(&sendsz, 1, MPI_INT, 0, MPI_COMM_WORLD);
      MPI_Bcast(sendbuf, sendsz, MPI_BYTE, 0, MPI_COMM_WORLD);
    } else {
      int recvsz = 0;
      MPI_Bcast(&recvsz, 1, MPI_INT, 0, MPI_COMM_WORLD);

      char * recvbuf = (char *) malloc(recvsz);

      MPI_Bcast(recvbuf, recvsz, MPI_BYTE, 0, MPI_COMM_WORLD);

      // Install the broadcast value (minus its trailing NUL) as the Tcl result.
      Tcl_SetObjResult(interp, Tcl_NewStringObj(recvbuf, recvsz-1));

      free(recvbuf);
    }

    return TCL_OK;

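    // Alternative allreduce implementation (not compiled while the "#if 1"
    // above is in effect): a recursive-doubling exchange that avoids the
    // serial ring pass but only supports power-of-two node counts.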
#elif defined(VMDMPI)
    int allok = 0;
    int i;

    // Check that all nodes got a valid parameter set before starting.
    MPI_Allreduce(&isok, &allok, 1, MPI_INT, MPI_LAND, MPI_COMM_WORLD);

    if (!allok) {
      Tcl_SetResult(interp, (char *) "invalid parallel reduction, missing parameter on one or more nodes", TCL_STATIC);
      return TCL_ERROR;
    }

    // Compute log2(N) and reject node counts that are not a power of two,
    // since the pairwise exchange scheme below requires one.
    int log2N;
    for (log2N=0; N>1; N>>=1) {
      log2N++;

      if ((N & 1) && (N > 1)) {
        Tcl_SetResult(interp, (char *) "parallel allreduce only allowed for even power-of-two node count", TCL_STATIC);
        return TCL_ERROR;
      }
    }
    N = app->par_size();

    // Copy the incoming value into a Tcl string object to seed the reduction.
    Tcl_Obj *resultobj = Tcl_NewStringObj((const char *) argv[3], strlen(argv[3])+1);

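    // Recursive-doubling ("butterfly") exchange: in round i each node pairs
    // with the node whose rank differs in bit i, the partners swap their
    // current partial result strings, and each combines the two with the
    // user-supplied Tcl reduction proc.  After log2(N) rounds every node
    // holds the same reduced value, so no final broadcast is needed.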
    int src=app->par_rank();
    for (i=0; i<log2N; i++) {
      int mask = 1 << i;     // bit that selects this round's partner
      int dest = src ^ mask; // partner rank for this exchange
      Tcl_Obj *oldresultobj = resultobj; // current partial result

      // Skip partners that fall outside the node count (cannot happen for a
      // true power-of-two N, but kept as a safety check).
      if (dest < N) {
        char *sendbuf = Tcl_GetString(oldresultobj);
        int sendsz = strlen(sendbuf)+1;
        int recvsz = 0;
        MPI_Request handle;
        MPI_Status status;

        // Exchange string lengths first, posting the receive before the
        // send to avoid deadlock between the paired nodes.
        MPI_Irecv(&recvsz, 1, MPI_INT, dest,
                  VMD_MPI_TAG_ALLREDUCE_ARGLENGTH, MPI_COMM_WORLD, &handle);

        MPI_Send(&sendsz, 1, MPI_INT, dest,
                 VMD_MPI_TAG_ALLREDUCE_ARGLENGTH, MPI_COMM_WORLD);

        MPI_Wait(&handle, &status);

        // Allocate a receive buffer of the announced size.
        char * recvbuf = (char *) malloc(recvsz);

        // Exchange the string payloads the same way.
        MPI_Irecv(recvbuf, recvsz, MPI_BYTE, dest,
                  VMD_MPI_TAG_ALLREDUCE_PAYLOAD, MPI_COMM_WORLD, &handle);

        MPI_Send(sendbuf, sendsz, MPI_BYTE, dest,
                 VMD_MPI_TAG_ALLREDUCE_PAYLOAD, MPI_COMM_WORLD);

        MPI_Wait(&handle, &status);

        // Combine our partial result with the partner's by calling the
        // user-supplied Tcl reduction proc with both strings as arguments.
        if (Tcl_VarEval(interp, argv[2], " ",
                        sendbuf, " ", recvbuf, NULL) != TCL_OK) {
          printf("Error occurred during reduction!\n");
        }

        free(recvbuf);

        // The combined value is left in the interpreter result; carry it
        // into the next round.
        resultobj = Tcl_GetObjResult(interp);
      }
    }

    // Every node now holds the final reduced value; return it.
    Tcl_SetObjResult(interp, resultobj);

    return TCL_OK;
#endif
  }

  Tcl_SetResult(interp, (char *) "invalid parallel subcommand.", TCL_STATIC);

  return TCL_ERROR;
}