00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026 #if defined(VMDMPI)
00027 #include <mpi.h>
00028
00029
00030
00031
00032 #endif
00033
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <tcl.h>
#include "config.h"
#include "CommandQueue.h"
#include "Command.h"
#include "VMDApp.h"
#include "Inform.h"
#include "WKFThreads.h"
00044
/* MPI message tags used by the Tcl "parallel" command. */
enum {
  VMD_MPI_TAG_ALLREDUCE_ARGLENGTH = 1, /* length (incl. NUL) of an allreduce operand */
  VMD_MPI_TAG_ALLREDUCE_PAYLOAD   = 2, /* the allreduce operand bytes themselves */
  VMD_MPI_TAG_FOR_REQUEST         = 3  /* "parallel for" work request / index reply */
};
00048
00049
00050
00051
00052
#if defined(VMDMPI)

/*
 * State handed to the "parallel for" index scheduler:
 *   numnodes - total number of MPI ranks
 *   loop     - half-open range [loop.start, loop.end) of values to hand out
 *   iter     - shared iterator over that range (used when VMDTHREADS is set,
 *              so the thread that started the scheduler can draw from it too)
 */
typedef struct {
  int numnodes;
  wkf_tasktile_t loop;
  wkf_shared_iterator_t iter;
} parallel_for_parms;

/*
 * Block until any rank sends a work request (whose payload is the
 * requester's rank), then answer that rank with `value`.
 */
static void vmd_mpi_parallel_for_reply(int value) {
  int requester;
  MPI_Status status;

  MPI_Recv(&requester, 1, MPI_INT, MPI_ANY_SOURCE, VMD_MPI_TAG_FOR_REQUEST,
           MPI_COMM_WORLD, &status);
  MPI_Send(&value, 1, MPI_INT, requester, VMD_MPI_TAG_FOR_REQUEST,
           MPI_COMM_WORLD);
}

/*
 * Dynamic scheduler for "parallel for", run on rank 0 (as a thread entry
 * point when VMDTHREADS is defined, or called directly otherwise).
 * Answers one work request per value in the loop range, then answers
 * numnodes-1 further requests with -1, the termination value.  Because -1
 * is the terminator, a loop value of -1 would be indistinguishable from it.
 * Always returns NULL.
 */
extern "C" void *vmd_mpi_parallel_for_scheduler(void *voidparms) {
  parallel_for_parms *parfor = (parallel_for_parms *) voidparms;
  int node;

#if defined(VMDTHREADS)
  /* pull one value at a time from the shared iterator */
  wkf_tasktile_t tile;
  while (wkf_shared_iterator_next_tile(&parfor->iter, 1, &tile) != WKF_SCHED_DONE) {
    vmd_mpi_parallel_for_reply(tile.start);
  }
#else
  /* no threads: walk the whole range directly */
  int idx;
  for (idx = parfor->loop.start; idx < parfor->loop.end; idx++) {
    vmd_mpi_parallel_for_reply(idx);
  }
#endif

  /* release the remaining requesters, one termination reply each */
  for (node = 1; node < parfor->numnodes; node++) {
    vmd_mpi_parallel_for_reply(-1);
  }

  return NULL;
}

