00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
/* _GNU_SOURCE must be defined before the first system header is included;
 * once <features.h> has been processed (e.g. via <stdio.h>) defining it has
 * no effect and glibc hides cpu_set_t, CPU_SET() and sched_getaffinity(). */
#if defined(__linux) && !defined(_GNU_SOURCE)
#define _GNU_SOURCE 1
#endif

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#if defined(__linux)
#include <sched.h>
#endif

#include "WKFThreads.h"

#ifdef _MSC_VER
#if 0
#define WKFUSENEWWIN32APIS 1
#define _WIN32_WINNT 0x0400
#define WINVER 0x0400
#endif
#include <windows.h>
#include <winbase.h>
#endif

#if defined(_AIX) || defined(_CRAY) || defined(__irix) || defined(__linux) || defined(__osf__) || defined(__sun)
#include <unistd.h>
#endif

#if defined(__APPLE__) && defined(WKFTHREADS)
#include <Carbon/Carbon.h>
#endif

#if defined(__hpux)
#include <sys/mpctl.h>
#endif
00087
00088
00089 #ifdef __cplusplus
00090 extern "C" {
00091 #endif
00092
/*
 * Ask the operating system how many processors are available.
 *
 * Returns the platform-reported count.  Returns 1 when thread support
 * (WKFTHREADS) is compiled out, when no query is implemented for the
 * platform, or when the query fails.  sysconf() and mpctl() report errors
 * as -1, and that value must never be passed on as a processor count.
 */
int wkf_thread_numphysprocessors(void) {
  int a=1;

#ifdef WKFTHREADS
#if defined(__APPLE__)
  a = MPProcessorsScheduled();
#endif

#ifdef _MSC_VER
  {
    struct _SYSTEM_INFO sysinfo;
    GetSystemInfo(&sysinfo);
    a = sysinfo.dwNumberOfProcessors;
  }
#endif

#if defined(_CRAY)
  a = (int) sysconf(_SC_CRAY_NCPU);
#endif

#if defined(__sun) || defined(__linux) || defined(__osf__) || defined(_AIX)
  a = (int) sysconf(_SC_NPROCESSORS_ONLN);
#endif

#if defined(__irix)
  a = (int) sysconf(_SC_NPROC_ONLN);
#endif

#if defined(__hpux)
  a = mpctl(MPC_GETNUMSPUS, 0, 0);
#endif
#endif

  /* a failed query (-1) or a nonsensical 0 falls back to one processor */
  if (a < 1)
    a = 1;

  return a;
}
00126
00127
/*
 * Number of worker threads the library should use.
 *
 * The environment variable WKFFORCECPUCOUNT (or, if unset, the legacy
 * VMDFORCECPUCOUNT) overrides the detected count when it holds a positive
 * integer.  Unparseable or non-positive overrides are ignored, so callers
 * never receive a zero or negative thread count.  Returns 1 when thread
 * support (WKFTHREADS) is compiled out.
 */
int wkf_thread_numprocessors(void) {
  int a=1;

#ifdef WKFTHREADS
  const char *forcecount = getenv("WKFFORCECPUCOUNT");

  if (forcecount == NULL)
    forcecount = getenv("VMDFORCECPUCOUNT");

  if (forcecount != NULL) {
    int forced = 0;
    if (sscanf(forcecount, "%d", &forced) == 1 && forced > 0)
      return forced;
    /* invalid override: fall through to the detected processor count */
  }

  a = wkf_thread_numphysprocessors();
#endif

  return a;
}
00158
00159
/*
 * Build a malloc()ed list of the CPU indices the calling process may run on.
 *
 * On return, if cpuaffinitycount is non-NULL, *cpuaffinitycount holds:
 *   - the number of entries in the returned list, or
 *   - 0 if the affinity mask is empty (no list is returned), or
 *   - -1 if the affinity could not be determined (unsupported platform,
 *     sched_getaffinity() failure, or allocation failure).
 * The caller owns the returned array and must free() it.
 */
int * wkf_cpu_affinitylist(int *cpuaffinitycount) {
  int *affinitylist = NULL;
  int count = -1;

#if defined(__linux) && defined(CPU_SETSIZE)
  cpu_set_t affinitymask;
  int i;

  if (sched_getaffinity(0, sizeof(affinitymask), &affinitymask) < 0) {
    perror("wkf_cpu_affinitylist: sched_getaffinity");
  } else {
    int setcount = 0;

    for (i=0; i<CPU_SETSIZE; i++) {
      if (CPU_ISSET(i, &affinitymask))
        setcount++;
    }

    if (setcount > 0) {
      affinitylist = (int *) malloc(setcount * sizeof(int));
      if (affinitylist != NULL) {
        int cur = 0;
        for (i=0; i<CPU_SETSIZE && cur < setcount; i++) {
          if (CPU_ISSET(i, &affinitymask))
            affinitylist[cur++] = i;
        }
        count = setcount;
      }
    } else {
      count = 0;
    }
  }
#endif

  if (cpuaffinitycount != NULL)
    *cpuaffinitycount = count;

  return affinitylist;
}
00247
00248
/*
 * Bind the calling process/thread to a single CPU.
 *
 * Returns the sched_setaffinity() result (0 on success, < 0 on failure).
 * Returns -1 if cpu is outside [0, CPU_SETSIZE), or if affinity control is
 * unavailable on this platform/build.
 */
int wkf_thread_set_self_cpuaffinity(int cpu) {
  int status=-1;

#ifdef WKFTHREADS
#if defined(__linux) && defined(CPU_ZERO) && defined(CPU_SET)
  cpu_set_t affinitymask;

#if defined(CPU_SETSIZE)
  /* CPU_SET() does no bounds checking: an out-of-range index would write
     past the end of affinitymask */
  if (cpu < 0 || cpu >= CPU_SETSIZE)
    return -1;
#endif

  CPU_ZERO(&affinitymask);
  CPU_SET(cpu, &affinitymask);

  if ((status=sched_setaffinity(0, sizeof(affinitymask), &affinitymask)) < 0) {
    perror("wkf_thread_set_self_cpuaffinitylist: sched_setaffinity");
    return status;
  }

  /* presumably yields so the scheduler can migrate us onto the new CPU promptly */
  sched_yield();
#endif
#endif

  return status;
}
00292
00293
/*
 * Pass a concurrency-level hint to the thread library.
 *
 * Only Solaris, IRIX and AIX builds forward the hint.  Everywhere else this
 * is a no-op that reports success.  Returns the thread library's status code.
 */
int wkf_thread_setconcurrency(int nthr) {
  int rc = 0;

#if defined(WKFTHREADS)
#  if defined(__sun) && defined(USEPOSIXTHREADS)
  rc = pthread_setconcurrency(nthr);
#  elif defined(__sun)
  rc = thr_setconcurrency(nthr);
#  endif

#  if defined(__irix) || defined(_AIX)
  rc = pthread_setconcurrency(nthr);
#  endif
#endif

  return rc;
}
00313
00314
00315
00316
00317
00319 typedef void * (*WKFTHREAD_START_ROUTINE)(void *);
00320
00321 int wkf_thread_create(wkf_thread_t * thr, void * fctn(void *), void * arg) {
00322 int status=0;
00323
00324 #ifdef WKFTHREADS
00325 #ifdef _MSC_VER
00326 DWORD tid;
00327 *thr = CreateThread(NULL, 8192, (LPTHREAD_START_ROUTINE) fctn, arg, 0, &tid);
00328 if (*thr == NULL) {
00329 status = -1;
00330 }
00331 #endif
00332
00333 #ifdef USEPOSIXTHREADS
00334 #if defined(_AIX)
00335
00336 {
00337 pthread_attr_t attr;
00338 pthread_attr_init(&attr);
00339 pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
00340 status = pthread_create(thr, &attr, (WKFTHREAD_START_ROUTINE)fctn, arg);
00341 pthread_attr_destroy(&attr);
00342 }
00343 #else
00344 status = pthread_create(thr, NULL, (WKFTHREAD_START_ROUTINE)fctn, arg);
00345 #endif
00346 #endif
00347 #endif
00348
00349 return status;
00350 }
00351
00352
00353 int wkf_thread_join(wkf_thread_t thr, void ** stat) {
00354 int status=0;
00355
00356 #ifdef WKFTHREADS
00357 #ifdef _MSC_VER
00358 DWORD wstatus = 0;
00359
00360 wstatus = WAIT_TIMEOUT;
00361
00362 while (wstatus != WAIT_OBJECT_0) {
00363 wstatus = WaitForSingleObject(thr, INFINITE);
00364 }
00365 #endif
00366
00367 #ifdef USEPOSIXTHREADS
00368 status = pthread_join(thr, stat);
00369 #endif
00370 #endif
00371
00372 return status;
00373 }
00374
00375
00376
00377
00378
00379 int wkf_mutex_init(wkf_mutex_t * mp) {
00380 int status=0;
00381
00382 #ifdef WKFTHREADS
00383 #ifdef _MSC_VER
00384 InitializeCriticalSection(mp);
00385 #endif
00386
00387 #ifdef USEPOSIXTHREADS
00388 status = pthread_mutex_init(mp, 0);
00389 #endif
00390 #endif
00391
00392 return status;
00393 }
00394
00395
00396 int wkf_mutex_lock(wkf_mutex_t * mp) {
00397 int status=0;
00398
00399 #ifdef WKFTHREADS
00400 #ifdef _MSC_VER
00401 EnterCriticalSection(mp);
00402 #endif
00403
00404 #ifdef USEPOSIXTHREADS
00405 status = pthread_mutex_lock(mp);
00406 #endif
00407 #endif
00408
00409 return status;
00410 }
00411
00412
00413 int wkf_mutex_trylock(wkf_mutex_t * mp) {
00414 int status=0;
00415
00416 #ifdef WKFTHREADS
00417 #ifdef _MSC_VER
00418 #if defined(WKFUSENEWWIN32APIS)
00419
00420
00421 status = (!(TryEnterCriticalSection(mp)));
00422 #endif
00423 #endif
00424
00425 #ifdef USEPOSIXTHREADS
00426 status = (pthread_mutex_lock(mp) != 0);
00427 #endif
00428 #endif
00429
00430 return status;
00431 }
00432
00433
00434 int wkf_mutex_spin_lock(wkf_mutex_t * mp) {
00435 int status=0;
00436
00437 #ifdef WKFTHREADS
00438 #ifdef _MSC_VER
00439 #if defined(WKFUSENEWWIN32APIS)
00440
00441
00442 while (!TryEnterCriticalSection(mp));
00443 #else
00444 EnterCriticalSection(mp);
00445 #endif
00446 #endif
00447
00448 #ifdef USEPOSIXTHREADS
00449 while ((status = pthread_mutex_trylock(mp)) != 0);
00450 #endif
00451 #endif
00452
00453 return status;
00454 }
00455
00456
00457 int wkf_mutex_unlock(wkf_mutex_t * mp) {
00458 int status=0;
00459
00460 #ifdef WKFTHREADS
00461 #ifdef _MSC_VER
00462 LeaveCriticalSection(mp);
00463 #endif
00464
00465 #ifdef USEPOSIXTHREADS
00466 status = pthread_mutex_unlock(mp);
00467 #endif
00468 #endif
00469
00470 return status;
00471 }
00472
00473
00474 int wkf_mutex_destroy(wkf_mutex_t * mp) {
00475 int status=0;
00476
00477 #ifdef WKFTHREADS
00478 #ifdef _MSC_VER
00479 DeleteCriticalSection(mp);
00480 #endif
00481
00482 #ifdef USEPOSIXTHREADS
00483 status = pthread_mutex_destroy(mp);
00484 #endif
00485 #endif
00486
00487 return status;
00488 }
00489
00490
00491
00492
00493
00494 int wkf_cond_init(wkf_cond_t * cvp) {
00495 int status=0;
00496
00497 #ifdef WKFTHREADS
00498 #ifdef _MSC_VER
00499 #if defined(WKFUSEWIN2008CONDVARS)
00500 InitializeConditionVariable(cvp);
00501 #else
00502
00503 cvp->waiters = 0;
00504
00505
00506 cvp->events[WKF_COND_SIGNAL] = CreateEvent(NULL,
00507 FALSE,
00508 FALSE,
00509 NULL);
00510
00511
00512 cvp->events[WKF_COND_BROADCAST] = CreateEvent(NULL,
00513 TRUE,
00514 FALSE,
00515 NULL);
00516 #endif
00517 #endif
00518
00519 #ifdef USEPOSIXTHREADS
00520 status = pthread_cond_init(cvp, NULL);
00521 #endif
00522 #endif
00523
00524 return status;
00525 }
00526
00527 int wkf_cond_destroy(wkf_cond_t * cvp) {
00528 int status=0;
00529
00530 #ifdef WKFTHREADS
00531 #ifdef _MSC_VER
00532 #if defined(WKFUSEWIN2008CONDVARS)
00533
00534 #else
00535 CloseHandle(cvp->events[WKF_COND_SIGNAL]);
00536 CloseHandle(cvp->events[WKF_COND_BROADCAST]);
00537 #endif
00538 #endif
00539
00540 #ifdef USEPOSIXTHREADS
00541 status = pthread_cond_destroy(cvp);
00542 #endif
00543 #endif
00544
00545 return status;
00546 }
00547
00548 int wkf_cond_wait(wkf_cond_t * cvp, wkf_mutex_t * mp) {
00549 int status=0;
00550 #if defined(WKFTHREADS) && defined(_MSC_VER)
00551 int result=0;
00552 LONG last_waiter;
00553 LONG my_waiter;
00554 #endif
00555
00556 #ifdef WKFTHREADS
00557 #ifdef _MSC_VER
00558 #if defined(WKFUSEWIN2008CONDVARS)
00559 SleepConditionVariableCS(cvp, mp, INFINITE)
00560 #else
00561 #if !defined(WKFUSEINTERLOCKEDATOMICOPS)
00562 EnterCriticalSection(&cvp->waiters_lock);
00563 cvp->waiters++;
00564 LeaveCriticalSection(&cvp->waiters_lock);
00565 #else
00566 InterlockedIncrement(&cvp->waiters);
00567 #endif
00568
00569 LeaveCriticalSection(mp);
00570
00571
00572 result = WaitForMultipleObjects(2, cvp->events, FALSE, INFINITE);
00573
00574 #if !defined(WKFUSEINTERLOCKEDATOMICOPS)
00575 EnterCriticalSection (&cvp->waiters_lock);
00576 cvp->waiters--;
00577 last_waiter =
00578 ((result == (WAIT_OBJECT_0 + WKF_COND_BROADCAST)) && cvp->waiters == 0);
00579 LeaveCriticalSection (&cvp->waiters_lock);
00580 #else
00581 my_waiter = InterlockedDecrement(&cvp->waiters);
00582 last_waiter =
00583 ((result == (WAIT_OBJECT_0 + WKF_COND_BROADCAST)) && my_waiter == 0);
00584 #endif
00585
00586
00587 if (last_waiter)
00588
00589
00590 ResetEvent(cvp->events[WKF_COND_BROADCAST]);
00591
00592 EnterCriticalSection(mp);
00593 #endif
00594 #endif
00595
00596 #ifdef USEPOSIXTHREADS
00597 status = pthread_cond_wait(cvp, mp);
00598 #endif
00599 #endif
00600
00601 return status;
00602 }
00603
00604 int wkf_cond_signal(wkf_cond_t * cvp) {
00605 int status=0;
00606
00607 #ifdef WKFTHREADS
00608 #ifdef _MSC_VER
00609 #if defined(WKFUSEWIN2008CONDVARS)
00610 WakeConditionVariable(cvp);
00611 #else
00612 #if !defined(WKFUSEINTERLOCKEDATOMICOPS)
00613 EnterCriticalSection(&cvp->waiters_lock);
00614 int have_waiters = (cvp->waiters > 0);
00615 LeaveCriticalSection(&cvp->waiters_lock);
00616 if (have_waiters)
00617 SetEvent (cvp->events[WKF_COND_SIGNAL]);
00618 #else
00619 if (InterlockedExchangeAdd(&cvp->waiters, 0) > 0)
00620 SetEvent(cvp->events[WKF_COND_SIGNAL]);
00621 #endif
00622 #endif
00623 #endif
00624
00625 #ifdef USEPOSIXTHREADS
00626 status = pthread_cond_signal(cvp);
00627 #endif
00628 #endif
00629
00630 return status;
00631 }
00632
00633 int wkf_cond_broadcast(wkf_cond_t * cvp) {
00634 int status=0;
00635
00636 #ifdef WKFTHREADS
00637 #ifdef _MSC_VER
00638 #if defined(WKFUSEWIN2008CONDVARS)
00639 WakeAllConditionVariable(cvp);
00640 #else
00641 #if !defined(WKFUSEINTERLOCKEDATOMICOPS)
00642 EnterCriticalSection(&cvp->waiters_lock);
00643 int have_waiters = (cvp->waiters > 0);
00644 LeaveCriticalSection(&cvp->waiters_lock);
00645 if (have_waiters)
00646 SetEvent(cvp->events[WKF_COND_BROADCAST]);
00647 #else
00648 if (InterlockedExchangeAdd(&cvp->waiters, 0) > 0)
00649 SetEvent(cvp->events[WKF_COND_BROADCAST]);
00650 #endif
00651
00652 #endif
00653 #endif
00654
00655 #ifdef USEPOSIXTHREADS
00656 status = pthread_cond_broadcast(cvp);
00657 #endif
00658 #endif
00659
00660 return status;
00661 }
00662
00663
00664
00665
00666
00667
00668 int wkf_thread_run_barrier_init(wkf_run_barrier_t *barrier, int n_clients) {
00669 #ifdef WKFTHREADS
00670 if (barrier != NULL) {
00671 barrier->n_clients = n_clients;
00672 barrier->n_waiting = 0;
00673 barrier->phase = 0;
00674 barrier->fctn = NULL;
00675
00676 wkf_mutex_init(&barrier->lock);
00677 wkf_cond_init(&barrier->wait_cv);
00678 }
00679 #endif
00680
00681 return 0;
00682 }
00683
00684 void wkf_thread_run_barrier_destroy(wkf_run_barrier_t *barrier) {
00685 #ifdef WKFTHREADS
00686 wkf_mutex_destroy(&barrier->lock);
00687 wkf_cond_destroy(&barrier->wait_cv);
00688 #endif
00689 }
00690
00691
00696 void * (*wkf_thread_run_barrier(wkf_run_barrier_t *barrier,
00697 void * fctn(void*),
00698 void * parms,
00699 void **rsltparms))(void *) {
00700 #if defined(WKFTHREADS)
00701 int my_phase;
00702 void * (*my_result)(void*);
00703
00704 wkf_mutex_lock(&barrier->lock);
00705 my_phase = barrier->phase;
00706 if (fctn != NULL)
00707 barrier->fctn = fctn;
00708 if (parms != NULL)
00709 barrier->parms = parms;
00710 barrier->n_waiting++;
00711
00712 if (barrier->n_waiting == barrier->n_clients) {
00713 barrier->rslt = barrier->fctn;
00714 barrier->rsltparms = barrier->parms;
00715 barrier->fctn = NULL;
00716 barrier->parms = NULL;
00717 barrier->n_waiting = 0;
00718 barrier->phase = 1 - my_phase;
00719 wkf_cond_broadcast(&barrier->wait_cv);
00720 }
00721
00722 while (barrier->phase == my_phase) {
00723 wkf_cond_wait(&barrier->wait_cv, &barrier->lock);
00724 }
00725
00726 my_result = barrier->rslt;
00727 if (rsltparms != NULL)
00728 *rsltparms = barrier->rsltparms;
00729
00730 wkf_mutex_unlock(&barrier->lock);
00731 #else
00732 void * (*my_result)(void*) = fctn;
00733 if (rsltparms != NULL)
00734 *rsltparms = parms;
00735 #endif
00736
00737 return my_result;
00738 }
00739
00740
00742 int wkf_thread_run_barrier_poll(wkf_run_barrier_t *barrier) {
00743 int rc=0;
00744 #if defined(WKFTHREADS)
00745 wkf_mutex_lock(&barrier->lock);
00746 if (barrier->n_waiting == (barrier->n_clients-1)) {
00747 rc=1;
00748 }
00749 wkf_mutex_unlock(&barrier->lock);
00750 #endif
00751 return rc;
00752 }
00753
00754
00755
00756
00757
00758 int wkf_tilestack_init(wkf_tilestack_t *s, int size) {
00759 if (s == NULL)
00760 return -1;
00761
00762 #if defined(WKFTHREADS)
00763 wkf_mutex_init(&s->mtx);
00764 #endif
00765
00766 s->growthrate = 512;
00767 s->top = -1;
00768
00769 if (size > 0) {
00770 s->size = size;
00771 s->s = (wkf_tasktile_t *) malloc(s->size * sizeof(wkf_tasktile_t));
00772 } else {
00773 s->size = 0;
00774 s->s = NULL;
00775 }
00776
00777 return 0;
00778 }
00779
00780
00781 void wkf_tilestack_destroy(wkf_tilestack_t *s) {
00782 #if defined(WKFTHREADS)
00783 wkf_mutex_destroy(&s->mtx);
00784 #endif
00785 free(s->s);
00786 s->s = NULL;
00787 }
00788
00789
00790 int wkf_tilestack_compact(wkf_tilestack_t *s) {
00791 #if defined(WKFTHREADS)
00792 wkf_mutex_lock(&s->mtx);
00793 #endif
00794 if (s->size > (s->top + 1)) {
00795 int newsize = s->top + 1;
00796 wkf_tasktile_t *tmp = (wkf_tasktile_t *) realloc(s->s, newsize * sizeof(wkf_tasktile_t));
00797 if (tmp == NULL) {
00798 #if defined(WKFTHREADS)
00799 wkf_mutex_unlock(&s->mtx);
00800 #endif
00801 return -1;
00802 }
00803 s->s = tmp;
00804 s->size = newsize;
00805 }
00806 #if defined(WKFTHREADS)
00807 wkf_mutex_unlock(&s->mtx);
00808 #endif
00809
00810 return 0;
00811 }
00812
00813
00814 int wkf_tilestack_push(wkf_tilestack_t *s, const wkf_tasktile_t *t) {
00815 #if defined(WKFTHREADS)
00816 wkf_mutex_lock(&s->mtx);
00817 #endif
00818 s->top++;
00819 if (s->top >= s->size) {
00820 int newsize = s->size + s->growthrate;
00821 wkf_tasktile_t *tmp = (wkf_tasktile_t *) realloc(s->s, newsize * sizeof(wkf_tasktile_t));
00822 if (tmp == NULL) {
00823 s->top--;
00824 #if defined(WKFTHREADS)
00825 wkf_mutex_unlock(&s->mtx);
00826 #endif
00827 return -1;
00828 }
00829 s->s = tmp;
00830 s->size = newsize;
00831 }
00832
00833 s->s[s->top] = *t;
00834
00835 #if defined(WKFTHREADS)
00836 wkf_mutex_unlock(&s->mtx);
00837 #endif
00838
00839 return 0;
00840 }
00841
00842
00843 int wkf_tilestack_pop(wkf_tilestack_t *s, wkf_tasktile_t *t) {
00844 #if defined(WKFTHREADS)
00845 wkf_mutex_lock(&s->mtx);
00846 #endif
00847
00848 if (s->top < 0) {
00849 #if defined(WKFTHREADS)
00850 wkf_mutex_unlock(&s->mtx);
00851 #endif
00852 return WKF_TILESTACK_EMPTY;
00853 }
00854
00855 *t = s->s[s->top];
00856 s->top--;
00857
00858 #if defined(WKFTHREADS)
00859 wkf_mutex_unlock(&s->mtx);
00860 #endif
00861
00862 return 0;
00863 }
00864
00865
00866 int wkf_tilestack_popall(wkf_tilestack_t *s) {
00867 #if defined(WKFTHREADS)
00868 wkf_mutex_lock(&s->mtx);
00869 #endif
00870
00871 s->top = -1;
00872
00873 #if defined(WKFTHREADS)
00874 wkf_mutex_unlock(&s->mtx);
00875 #endif
00876
00877 return 0;
00878 }
00879
00880
00881 int wkf_tilestack_empty(wkf_tilestack_t *s) {
00882 #if defined(WKFTHREADS)
00883 wkf_mutex_lock(&s->mtx);
00884 #endif
00885
00886 if (s->top < 0) {
00887 #if defined(WKFTHREADS)
00888 wkf_mutex_unlock(&s->mtx);
00889 #endif
00890 return 1;
00891 }
00892
00893 #if defined(WKFTHREADS)
00894 wkf_mutex_unlock(&s->mtx);
00895 #endif
00896
00897 return 0;
00898 }
00899
00900
00901
00902
00903
00904
00906 int wkf_shared_iterator_init(wkf_shared_iterator_t *it) {
00907 memset(it, 0, sizeof(wkf_shared_iterator_t));
00908 #if defined(WKFTHREADS)
00909 wkf_mutex_init(&it->mtx);
00910 #endif
00911 return 0;
00912 }
00913
00914
00916 int wkf_shared_iterator_destroy(wkf_shared_iterator_t *it) {
00917 #if defined(WKFTHREADS)
00918 wkf_mutex_destroy(&it->mtx);
00919 #endif
00920 return 0;
00921 }
00922
00923
00925 int wkf_shared_iterator_set(wkf_shared_iterator_t *it,
00926 wkf_tasktile_t *tile) {
00927 #if defined(WKFTHREADS)
00928 wkf_mutex_lock(&it->mtx);
00929 #endif
00930 it->start = tile->start;
00931 it->current = tile->start;
00932 it->end = tile->end;
00933 it->fatalerror = 0;
00934 #if defined(WKFTHREADS)
00935 wkf_mutex_unlock(&it->mtx);
00936 #endif
00937 return 0;
00938 }
00939
00940
00942 int wkf_shared_iterator_next_tile(wkf_shared_iterator_t *it, int reqsize,
00943 wkf_tasktile_t *tile) {
00944 int rc=WKF_SCHED_CONTINUE;
00945
00946 #if defined(WKFTHREADS)
00947 wkf_mutex_spin_lock(&it->mtx);
00948 #endif
00949 if (!it->fatalerror) {
00950 tile->start=it->current;
00951 it->current+=reqsize;
00952 tile->end=it->current;
00953
00954
00955 if (tile->start >= it->end) {
00956 tile->start=0;
00957 tile->end=0;
00958 rc = WKF_SCHED_DONE;
00959 }
00960
00961
00962
00963 if (tile->end > it->end) {
00964 tile->end = it->end;
00965 }
00966 } else {
00967 rc = WKF_SCHED_DONE;
00968 }
00969 #if defined(WKFTHREADS)
00970 wkf_mutex_unlock(&it->mtx);
00971 #endif
00972
00973 return rc;
00974 }
00975
00976
00978 int wkf_shared_iterator_setfatalerror(wkf_shared_iterator_t *it) {
00979 #if defined(WKFTHREADS)
00980 wkf_mutex_spin_lock(&it->mtx);
00981 #endif
00982 it->fatalerror=1;
00983 #if defined(WKFTHREADS)
00984 wkf_mutex_unlock(&it->mtx);
00985 #endif
00986 return 0;
00987 }
00988
00989
00991 int wkf_shared_iterator_getfatalerror(wkf_shared_iterator_t *it) {
00992 int rc=0;
00993 #if defined(WKFTHREADS)
00994 wkf_mutex_lock(&it->mtx);
00995 #endif
00996 if (it->fatalerror)
00997 rc = -1;
00998 #if defined(WKFTHREADS)
00999 wkf_mutex_unlock(&it->mtx);
01000 #endif
01001 return rc;
01002 }
01003
01004
#if defined(WKFTHREADS)

/*
 * Body of each persistent pool thread.  It repeatedly meets the controller
 * at the run barrier, runs the work function handed back (passing this
 * worker's data), and exits when the barrier returns NULL, which
 * wkf_threadpool_destroy() triggers.
 */
static void * wkf_threadpool_workerproc(void *voidparms) {
  wkf_threadpool_workerdata_t *me = (wkf_threadpool_workerdata_t *) voidparms;
  wkf_threadpool_t *pool = (wkf_threadpool_t *) me->thrpool;

  for (;;) {
    void *(*job)(void*) = wkf_thread_run_barrier(&pool->runbar, NULL, NULL, &me->parms);
    if (job == NULL)
      break;
    job(me);
  }

  return NULL;
}

/*
 * Empty work item.  Pushing it through the run barrier makes the caller
 * wait until every worker has finished its previous job and come back to
 * the barrier.
 */
static void * wkf_threadpool_workersync(void *voidparms) {
  (void) voidparms;
  return NULL;
}
#endif
01026
01027
01028 wkf_threadpool_t * wkf_threadpool_create(int workercount, int *devlist) {
01029 int i;
01030 wkf_threadpool_t *thrpool = NULL;
01031 thrpool = (wkf_threadpool_t *) malloc(sizeof(wkf_threadpool_t));
01032 if (thrpool == NULL)
01033 return NULL;
01034
01035 memset(thrpool, 0, sizeof(wkf_threadpool_t));
01036
01037 #if !defined(WKFTHREADS)
01038 workercount=1;
01039 #endif
01040
01041
01042
01043 thrpool->devlist = (int *) malloc(sizeof(int) * workercount);
01044 if (devlist == NULL) {
01045 for (i=0; i<workercount; i++)
01046 thrpool->devlist[i] = -1;
01047 } else {
01048 memcpy(thrpool->devlist, devlist, sizeof(int) * workercount);
01049 }
01050
01051
01052 wkf_shared_iterator_init(&thrpool->iter);
01053
01054
01055 wkf_tilestack_init(&thrpool->errorstack, 64);
01056
01057
01058 thrpool->workercount = workercount;
01059 wkf_thread_run_barrier_init(&thrpool->runbar, workercount+1);
01060
01061
01062 thrpool->threads = (wkf_thread_t *) malloc(sizeof(wkf_thread_t) * workercount);
01063 thrpool->workerdata = (wkf_threadpool_workerdata_t *) malloc(sizeof(wkf_threadpool_workerdata_t) * workercount);
01064 memset(thrpool->workerdata, 0, sizeof(wkf_threadpool_workerdata_t) * workercount);
01065
01066
01067 for (i=0; i<workercount; i++) {
01068 thrpool->workerdata[i].iter=&thrpool->iter;
01069 thrpool->workerdata[i].errorstack=&thrpool->errorstack;
01070 thrpool->workerdata[i].threadid=i;
01071 thrpool->workerdata[i].threadcount=workercount;
01072 thrpool->workerdata[i].devid=thrpool->devlist[i];
01073 thrpool->workerdata[i].devspeed=1.0f;
01074 thrpool->workerdata[i].thrpool=thrpool;
01075 }
01076
01077 #if defined(WKFTHREADS)
01078
01079 for (i=0; i<workercount; i++) {
01080 wkf_thread_create(&thrpool->threads[i], wkf_threadpool_workerproc, &thrpool->workerdata[i]);
01081 }
01082 #endif
01083
01084 return thrpool;
01085 }
01086
01087
01088 int wkf_threadpool_launch(wkf_threadpool_t *thrpool,
01089 void *fctn(void *), void *parms, int blocking) {
01090 if (thrpool == NULL)
01091 return -1;
01092
01093 #if defined(WKFTHREADS)
01094
01095 wkf_thread_run_barrier(&thrpool->runbar, fctn, parms, NULL);
01096 if (blocking)
01097 wkf_thread_run_barrier(&thrpool->runbar, wkf_threadpool_workersync, NULL, NULL);
01098 #else
01099 thrpool->workerdata[0].parms = parms;
01100 (*fctn)(&thrpool->workerdata[0]);
01101 #endif
01102 return 0;
01103 }
01104
01105
01106 int wkf_threadpool_wait(wkf_threadpool_t *thrpool) {
01107 #if defined(WKFTHREADS)
01108 wkf_thread_run_barrier(&thrpool->runbar, wkf_threadpool_workersync, NULL, NULL);
01109 #endif
01110 return 0;
01111 }
01112
01113
01114 int wkf_threadpool_poll(wkf_threadpool_t *thrpool) {
01115 #if defined(WKFTHREADS)
01116 return wkf_thread_run_barrier_poll(&thrpool->runbar);
01117 #else
01118 return 1;
01119 #endif
01120 }
01121
01122
01123 int wkf_threadpool_destroy(wkf_threadpool_t *thrpool) {
01124 #if defined(WKFTHREADS)
01125 int i;
01126 #endif
01127
01128
01129 wkf_thread_run_barrier(&thrpool->runbar, NULL, NULL, NULL);
01130
01131 #if defined(WKFTHREADS)
01132
01133 for (i=0; i<thrpool->workercount; i++) {
01134 wkf_thread_join(thrpool->threads[i], NULL);
01135 }
01136 #endif
01137
01138
01139 wkf_thread_run_barrier_destroy(&thrpool->runbar);
01140
01141
01142 wkf_shared_iterator_destroy(&thrpool->iter);
01143
01144
01145 wkf_tilestack_destroy(&thrpool->errorstack);
01146
01147 free(thrpool->devlist);
01148 free(thrpool->threads);
01149 free(thrpool->workerdata);
01150 free(thrpool);
01151
01152 return 0;
01153 }
01154
01155
01157 int wkf_threadpool_get_workercount(wkf_threadpool_t *thrpool) {
01158 return thrpool->workercount;
01159 }
01160
01161
01163 int wkf_threadpool_worker_getid(void *voiddata, int *threadid, int *threadcount) {
01164 wkf_threadpool_workerdata_t *worker = (wkf_threadpool_workerdata_t *) voiddata;
01165 if (threadid != NULL)
01166 *threadid = worker->threadid;
01167
01168 if (threadcount != NULL)
01169 *threadcount = worker->threadcount;
01170
01171 return 0;
01172 }
01173
01174
01176 int wkf_threadpool_worker_getdevid(void *voiddata, int *devid) {
01177 wkf_threadpool_workerdata_t *worker = (wkf_threadpool_workerdata_t *) voiddata;
01178 if (devid != NULL)
01179 *devid = worker->devid;
01180
01181 return 0;
01182 }
01183
01184
01191 int wkf_threadpool_worker_setdevspeed(void *voiddata, float speed) {
01192 wkf_threadpool_workerdata_t *worker = (wkf_threadpool_workerdata_t *) voiddata;
01193 worker->devspeed = speed;
01194 return 0;
01195 }
01196
01197
01202 int wkf_threadpool_worker_getdevspeed(void *voiddata, float *speed) {
01203 wkf_threadpool_workerdata_t *worker = (wkf_threadpool_workerdata_t *) voiddata;
01204 if (speed != NULL)
01205 *speed = worker->devspeed;
01206 return 0;
01207 }
01208
01209
01214 int wkf_threadpool_worker_devscaletile(void *voiddata, int *tilesize) {
01215 wkf_threadpool_workerdata_t *worker = (wkf_threadpool_workerdata_t *) voiddata;
01216 if (tilesize != NULL) {
01217 int scaledtilesize;
01218 scaledtilesize = (int) (worker->devspeed * ((float) (*tilesize)));
01219 if (scaledtilesize < 1)
01220 scaledtilesize = 1;
01221
01222 *tilesize = scaledtilesize;
01223 }
01224
01225 return 0;
01226 }
01227
01228
01230 int wkf_threadpool_worker_getdata(void *voiddata, void **clientdata) {
01231 wkf_threadpool_workerdata_t *worker = (wkf_threadpool_workerdata_t *) voiddata;
01232 if (clientdata != NULL)
01233 *clientdata = worker->parms;
01234
01235 return 0;
01236 }
01237
01238
01240 int wkf_threadpool_sched_dynamic(wkf_threadpool_t *thrpool, wkf_tasktile_t *tile) {
01241 if (thrpool == NULL)
01242 return -1;
01243 return wkf_shared_iterator_set(&thrpool->iter, tile);
01244 }
01245
01246
01248 int wkf_threadpool_next_tile(void *voidparms, int reqsize,
01249 wkf_tasktile_t *tile) {
01250 int rc;
01251 wkf_threadpool_workerdata_t *worker = (wkf_threadpool_workerdata_t *) voidparms;
01252 rc = wkf_shared_iterator_next_tile(worker->iter, reqsize, tile);
01253 if (rc == WKF_SCHED_DONE) {
01254
01255
01256 if (wkf_tilestack_pop(worker->errorstack, tile) != WKF_TILESTACK_EMPTY)
01257 return WKF_SCHED_CONTINUE;
01258 }
01259
01260 return rc;
01261 }
01262
01263
01268 int wkf_threadpool_tile_failed(void *voidparms, wkf_tasktile_t *tile) {
01269 wkf_threadpool_workerdata_t *worker = (wkf_threadpool_workerdata_t *) voidparms;
01270 return wkf_tilestack_push(worker->errorstack, tile);
01271 }
01272
01273
01274
01275 int wkf_threadpool_setfatalerror(void *voidparms) {
01276 wkf_threadpool_workerdata_t *worker = (wkf_threadpool_workerdata_t *) voidparms;
01277 wkf_shared_iterator_setfatalerror(worker->iter);
01278 return 0;
01279 }
01280
01281
01282
01283 int wkf_threadpool_getfatalerror(void *voidparms) {
01284 wkf_threadpool_workerdata_t *worker = (wkf_threadpool_workerdata_t *) voidparms;
01285
01286 return wkf_shared_iterator_getfatalerror(worker->iter);
01287 }
01288
01289
01290
01291 int wkf_threadlaunch(int numprocs, void *clientdata, void * fctn(void *),
01292 wkf_tasktile_t *tile) {
01293 wkf_shared_iterator_t iter;
01294 wkf_threadlaunch_t *parms=NULL;
01295 wkf_thread_t * threads=NULL;
01296 int i, rc;
01297
01298
01299 #if !defined(WKFTHREADS)
01300 numprocs=1;
01301 #endif
01302
01303
01304 wkf_shared_iterator_init(&iter);
01305 if (wkf_shared_iterator_set(&iter, tile))
01306 return -1;
01307
01308
01309 threads = (wkf_thread_t *) calloc(numprocs * sizeof(wkf_thread_t), 1);
01310 if (threads == NULL)
01311 return -1;
01312
01313
01314 parms = (wkf_threadlaunch_t *) malloc(numprocs * sizeof(wkf_threadlaunch_t));
01315 if (parms == NULL) {
01316 free(threads);
01317 return -1;
01318 }
01319 for (i=0; i<numprocs; i++) {
01320 parms[i].iter = &iter;
01321 parms[i].threadid = i;
01322 parms[i].threadcount = numprocs;
01323 parms[i].clientdata = clientdata;
01324 }
01325
01326 #if defined(WKFTHREADS)
01327 if (numprocs == 1) {
01328
01329
01330
01331
01332
01333
01334
01335
01336 fctn((void *) &parms[0]);
01337 } else {
01338
01339 for (i=0; i<numprocs; i++) {
01340 wkf_thread_create(&threads[i], fctn, &parms[i]);
01341 }
01342
01343
01344 for (i=0; i<numprocs; i++) {
01345 wkf_thread_join(threads[i], NULL);
01346 }
01347 }
01348 #else
01349
01350 fctn((void *) &parms[0]);
01351 #endif
01352
01353
01354 free(parms);
01355 free(threads);
01356
01357
01358 rc=wkf_shared_iterator_getfatalerror(&iter);
01359
01360
01361 wkf_shared_iterator_destroy(&iter);
01362
01363 return rc;
01364 }
01365
01366
01368 int wkf_threadlaunch_getid(void *voidparms, int *threadid, int *threadcount) {
01369 wkf_threadlaunch_t *worker = (wkf_threadlaunch_t *) voidparms;
01370 if (threadid != NULL)
01371 *threadid = worker->threadid;
01372
01373 if (threadcount != NULL)
01374 *threadcount = worker->threadcount;
01375
01376 return 0;
01377 }
01378
01379
01381 int wkf_threadlaunch_getdata(void *voidparms, void **clientdata) {
01382 wkf_threadlaunch_t *worker = (wkf_threadlaunch_t *) voidparms;
01383 if (clientdata != NULL)
01384 *clientdata = worker->clientdata;
01385
01386 return 0;
01387 }
01388
01389
01391 int wkf_threadlaunch_next_tile(void *voidparms, int reqsize,
01392 wkf_tasktile_t *tile) {
01393 wkf_threadlaunch_t *worker = (wkf_threadlaunch_t *) voidparms;
01394 return wkf_shared_iterator_next_tile(worker->iter, reqsize, tile);
01395 }
01396
01397
01399 int wkf_threadlaunch_setfatalerror(void *voidparms) {
01400 wkf_threadlaunch_t *worker = (wkf_threadlaunch_t *) voidparms;
01401 return wkf_shared_iterator_setfatalerror(worker->iter);
01402 }
01403
01404
01405 #ifdef __cplusplus
01406 }
01407 #endif
01408
01409