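//
// Tcl bindings for VMD's "parallel" text command: per-node queries
// (nodename, noderank, nodecount) and MPI collective operations
// (allgather, allreduce, barrier, and a dynamically scheduled
// "parallel for"), plus a hook that exposes a duplicated MPI
// communicator to the Swift/T (Turbine/ADLB) runtime.
//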
#if defined(VMDMPI)
#include <mpi.h>

// Use MPI_IN_PLACE for collective operations when the MPI implementation
// provides it (MPI-2 or later, or whenever MPI_IN_PLACE is defined).
#if !defined(USE_MPI_IN_PLACE)
#if (MPI_VERSION >= 2) || defined(MPI_IN_PLACE)
#define USE_MPI_IN_PLACE 1
#endif
#endif

#endif

#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <tcl.h>
#include "config.h"
#include "CommandQueue.h"
#include "Command.h"
#include "VMDApp.h"
#include "Inform.h"
#include "WKFThreads.h"

#define VMD_MPI_TAG_ALLREDUCE_ARGLENGTH 1
#define VMD_MPI_TAG_ALLREDUCE_PAYLOAD 2
#define VMD_MPI_TAG_FOR_REQUEST 3
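// The ALLREDUCE_ARGLENGTH/PAYLOAD tags carry the string length and string
// contents exchanged by the user-defined Tcl "allreduce" implementations
// below; the FOR_REQUEST tag carries work-unit requests and replies for
// the dynamically scheduled "parallel for" loop.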

#if defined(VMDMPI)

// Communicator made available to the Swift/T (Turbine/ADLB) runtime when
// VMD is driven as part of a Swift/T workflow.
MPI_Comm turbine_adlb_comm;
#endif

extern int swift_mpi_init(Tcl_Interp *interp) {
#if defined(VMDMPI)
  if (getenv("VMDNOSWIFTCOMM") == NULL) {
    if (MPI_SUCCESS == MPI_Comm_dup(MPI_COMM_WORLD, &turbine_adlb_comm)) {
      Tcl_Obj* TURBINE_ADLB_COMM = Tcl_NewStringObj("TURBINE_ADLB_COMM", -1);

      // Stash a pointer to the duplicated communicator in a Tcl variable
      // so that the Swift/T runtime can retrieve it from the interpreter.
      Tcl_Obj* adlb_comm_ptr = Tcl_NewLongObj((long) &turbine_adlb_comm);
      Tcl_ObjSetVar2(interp, TURBINE_ADLB_COMM, NULL, adlb_comm_ptr, 0);
    }
  }
#endif

  return 0;
}
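// Illustrative Tcl usage (the Swift/T runtime is then expected to read the
// communicator handle back out of the TURBINE_ADLB_COMM variable):
//   parallel swift_clone_communicator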

#if defined(VMDMPI)

// Parameters handed to the "parallel for" work scheduler
typedef struct {
  int numnodes;                 // number of MPI ranks participating
  wkf_tasktile_t loop;          // overall loop index range
  wkf_shared_iterator_t iter;   // shared iterator handing out loop indices
} parallel_for_parms;

// Scheduler run on rank 0: hands out one loop index at a time to whichever
// rank asks for work, then answers each rank with a -1 "no more work"
// sentinel once the loop range is exhausted.
extern "C" void *vmd_mpi_parallel_for_scheduler(void *voidparms) {
  parallel_for_parms *parfor = (parallel_for_parms *) voidparms;

  // When threads are available, the scheduler runs concurrently with the
  // rank-0 worker loop and both draw indices from the shared iterator;
  // otherwise rank 0 only schedules and walks the full index range here.
#if defined(VMDTHREADS)
  int i;
  wkf_tasktile_t curtile;
  while (wkf_shared_iterator_next_tile(&parfor->iter, 1, &curtile) != WKF_SCHED_DONE) {
    i = curtile.start;
#else
  int i;
  for (i=parfor->loop.start; i<parfor->loop.end; i++) {
#endif
    int reqnode;
    MPI_Status rcvstat;
    MPI_Recv(&reqnode, 1, MPI_INT, MPI_ANY_SOURCE, VMD_MPI_TAG_FOR_REQUEST,
             MPI_COMM_WORLD, &rcvstat);
    MPI_Send(&i, 1, MPI_INT, reqnode, VMD_MPI_TAG_FOR_REQUEST,
             MPI_COMM_WORLD);
  }

  // All work has been handed out: answer each remaining request with -1
  // so the worker ranks know to stop asking.
  int node;
  for (node=1; node<parfor->numnodes; node++) {
    int reqnode;
    MPI_Status rcvstat;
    MPI_Recv(&reqnode, 1, MPI_INT, MPI_ANY_SOURCE, VMD_MPI_TAG_FOR_REQUEST,
             MPI_COMM_WORLD, &rcvstat);

    i=-1;
    MPI_Send(&i, 1, MPI_INT, reqnode, VMD_MPI_TAG_FOR_REQUEST,
             MPI_COMM_WORLD);
  }

  return NULL;
}

#endif
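// Worker-side protocol sketch (mirrors the loop in text_cmd_parallel below):
//   repeat:
//     send own rank to rank 0          (VMD_MPI_TAG_FOR_REQUEST)
//     receive loop index i from rank 0 (VMD_MPI_TAG_FOR_REQUEST)
//     if i == -1: stop; otherwise evaluate the Tcl callback on index i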

int text_cmd_parallel(ClientData cd, Tcl_Interp *interp, int argc, const char *argv[]) {
  VMDApp *app = (VMDApp *)cd;

  if (argc < 2) {
    Tcl_SetResult(interp,
      (char *)
      "Parallel job query commands:\n"
      " parallel nodename\n"
      " parallel noderank\n"
      " parallel nodecount\n"
      "Parallel collective operations (all nodes MUST participate):\n"
      " parallel allgather <object>\n"
      " parallel allreduce <tcl reduction proc> <object>\n"
      " parallel barrier\n"
      " parallel for <startcount> <endcount> <tcl callback proc> <user data>",
      TCL_STATIC);
    return TCL_ERROR;
  }

  // Duplicate MPI_COMM_WORLD and publish it for the Swift/T runtime
  if (!strcmp(argv[1], "swift_clone_communicator")) {
    swift_mpi_init(interp);
    return TCL_OK;
  }

  // Query: name of the host this rank is running on
  if (!strcmp(argv[1], "nodename")) {
    Tcl_Obj *tcl_result = Tcl_NewListObj(0, NULL);
    Tcl_ListObjAppendElement(interp, tcl_result, Tcl_NewStringObj(app->par_name(), strlen(app->par_name())));
    Tcl_SetObjResult(interp, tcl_result);
    return TCL_OK;
  }

  // Query: MPI rank of this node
  if (!strcmp(argv[1], "noderank")) {
    Tcl_Obj *tcl_result = Tcl_NewListObj(0, NULL);
    Tcl_ListObjAppendElement(interp, tcl_result, Tcl_NewIntObj(app->par_rank()));
    Tcl_SetObjResult(interp, tcl_result);
    return TCL_OK;
  }

  // Query: total number of MPI ranks in the job
  if (!strcmp(argv[1], "nodecount")) {
    Tcl_Obj *tcl_result = Tcl_NewListObj(0, NULL);
    Tcl_ListObjAppendElement(interp, tcl_result, Tcl_NewIntObj(app->par_size()));
    Tcl_SetObjResult(interp, tcl_result);
    return TCL_OK;
  }
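  // Illustrative Tcl usage of the query subcommands:
  //   puts "rank [parallel noderank] of [parallel nodecount] on [parallel nodename]"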

  // Collective: block until every rank reaches the barrier
  if (!strupncmp(argv[1], "barrier", CMDLEN) && argc==2) {
    app->par_barrier();
    return TCL_OK;
  }
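  // Illustrative Tcl usage (a collective call; every rank must execute it):
  //   parallel barrier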

  // Collective: dynamically load-balanced parallel for loop.
  //   parallel for <startcount> <endcount> <tcl callback proc> <user data>
  // Rank 0 runs the work scheduler (in a separate thread when threads are
  // available) and hands out one loop index at a time; each rank invokes
  // the Tcl callback as:  <proc> <index> {<user data>}
  if (!strupncmp(argv[1], "for", CMDLEN)) {
    int isok = (argc == 6);
    int N = app->par_size();
    int start, end;

    if (Tcl_GetInt(interp, argv[2], &start) != TCL_OK ||
        Tcl_GetInt(interp, argv[3], &end) != TCL_OK) {
      isok = 0;
    }

    // Single-node case: run the loop serially in this interpreter
    if (N == 1) {
      if (!isok) {
        Tcl_SetResult(interp, (char *) "invalid parallel for, missing parameter", TCL_STATIC);
        return TCL_ERROR;
      }

      int i;
      for (i=start; i<=end; i++) {
        char istr[128] = { 0 };
        sprintf(istr, "%d", i);
        if (Tcl_VarEval(interp, argv[4], " ", istr, " {",
                        argv[5], "} ", NULL) != TCL_OK) {
          Tcl_SetResult(interp, (char *) "error occurred during parallel for", TCL_STATIC);
        }
      }

      return TCL_OK;
    }

#if defined(VMDMPI)
    int allok = 0;

    // Check that all ranks parsed their arguments successfully
    MPI_Allreduce(&isok, &allok, 1, MPI_INT, MPI_LAND, MPI_COMM_WORLD);

    if (!allok) {
      Tcl_SetResult(interp, (char *) "invalid parallel for, missing parameter on one or more nodes", TCL_STATIC);
      return TCL_ERROR;
    }

    int i;
    if (app->par_rank() == 0) {
      // Rank 0: set up the shared loop iterator and run the scheduler
      parallel_for_parms parfor;
      memset(&parfor, 0, sizeof(parfor));
      parfor.numnodes = N;
      parfor.loop.start=start;
      parfor.loop.end=end+1;
      wkf_shared_iterator_init(&parfor.iter);
      wkf_shared_iterator_set(&parfor.iter, &parfor.loop);

#if defined(VMDTHREADS)
      // Run the scheduler in its own thread so rank 0 can also do work
      wkf_thread_t pft;
      wkf_thread_create(&pft, vmd_mpi_parallel_for_scheduler, &parfor);

      // Rank 0 draws loop indices from the same shared iterator
      wkf_tasktile_t curtile;
      while (wkf_shared_iterator_next_tile(&parfor.iter, 1, &curtile) != WKF_SCHED_DONE) {
        i = curtile.start;
        char istr[128] = { 0 };
        sprintf(istr, "%d", i);
        if (Tcl_VarEval(interp, argv[4], " ", istr, " {",
                        argv[5], "} ", NULL) != TCL_OK) {
          Tcl_SetResult(interp, (char *) "error occurred during parallel for", TCL_STATIC);
        }
      }

      // Wait for the scheduler thread to hand out all termination sentinels
      wkf_thread_join(pft, NULL);
#else
      // No threads: rank 0 only schedules, it does not execute loop bodies
      vmd_mpi_parallel_for_scheduler(&parfor);
#endif

      wkf_shared_iterator_destroy(&parfor.iter);
    } else {
      // Worker ranks: request indices from rank 0 until a -1 sentinel arrives
      char istr[128] = { 0 };
      int done=0;
      int mynode=app->par_rank();
      while (!done) {
        MPI_Send(&mynode, 1, MPI_INT, 0, VMD_MPI_TAG_FOR_REQUEST,
                 MPI_COMM_WORLD);

        MPI_Status rcvstat;
        MPI_Recv(&i, 1, MPI_INT, MPI_ANY_SOURCE, VMD_MPI_TAG_FOR_REQUEST,
                 MPI_COMM_WORLD, &rcvstat);
        if (i == -1) {
          done = 1;
        } else {
          sprintf(istr, "%d", i);
          if (Tcl_VarEval(interp, argv[4], " ", istr, " {",
                          argv[5], "} ", NULL) != TCL_OK) {
            Tcl_SetResult(interp, (char *) "error occurred during parallel for", TCL_STATIC);
          }
        }
      }
    }
#endif

    return TCL_OK;
  }
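  // Illustrative Tcl usage ("analyze_frame" and "$numframes" are hypothetical;
  // the callback proc receives a loop index and the user-data string):
  //   parallel for 0 [expr {$numframes - 1}] analyze_frame $userdata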

  // Collective: gather one string-typed object from every rank; every rank
  // receives the same N-element Tcl list of results, ordered by rank.
  if (!strupncmp(argv[1], "allgather", CMDLEN)) {
    int isok = (argc == 3);
    int N = app->par_size();

    // Single-node case: the "gathered" result is just a one-element list
    if (N == 1) {
      if (!isok) {
        Tcl_SetResult(interp, (char *) "invalid parallel allgather, missing parameter", TCL_STATIC);
        return TCL_ERROR;
      }

      Tcl_Obj *tcl_result = Tcl_NewListObj(0, NULL);
      Tcl_ListObjAppendElement(interp, tcl_result, Tcl_NewStringObj(argv[2], strlen(argv[2])));
      Tcl_SetObjResult(interp, tcl_result);
      return TCL_OK;
    }
#if defined(VMDMPI)
    else {
      int allok = 0;
      int i;

      // Check that all ranks parsed their arguments successfully
      MPI_Allreduce(&isok, &allok, 1, MPI_INT, MPI_LAND, MPI_COMM_WORLD);

      if (!allok) {
        Tcl_SetResult(interp, (char *) "invalid parallel allgather, missing parameter on one or more nodes", TCL_STATIC);
        return TCL_ERROR;
      }

      // Exchange the per-rank string lengths (including NUL terminators)
      int *szlist = new int[N];
      szlist[app->par_rank()] = strlen(argv[2])+1;

#if defined(USE_MPI_IN_PLACE)
      MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT,
                    &szlist[0], 1, MPI_INT, MPI_COMM_WORLD);
#else
      MPI_Allgather(&szlist[app->par_rank()], 1, MPI_INT,
                    &szlist[0], 1, MPI_INT, MPI_COMM_WORLD);
#endif

      // Compute per-rank displacements and the total receive buffer size
      int totalsz = 0;
      int *displist = new int[N];
      for (i=0; i<N; i++) {
        displist[i]=totalsz;
        totalsz+=szlist[i];
      }

      char *recvbuf = new char[totalsz];
      memset(recvbuf, 0, totalsz);

      // Copy this rank's contribution into its slot of the receive buffer
      strcpy(&recvbuf[displist[app->par_rank()]], argv[2]);

      // Gather the variable-length strings from all ranks
#if defined(USE_MPI_IN_PLACE)
      MPI_Allgatherv(MPI_IN_PLACE, szlist[app->par_rank()], MPI_BYTE,
                     &recvbuf[0], szlist, displist, MPI_BYTE, MPI_COMM_WORLD);
#else
      MPI_Allgatherv(&recvbuf[displist[app->par_rank()]], szlist[app->par_rank()], MPI_BYTE,
                     &recvbuf[0], szlist, displist, MPI_BYTE, MPI_COMM_WORLD);
#endif

      // Build the Tcl list result, dropping each string's NUL terminator
      Tcl_Obj *tcl_result = Tcl_NewListObj(0, NULL);
      for (i=0; i<N; i++) {
        Tcl_ListObjAppendElement(interp, tcl_result, Tcl_NewStringObj(&recvbuf[displist[i]], szlist[i]-1));
      }
      Tcl_SetObjResult(interp, tcl_result);

      delete [] recvbuf;
      delete [] displist;
      delete [] szlist;
      return TCL_OK;
    }
#endif
  }
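  // Illustrative Tcl usage (each rank contributes one value and gets back
  // the same rank-ordered list; "$localresult" is a hypothetical variable):
  //   set allresults [parallel allgather $localresult]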

  // Collective: user-defined reduction.
  //   parallel allreduce <tcl reduction proc> <object>
  // The reduction proc is called with two partial results as string
  // arguments and must return their combination; all ranks end up with
  // the same reduced value as the Tcl result.
  if (!strupncmp(argv[1], "allreduce", CMDLEN)) {
    int isok = (argc == 4);
    int N = app->par_size();

    // Single-node case: nothing to reduce, return the input object
    if (N == 1) {
      if (!isok) {
        Tcl_SetResult(interp, (char *) "invalid parallel reduction, missing parameter", TCL_STATIC);
        return TCL_ERROR;
      }

      Tcl_SetObjResult(interp, Tcl_NewStringObj(argv[3], strlen(argv[3])));
      return TCL_OK;
    }

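    // Two MPI implementations follow: a ring-pass reduction that funnels
    // partial results to rank 0 and then broadcasts the final value
    // (selected by the "#if 1" below), and an alternate recursive-doubling
    // variant that requires a power-of-two node count.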
#if 1 && defined(VMDMPI)
    // Ring-pass implementation: rank 0 seeds the ring with its value, each
    // rank combines the incoming partial result with its own value and
    // forwards it toward rank 0, and rank 0 broadcasts the final result.

    int allok = 0;

    // Check that all ranks parsed their arguments successfully
    MPI_Allreduce(&isok, &allok, 1, MPI_INT, MPI_LAND, MPI_COMM_WORLD);

    if (!allok) {
      Tcl_SetResult(interp, (char *) "invalid parallel reduction, missing parameter on one or more nodes", TCL_STATIC);
      return TCL_ERROR;
    }

    // Start with this rank's own contribution
    Tcl_Obj *resultobj = Tcl_NewStringObj((const char *) argv[3], strlen(argv[3])+1);

    // Ring neighbors: receive from Ldest, send toward rank 0 via Rdest
    int src=app->par_rank();
    int Ldest = (N + src + 1) % N;
    int Rdest = (N + src - 1) % N;
    MPI_Status status;

    if (src != 0) {
      int recvsz = 0;

      // Receive the length of the incoming partial result
      MPI_Recv(&recvsz, 1, MPI_INT, Ldest,
               VMD_MPI_TAG_ALLREDUCE_ARGLENGTH, MPI_COMM_WORLD, &status);

      char * recvbuf = (char *) malloc(recvsz);

      // Receive the partial result itself
      MPI_Recv(recvbuf, recvsz, MPI_BYTE, Ldest,
               VMD_MPI_TAG_ALLREDUCE_PAYLOAD, MPI_COMM_WORLD, &status);

      // Combine the incoming partial result with our local value by
      // invoking the user-supplied Tcl reduction proc
      if (Tcl_VarEval(interp, argv[2], " ",
                      Tcl_GetString(resultobj), " ",
                      recvbuf, NULL) != TCL_OK) {
        printf("Error occurred during reduction!\n");
      }

      // The reduction proc's return value becomes our new partial result
      resultobj = Tcl_GetObjResult(interp);

      free(recvbuf);
    }

    // Pass our (possibly combined) partial result to the next rank
    // toward rank 0
    char *sendbuf = Tcl_GetString(resultobj);
    int sendsz = strlen(sendbuf)+1;

    MPI_Send(&sendsz, 1, MPI_INT, Rdest,
             VMD_MPI_TAG_ALLREDUCE_ARGLENGTH, MPI_COMM_WORLD);

    MPI_Send(sendbuf, sendsz, MPI_BYTE, Rdest,
             VMD_MPI_TAG_ALLREDUCE_PAYLOAD, MPI_COMM_WORLD);

    if (src == 0) {
      int recvsz = 0;

      // Receive the length of the accumulated result arriving from rank 1
      MPI_Recv(&recvsz, 1, MPI_INT, Ldest,
               VMD_MPI_TAG_ALLREDUCE_ARGLENGTH, MPI_COMM_WORLD, &status);

      char * recvbuf = (char *) malloc(recvsz);

      // Receive the accumulated result itself
      MPI_Recv(recvbuf, recvsz, MPI_BYTE, Ldest,
               VMD_MPI_TAG_ALLREDUCE_PAYLOAD, MPI_COMM_WORLD, &status);

      // Fold in rank 0's value via the user-supplied reduction proc
      if (Tcl_VarEval(interp, argv[2], " ",
                      Tcl_GetString(resultobj), " ",
                      recvbuf, NULL) != TCL_OK) {
        printf("Error occurred during reduction!\n");
      }

      resultobj = Tcl_GetObjResult(interp);

      free(recvbuf);
    }

    // Broadcast the final reduced value from rank 0 to all other ranks,
    // making it the Tcl result on every rank
    if (src == 0) {
      sendbuf = Tcl_GetString(resultobj);
      sendsz = strlen(sendbuf)+1;
      MPI_Bcast(&sendsz, 1, MPI_INT, 0, MPI_COMM_WORLD);
      MPI_Bcast(sendbuf, sendsz, MPI_BYTE, 0, MPI_COMM_WORLD);
    } else {
      int recvsz = 0;
      MPI_Bcast(&recvsz, 1, MPI_INT, 0, MPI_COMM_WORLD);

      char * recvbuf = (char *) malloc(recvsz);

      MPI_Bcast(recvbuf, recvsz, MPI_BYTE, 0, MPI_COMM_WORLD);

      Tcl_SetObjResult(interp, Tcl_NewStringObj(recvbuf, recvsz-1));

      free(recvbuf);
    }

    return TCL_OK;

#elif defined(VMDMPI)
    // Alternate implementation: recursive-doubling (butterfly) exchange.
    // At step i, ranks that differ only in bit i swap their partial results
    // and combine them, so after log2(N) steps every rank holds the full
    // reduction.  Requires a power-of-two node count.

    int allok = 0;
    int i;

    // Check that all ranks parsed their arguments successfully
    MPI_Allreduce(&isok, &allok, 1, MPI_INT, MPI_LAND, MPI_COMM_WORLD);

    if (!allok) {
      Tcl_SetResult(interp, (char *) "invalid parallel reduction, missing parameter on one or more nodes", TCL_STATIC);
      return TCL_ERROR;
    }

    // Compute log2(N) and reject non-power-of-two node counts
    int log2N;
    for (log2N=0; N>1; N>>=1) {
      log2N++;

      if ((N & 1) && (N > 1)) {
        Tcl_SetResult(interp, (char *) "parallel allreduce only allowed for even power-of-two node count", TCL_STATIC);
        return TCL_ERROR;
      }
    }
    N = app->par_size();

    // Start with this rank's own contribution
    Tcl_Obj *resultobj = Tcl_NewStringObj((const char *) argv[3], strlen(argv[3])+1);

    // Pairwise exchange and reduce with a different partner at each step
    int src=app->par_rank();
    for (i=0; i<log2N; i++) {
      int mask = 1 << i;
      int dest = src ^ mask;
      Tcl_Obj *oldresultobj = resultobj;

      // Only exchange with partners that actually exist
      if (dest < N) {
        char *sendbuf = Tcl_GetString(oldresultobj);
        int sendsz = strlen(sendbuf)+1;
        int recvsz = 0;
        MPI_Request handle;
        MPI_Status status;

        // Exchange string lengths with the partner; post the receive with a
        // non-blocking call first so the matching blocking sends cannot
        // deadlock against each other
        MPI_Irecv(&recvsz, 1, MPI_INT, dest,
                  VMD_MPI_TAG_ALLREDUCE_ARGLENGTH, MPI_COMM_WORLD, &handle);

        MPI_Send(&sendsz, 1, MPI_INT, dest,
                 VMD_MPI_TAG_ALLREDUCE_ARGLENGTH, MPI_COMM_WORLD);

        MPI_Wait(&handle, &status);

        char * recvbuf = (char *) malloc(recvsz);

        // Exchange the payload strings the same way
        MPI_Irecv(recvbuf, recvsz, MPI_BYTE, dest,
                  VMD_MPI_TAG_ALLREDUCE_PAYLOAD, MPI_COMM_WORLD, &handle);

        MPI_Send(sendbuf, sendsz, MPI_BYTE, dest,
                 VMD_MPI_TAG_ALLREDUCE_PAYLOAD, MPI_COMM_WORLD);

        MPI_Wait(&handle, &status);

        // Combine our partial result with the partner's via the user proc
        if (Tcl_VarEval(interp, argv[2], " ",
                        sendbuf, " ", recvbuf, NULL) != TCL_OK) {
          printf("Error occurred during reduction!\n");
        }

        free(recvbuf);

        // The reduction proc's return value becomes our new partial result
        resultobj = Tcl_GetObjResult(interp);
      }
    }

    // Every rank now holds the same reduced value
    Tcl_SetObjResult(interp, resultobj);

    return TCL_OK;
#endif
  }
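  // Illustrative Tcl usage ("maxreduce" is a hypothetical user-defined,
  // order-insensitive two-argument reduction proc):
  //   proc maxreduce {a b} { return [expr {$a > $b ? $a : $b}] }
  //   set best [parallel allreduce maxreduce $localscore]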

  Tcl_SetResult(interp, (char *) "invalid parallel subcommand.", TCL_STATIC);

  return TCL_ERROR;
}