#endif
00100
00101
00102
00103 int text_cmd_parallel(ClientData cd, Tcl_Interp *interp, int argc, const char *argv[]) {
00104 VMDApp *app = (VMDApp *)cd;
00105
00106 if(argc<2) {
00107 Tcl_SetResult(interp,
00108 (char *)
00109 "Parallel job query commands:\n"
00110 " parallel nodename\n"
00111 " parallel noderank\n"
00112 " parallel nodecount\n"
00113 "Parallel collective operations (all nodes MUST participate):\n"
00114 " parallel allgather <object>\n"
00115 " parallel allreduce <tcl reduction proc> <object>\n"
00116 " parallel barrier\n"
00117 " parallel for <startcount> <endcount> <tcl callback proc> <user data>",
00118 TCL_STATIC);
00119 return TCL_ERROR;
00120 }
00121
00122
00123 if (!strcmp(argv[1], "nodename")) {
00124 Tcl_Obj *tcl_result = Tcl_NewListObj(0, NULL);
00125 Tcl_ListObjAppendElement(interp, tcl_result, Tcl_NewStringObj(app->par_name(), strlen(app->par_name())));
00126 Tcl_SetObjResult(interp, tcl_result);
00127 return TCL_OK;
00128 }
00129
00130
00131 if (!strcmp(argv[1], "noderank")) {
00132 Tcl_Obj *tcl_result = Tcl_NewListObj(0, NULL);
00133 Tcl_ListObjAppendElement(interp, tcl_result, Tcl_NewIntObj(app->par_rank()));
00134 Tcl_SetObjResult(interp, tcl_result);
00135 return TCL_OK;
00136 }
00137
00138
00139 if (!strcmp(argv[1], "nodecount")) {
00140 Tcl_Obj *tcl_result = Tcl_NewListObj(0, NULL);
00141 Tcl_ListObjAppendElement(interp, tcl_result, Tcl_NewIntObj(app->par_size()));
00142 Tcl_SetObjResult(interp, tcl_result);
00143 return TCL_OK;
00144 }
00145
00146
00147 if(!strupncmp(argv[1], "barrier", CMDLEN) && argc==2) {
00148 app->par_barrier();
00149 return TCL_OK;
00150 }
00151
00152
00153
00154
00155
00156 if(!strupncmp(argv[1], "for", CMDLEN)) {
00157 int isok = (argc == 6);
00158 int N = app->par_size();
00159 int start, end;
00160
00161 if (Tcl_GetInt(interp, argv[2], &start) != TCL_OK ||
00162 Tcl_GetInt(interp, argv[3], &end) != TCL_OK) {
00163 isok = 0;
00164 }
00165
00166
00167
00168
00169 if (N == 1) {
00170 if (!isok) {
00171 Tcl_SetResult(interp, (char *) "invalid parallel for, missing parameter", TCL_STATIC);
00172 return TCL_ERROR;
00173 }
00174
00175
00176 int i;
00177 for (i=start; i<=end; i++) {
00178 char istr[128];
00179 sprintf(istr, "%d", i);
00180 if (Tcl_VarEval(interp, argv[4], " ", istr, " {",
00181 argv[5], "} ", NULL) != TCL_OK) {
00182 Tcl_SetResult(interp, (char *) "error occured during parallel for", TCL_STATIC);
00183 }
00184 }
00185
00186 return TCL_OK;
00187 }
00188
00189 #if 1 && defined(VMDMPI)
00190 int allok = 0;
00191
00192
00193 MPI_Allreduce(&isok, &allok, 1, MPI_INT, MPI_LAND, MPI_COMM_WORLD);
00194
00195
00196
00197
00198 if (!allok) {
00199 Tcl_SetResult(interp, (char *) "invalid parallel for, missing parameter on one or more nodes", TCL_STATIC);
00200 return TCL_ERROR;
00201 }
00202
00203
00204
00205 int i;
00206 if (app->par_rank() == 0) {
00207
00208 #if 1
00209 parallel_for_parms parfor;
00210 memset(&parfor, 0, sizeof(parfor));
00211 parfor.numnodes = N;
00212 parfor.loop.start=start;
00213 parfor.loop.end=end+1;
00214 wkf_shared_iterator_init(&parfor.iter);
00215 wkf_shared_iterator_set(&parfor.iter, &parfor.loop);
00216
00217 #if defined(VMDTHREADS)
00218
00219 wkf_thread_t pft;
00220 wkf_thread_create(&pft, vmd_mpi_parallel_for_scheduler, &parfor);
00221
00222
00223 wkf_tasktile_t curtile;
00224 while (wkf_shared_iterator_next_tile(&parfor.iter, 1, &curtile) != WKF_SCHED_DONE) {
00225 i = curtile.start;
00226 char istr[128];
00227 sprintf(istr, "%d", i);
00228 if (Tcl_VarEval(interp, argv[4], " ", istr, " {",
00229 argv[5], "} ", NULL) != TCL_OK) {
00230 Tcl_SetResult(interp, (char *) "error occured during parallel for", TCL_STATIC);
00231 }
00232 }
00233
00234
00235 wkf_thread_join(pft, NULL);
00236 #else
00237 vmd_mpi_parallel_for_scheduler(&parfor);
00238 #endif
00239
00240 wkf_shared_iterator_destroy(&parfor.iter);
00241
00242 #else
00243 for (i=start; i<=end; i++) {
00244 int reqnode;
00245 MPI_Status rcvstat;
00246 MPI_Recv(&reqnode, 1, MPI_INT, MPI_ANY_SOURCE, VMD_MPI_TAG_FOR_REQUEST,
00247 MPI_COMM_WORLD, &rcvstat);
00248 MPI_Send(&i, 1, MPI_INT, reqnode, VMD_MPI_TAG_FOR_REQUEST,
00249 MPI_COMM_WORLD);
00250 }
00251
00252
00253 int node;
00254 for (node=1; node<N; node++) {
00255 int reqnode;
00256 MPI_Status rcvstat;
00257 MPI_Recv(&reqnode, 1, MPI_INT, MPI_ANY_SOURCE, VMD_MPI_TAG_FOR_REQUEST,
00258 MPI_COMM_WORLD, &rcvstat);
00259
00260 i=-1;
00261 MPI_Send(&i, 1, MPI_INT, reqnode, VMD_MPI_TAG_FOR_REQUEST,
00262 MPI_COMM_WORLD);
00263 }
00264 #endif
00265 } else {
00266 char istr[128];
00267 int done=0;
00268 int mynode=app->par_rank();
00269 while (!done) {
00270 MPI_Send(&mynode, 1, MPI_INT, 0, VMD_MPI_TAG_FOR_REQUEST,
00271 MPI_COMM_WORLD);
00272
00273 MPI_Status rcvstat;
00274 MPI_Recv(&i, 1, MPI_INT, MPI_ANY_SOURCE, VMD_MPI_TAG_FOR_REQUEST,
00275 MPI_COMM_WORLD, &rcvstat);
00276 if (i == -1) {
00277 done = 1;
00278 } else {
00279 sprintf(istr, "%d", i);
00280 if (Tcl_VarEval(interp, argv[4], " ", istr, " {",
00281 argv[5], "} ", NULL) != TCL_OK) {
00282 Tcl_SetResult(interp, (char *) "error occured during parallel for", TCL_STATIC);
00283 }
00284 }
00285 }
00286 }
00287 #endif
00288
00289 return TCL_OK;
00290 }
00291
00292
00293
00294
00295
00296 if(!strupncmp(argv[1], "allgather", CMDLEN)) {
00297 int isok = (argc == 3);
00298
00299 #if defined(VMDMPI)
00300 int allok = 0;
00301 int i;
00302
00303
00304 MPI_Allreduce(&isok, &allok, 1, MPI_INT, MPI_LAND, MPI_COMM_WORLD);
00305
00306 if (!allok) {
00307 Tcl_SetResult(interp, (char *) "invalid parallel gather, missing parameter on one or more nodes", TCL_STATIC);
00308 return TCL_ERROR;
00309 }
00310
00311
00312
00313 int *szlist = new int[app->par_size()];
00314 szlist[app->par_rank()] = strlen(argv[2])+1;
00315
00316 #if defined(USE_MPI_IN_PLACE)
00317
00318 MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT,
00319 &szlist[0], 1, MPI_INT, MPI_COMM_WORLD);
00320 #else
00321
00322 MPI_Allgather(&szlist[app->par_rank()], 1, MPI_INT,
00323 &szlist[0], 1, MPI_INT, MPI_COMM_WORLD);
00324 #endif
00325
00326 int totalsz = 0;
00327 int *displist = new int[app->par_size()];
00328 for (i=0; i<app->par_size(); i++) {
00329 displist[i]=totalsz;
00330 totalsz+=szlist[i];
00331 }
00332
00333 char *recvbuf = new char[totalsz];
00334 memset(recvbuf, 0, totalsz);
00335
00336
00337 strcpy(&recvbuf[displist[app->par_rank()]], argv[2]);
00338
00339
00340 #if defined(USE_MPI_IN_PLACE)
00341
00342 MPI_Allgatherv(MPI_IN_PLACE, szlist[app->par_rank()], MPI_BYTE,
00343 &recvbuf[0], szlist, displist, MPI_BYTE, MPI_COMM_WORLD);
00344 #else
00345
00346 MPI_Allgatherv(&recvbuf[displist[app->par_rank()]], szlist[app->par_rank()], MPI_BYTE,
00347 &recvbuf[0], szlist, displist, MPI_BYTE, MPI_COMM_WORLD);
00348 #endif
00349
00350
00351 Tcl_Obj *tcl_result = Tcl_NewListObj(0, NULL);
00352 for (i=0; i<app->par_size(); i++) {
00353 Tcl_ListObjAppendElement(interp, tcl_result, Tcl_NewStringObj(&recvbuf[displist[i]], szlist[i]-1));
00354 }
00355 Tcl_SetObjResult(interp, tcl_result);
00356
00357 delete [] recvbuf;
00358 delete [] displist;
00359 delete [] szlist;
00360 return TCL_OK;
00361 #else
00362 if (!isok) {
00363 Tcl_SetResult(interp, (char *) "invalid parallel gather, missing parameter on one or more nodes", TCL_STATIC);
00364 return TCL_ERROR;
00365 }
00366
00367 Tcl_Obj *tcl_result = Tcl_NewListObj(0, NULL);
00368 Tcl_ListObjAppendElement(interp, tcl_result, Tcl_NewStringObj(argv[2], strlen(argv[2])));
00369 Tcl_SetObjResult(interp, tcl_result);
00370 return TCL_OK;
00371 #endif
00372 }
00373
00374
00375
00376
00377
00378
00379
00380
00381
00382
00383
00384
00385
00386
00387
00388
00389
00390
00391 if(!strupncmp(argv[1], "allreduce", CMDLEN)) {
00392 int isok = (argc == 4);
00393 int N = app->par_size();
00394
00395
00396
00397
00398 if (N == 1) {
00399 if (!isok) {
00400 Tcl_SetResult(interp, (char *) "invalid parallel reduction, missing parameter", TCL_STATIC);
00401 return TCL_ERROR;
00402 }
00403
00404
00405 Tcl_SetObjResult(interp, Tcl_NewStringObj(argv[3], strlen(argv[3])));
00406 return TCL_OK;
00407 }
00408
00409 #if 1 && defined(VMDMPI)
00410
00411
00412
00413
00414
00415
00416
00417
00418
00419
00420
00421 int allok = 0;
00422
00423
00424 MPI_Allreduce(&isok, &allok, 1, MPI_INT, MPI_LAND, MPI_COMM_WORLD);
00425
00426
00427
00428
00429 if (!allok) {
00430 Tcl_SetResult(interp, (char *) "invalid parallel reduction, missing parameter on one or more nodes", TCL_STATIC);
00431 return TCL_ERROR;
00432 }
00433
00434
00435 Tcl_Obj *resultobj = Tcl_NewStringObj((const char *) argv[3], strlen(argv[3])+1);
00436
00437
00438
00439 int src=app->par_rank();
00440 int Ldest = (N + src + 1) % N;
00441 int Rdest = (N + src - 1) % N;
00442 MPI_Status status;
00443
00444 if (src != 0) {
00445 int recvsz = 0;
00446
00447
00448 MPI_Recv(&recvsz, 1, MPI_INT, Ldest,
00449 VMD_MPI_TAG_ALLREDUCE_ARGLENGTH, MPI_COMM_WORLD, &status);
00450
00451
00452 char * recvbuf = (char *) malloc(recvsz);
00453
00454
00455 MPI_Recv(recvbuf, recvsz, MPI_BYTE, Ldest,
00456 VMD_MPI_TAG_ALLREDUCE_PAYLOAD, MPI_COMM_WORLD, &status);
00457
00458
00459
00460
00461
00462 if (Tcl_VarEval(interp, argv[2], " ",
00463 Tcl_GetString(resultobj), " ",
00464 recvbuf, NULL) != TCL_OK) {
00465 printf("Error occured during reduction!\n");
00466 }
00467
00468
00469
00470 resultobj = Tcl_GetObjResult(interp);
00471
00472
00473 free(recvbuf);
00474 }
00475
00476
00477
00478
00479 char *sendbuf = Tcl_GetString(resultobj);
00480 int sendsz = strlen(sendbuf)+1;
00481
00482
00483 MPI_Send(&sendsz, 1, MPI_INT, Rdest,
00484 VMD_MPI_TAG_ALLREDUCE_ARGLENGTH, MPI_COMM_WORLD);
00485
00486
00487 MPI_Send(sendbuf, sendsz, MPI_BYTE, Rdest,
00488 VMD_MPI_TAG_ALLREDUCE_PAYLOAD, MPI_COMM_WORLD);
00489
00490 if (src == 0) {
00491 int recvsz = 0;
00492
00493
00494 MPI_Recv(&recvsz, 1, MPI_INT, Ldest,
00495 VMD_MPI_TAG_ALLREDUCE_ARGLENGTH, MPI_COMM_WORLD, &status);
00496
00497
00498 char * recvbuf = (char *) malloc(recvsz);
00499
00500
00501 MPI_Recv(recvbuf, recvsz, MPI_BYTE, Ldest,
00502 VMD_MPI_TAG_ALLREDUCE_PAYLOAD, MPI_COMM_WORLD, &status);
00503
00504
00505
00506
00507
00508 if (Tcl_VarEval(interp, argv[2], " ",
00509 Tcl_GetString(resultobj), " ",
00510 recvbuf, NULL) != TCL_OK) {
00511 printf("Error occured during reduction!\n");
00512 }
00513
00514
00515
00516 resultobj = Tcl_GetObjResult(interp);
00517
00518
00519 free(recvbuf);
00520 }
00521
00522
00523
00524
00525 if (src == 0) {
00526
00527 sendbuf = Tcl_GetString(resultobj);
00528 sendsz = strlen(sendbuf)+1;
00529 MPI_Bcast(&sendsz, 1, MPI_INT, 0, MPI_COMM_WORLD);
00530 MPI_Bcast(sendbuf, sendsz, MPI_BYTE, 0, MPI_COMM_WORLD);
00531 } else {
00532 int recvsz = 0;
00533 MPI_Bcast(&recvsz, 1, MPI_INT, 0, MPI_COMM_WORLD);
00534
00535
00536 char * recvbuf = (char *) malloc(recvsz);
00537
00538 MPI_Bcast(recvbuf, recvsz, MPI_BYTE, 0, MPI_COMM_WORLD);
00539
00540
00541 Tcl_SetObjResult(interp, Tcl_NewStringObj(recvbuf, recvsz-1));
00542
00543
00544 free(recvbuf);
00545 }
00546
00547 return TCL_OK;
00548
00549 #elif defined(VMDMPI)
00550
00551
00552
00553
00554
00555
00556
00557
00558
00559
00560
00561 int allok = 0;
00562 int i;
00563
00564
00565 MPI_Allreduce(&isok, &allok, 1, MPI_INT, MPI_LAND, MPI_COMM_WORLD);
00566
00567
00568
00569
00570 if (!allok) {
00571 Tcl_SetResult(interp, (char *) "invalid parallel reduction, missing parameter on one or more nodes", TCL_STATIC);
00572 return TCL_ERROR;
00573 }
00574
00575
00576 int log2N;
00577 for (log2N=0; N>1; N>>=1) {
00578 log2N++;
00579
00580
00581
00582 if ((N & 1) && (N > 1)) {
00583 Tcl_SetResult(interp, (char *) "parallel allreduce only allowed for even power-of-two node count", TCL_STATIC);
00584 return TCL_ERROR;
00585 }
00586 }
00587 N = app->par_size();
00588
00589
00590 Tcl_Obj *resultobj = Tcl_NewStringObj((const char *) argv[3], strlen(argv[3])+1);
00591
00592
00593
00594
00595
00596
00597
00598
00599
00600
00601
00602
00603
00604 int src=app->par_rank();
00605 for (i=0; i<log2N; i++) {
00606 int mask = 1 << i;
00607 int dest = src ^ mask;
00608 Tcl_Obj *oldresultobj = resultobj;
00609
00610
00611
00612
00613 if (dest < N) {
00614 char *sendbuf = Tcl_GetString(oldresultobj);
00615 int sendsz = strlen(sendbuf)+1;
00616 int recvsz = 0;
00617 MPI_Request handle;
00618 MPI_Status status;
00619
00620
00621
00622
00623
00624
00625 MPI_Irecv(&recvsz, 1, MPI_INT, dest,
00626 VMD_MPI_TAG_ALLREDUCE_ARGLENGTH, MPI_COMM_WORLD, &handle);
00627
00628
00629 MPI_Send(&sendsz, 1, MPI_INT, dest,
00630 VMD_MPI_TAG_ALLREDUCE_ARGLENGTH, MPI_COMM_WORLD);
00631
00632
00633 MPI_Wait(&handle, &status);
00634
00635
00636
00637
00638 char * recvbuf = (char *) malloc(recvsz);
00639
00640
00641
00642
00643
00644
00645 MPI_Irecv(recvbuf, recvsz, MPI_BYTE, dest,
00646 VMD_MPI_TAG_ALLREDUCE_PAYLOAD, MPI_COMM_WORLD, &handle);
00647
00648
00649 MPI_Send(sendbuf, sendsz, MPI_BYTE, dest,
00650 VMD_MPI_TAG_ALLREDUCE_PAYLOAD, MPI_COMM_WORLD);
00651
00652
00653 MPI_Wait(&handle, &status);
00654
00655
00656
00657
00658 if (Tcl_VarEval(interp, argv[2], " ",
00659 sendbuf, " ", recvbuf, NULL) != TCL_OK) {
00660 printf("Error occured during reduction!\n");
00661 }
00662
00663
00664 free(recvbuf);
00665
00666
00667
00668 resultobj = Tcl_GetObjResult(interp);
00669 }
00670 }
00671
00672
00673 Tcl_SetObjResult(interp, resultobj);
00674
00675 return TCL_OK;
00676 #endif
00677 }
00678
00679 Tcl_SetResult(interp, (char *) "invalid parallel subcommand.", TCL_STATIC);
00680
00681 return TCL_ERROR;
00682 }
00683