X-Git-Url: http://git.xonotic.org/?p=xonotic%2Fdarkplaces.git;a=blobdiff_plain;f=taskqueue.c;h=e2445d43cb621dad52c831fd71576ad27656478e;hp=35a6c753d691378a87c30aba381ecf891399a152;hb=b2a1a3ffa49a3f315f9f59aa011c5888ad4bea4b;hpb=749072a8f60b88ef37448e224e83944b7096e909 diff --git a/taskqueue.c b/taskqueue.c index 35a6c753..e2445d43 100644 --- a/taskqueue.c +++ b/taskqueue.c @@ -1,312 +1,312 @@ -#include "quakedef.h" -#include "taskqueue.h" - -cvar_t taskqueue_minthreads = {CVAR_CLIENT | CVAR_SERVER | CVAR_SAVE, "taskqueue_minthreads", "4", "minimum number of threads to keep active for executing tasks"}; -cvar_t taskqueue_maxthreads = {CVAR_CLIENT | CVAR_SERVER | CVAR_SAVE, "taskqueue_maxthreads", "32", "maximum number of threads to start up as needed based on task count"}; -cvar_t taskqueue_tasksperthread = {CVAR_CLIENT | CVAR_SERVER | CVAR_SAVE, "taskqueue_tasksperthread", "4000", "expected amount of work that a single thread can do in a frame - the number of threads being used depends on the average workload in recent frames"}; - -#define MAXTHREADS 1024 -#define RECENTFRAMES 64 // averaging thread activity over this many frames to decide how many threads we need -#define THREADTASKS 256 // thread can hold this many tasks in its own queue -#define THREADBATCH 64 // thread will run this many tasks before checking status again -#define THREADSLEEPCOUNT 1000 // thread will sleep for a little while if it checks this many times and has no work to do - -typedef struct taskqueue_state_thread_s -{ - void *handle; - unsigned int quit; - unsigned int thread_index; - unsigned int tasks_completed; - - unsigned int enqueueposition; - unsigned int dequeueposition; - taskqueue_task_t *queue[THREADTASKS]; -} -taskqueue_state_thread_t; - -typedef struct taskqueue_state_s -{ - // TaskQueue_DistributeTasks cycles through the threads when assigning, each has its own queue - unsigned int enqueuethread; - int numthreads; - taskqueue_state_thread_t threads[MAXTHREADS]; - - // synchronization point for enqueue and some other memory access - Thread_SpinLock command_lock; - - // distributor queue (not assigned to threads yet, or waiting on other tasks) - unsigned int queue_enqueueposition; - unsigned int queue_dequeueposition; - unsigned int queue_size; - taskqueue_task_t **queue_data; - - // metrics to balance workload vs cpu resources - unsigned int tasks_recentframesindex; - unsigned int tasks_recentframes[RECENTFRAMES]; - unsigned int tasks_thisframe; - unsigned int tasks_averageperframe; -} -taskqueue_state_t; - -static taskqueue_state_t taskqueue_state; - -void TaskQueue_Init(void) -{ - Cvar_RegisterVariable(&taskqueue_minthreads); - Cvar_RegisterVariable(&taskqueue_maxthreads); - Cvar_RegisterVariable(&taskqueue_tasksperthread); -} - -void TaskQueue_Shutdown(void) -{ - if (taskqueue_state.numthreads) - TaskQueue_Frame(true); -} - -static void TaskQueue_ExecuteTask(taskqueue_task_t *t) -{ - // see if t is waiting on something - if (t->preceding && t->preceding->done == 0) - TaskQueue_Yield(t); - else - t->func(t); -} - -// FIXME: don't use mutex -// FIXME: this is basically fibers but less featureful - context switching for yield is not implemented -static int TaskQueue_ThreadFunc(void *d) -{ - taskqueue_state_thread_t *s = (taskqueue_state_thread_t *)d; - unsigned int sleepcounter = 0; - for (;;) - { - qboolean quit; - while (s->dequeueposition != s->enqueueposition) - { - taskqueue_task_t *t = s->queue[s->dequeueposition % THREADTASKS]; - TaskQueue_ExecuteTask(t); - // when we advance, also clear the pointer for good measure - s->queue[s->dequeueposition++ % THREADTASKS] = NULL; - sleepcounter = 0; - } - Thread_AtomicLock(&taskqueue_state.command_lock); - quit = s->quit != 0; - Thread_AtomicUnlock(&taskqueue_state.command_lock); - if (quit) - break; - sleepcounter++; - if (sleepcounter >= THREADSLEEPCOUNT) - Sys_Sleep(1000); - sleepcounter = 0; - } - return 0; -} - -void TaskQueue_Enqueue(int numtasks, taskqueue_task_t *tasks) -{ - int i; - Thread_AtomicLock(&taskqueue_state.command_lock); - if (taskqueue_state.queue_size < - (taskqueue_state.queue_enqueueposition < taskqueue_state.queue_dequeueposition ? taskqueue_state.queue_size : 0) + - taskqueue_state.queue_enqueueposition - taskqueue_state.queue_dequeueposition + numtasks) - { - // we have to grow the queue... - unsigned int newsize = (taskqueue_state.queue_size + numtasks) * 2; - if (newsize < 1024) - newsize = 1024; - taskqueue_state.queue_data = Mem_Realloc(zonemempool, taskqueue_state.queue_data, sizeof(*taskqueue_state.queue_data) * newsize); - taskqueue_state.queue_size = newsize; - } - for (i = 0; i < numtasks; i++) - { - if (tasks[i].yieldcount == 0) - taskqueue_state.tasks_thisframe++; - taskqueue_state.queue_data[taskqueue_state.queue_enqueueposition] = &tasks[i]; - taskqueue_state.queue_enqueueposition++; - if (taskqueue_state.queue_enqueueposition >= taskqueue_state.queue_size) - taskqueue_state.queue_enqueueposition = 0; - } - Thread_AtomicUnlock(&taskqueue_state.command_lock); -} - -// if the task can not be completed due yet to preconditions, just enqueue it again... -void TaskQueue_Yield(taskqueue_task_t *t) -{ - t->yieldcount++; - TaskQueue_Enqueue(1, t); -} - -qboolean TaskQueue_IsDone(taskqueue_task_t *t) -{ - return !t->done != 0; -} - -void TaskQueue_DistributeTasks(void) -{ - Thread_AtomicLock(&taskqueue_state.command_lock); - if (taskqueue_state.numthreads > 0) - { - unsigned int attempts = taskqueue_state.numthreads; - while (attempts-- > 0 && taskqueue_state.queue_enqueueposition != taskqueue_state.queue_dequeueposition) - { - taskqueue_task_t *t = taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition]; - if (t->preceding && t->preceding->done == 0) - { - // task is waiting on something - // first dequeue it properly - taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition] = NULL; - taskqueue_state.queue_dequeueposition++; - if (taskqueue_state.queue_dequeueposition >= taskqueue_state.queue_size) - taskqueue_state.queue_dequeueposition = 0; - // now put it back in the distributor queue - we know there is room because we just made room - taskqueue_state.queue_data[taskqueue_state.queue_enqueueposition] = t; - taskqueue_state.queue_enqueueposition++; - if (taskqueue_state.queue_enqueueposition >= taskqueue_state.queue_size) - taskqueue_state.queue_enqueueposition = 0; - // we do not refresh the attempt counter here to avoid deadlock - quite often the only things sitting in the distributor queue are waiting on other tasks - } - else - { - taskqueue_state_thread_t *s = &taskqueue_state.threads[taskqueue_state.enqueuethread]; - if (s->enqueueposition - s->dequeueposition < THREADTASKS) - { - // add the task to the thread's queue - s->queue[(s->enqueueposition++) % THREADTASKS] = t; - // since we succeeded in assigning the task, advance the distributor queue - taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition] = NULL; - taskqueue_state.queue_dequeueposition++; - if (taskqueue_state.queue_dequeueposition >= taskqueue_state.queue_size) - taskqueue_state.queue_dequeueposition = 0; - // refresh our attempt counter because we did manage to assign something to a thread - attempts = taskqueue_state.numthreads; - } - } - } - } - Thread_AtomicUnlock(&taskqueue_state.command_lock); - // execute one pending task on the distributor queue, this matters if numthreads is 0 - if (taskqueue_state.queue_dequeueposition != taskqueue_state.queue_enqueueposition) - { - taskqueue_task_t *t = taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition]; - taskqueue_state.queue_dequeueposition++; - if (taskqueue_state.queue_dequeueposition >= taskqueue_state.queue_size) - taskqueue_state.queue_dequeueposition = 0; - if (t) - TaskQueue_ExecuteTask(t); - } -} - -void TaskQueue_WaitForTaskDone(taskqueue_task_t *t) -{ - qboolean done = false; - for (;;) - { - Thread_AtomicLock(&taskqueue_state.command_lock); - done = t->done != 0; - Thread_AtomicUnlock(&taskqueue_state.command_lock); - if (done) - break; - TaskQueue_DistributeTasks(); - } -} - -void TaskQueue_Frame(qboolean shutdown) -{ - int i; - unsigned long long int avg; - int maxthreads = bound(0, taskqueue_maxthreads.integer, MAXTHREADS); - int numthreads = maxthreads; - int tasksperthread = bound(10, taskqueue_tasksperthread.integer, 100000); -#ifdef THREADDISABLE - numthreads = 0; -#endif - - Thread_AtomicLock(&taskqueue_state.command_lock); - taskqueue_state.tasks_recentframesindex = (taskqueue_state.tasks_recentframesindex + 1) % RECENTFRAMES; - taskqueue_state.tasks_recentframes[taskqueue_state.tasks_recentframesindex] = taskqueue_state.tasks_thisframe; - taskqueue_state.tasks_thisframe = 0; - avg = 0; - for (i = 0; i < RECENTFRAMES; i++) - avg += taskqueue_state.tasks_recentframes[i]; - taskqueue_state.tasks_averageperframe = avg / RECENTFRAMES; - Thread_AtomicUnlock(&taskqueue_state.command_lock); - - numthreads = taskqueue_state.tasks_averageperframe / tasksperthread; - numthreads = bound(taskqueue_minthreads.integer, numthreads, taskqueue_maxthreads.integer); - - if (shutdown) - numthreads = 0; - - // check if we need to close some threads - if (taskqueue_state.numthreads > numthreads) - { - // tell extra threads to quit - Thread_AtomicLock(&taskqueue_state.command_lock); - for (i = numthreads; i < taskqueue_state.numthreads; i++) - taskqueue_state.threads[i].quit = 1; - Thread_AtomicUnlock(&taskqueue_state.command_lock); - for (i = numthreads; i < taskqueue_state.numthreads; i++) - { - if (taskqueue_state.threads[i].handle) - Thread_WaitThread(taskqueue_state.threads[i].handle, 0); - taskqueue_state.threads[i].handle = NULL; - } - // okay we're at the new state now - taskqueue_state.numthreads = numthreads; - } - - // check if we need to start more threads - if (taskqueue_state.numthreads < numthreads) - { - // make sure we're not telling new threads to just quit on startup - Thread_AtomicLock(&taskqueue_state.command_lock); - for (i = taskqueue_state.numthreads; i < numthreads; i++) - taskqueue_state.threads[i].quit = 0; - Thread_AtomicUnlock(&taskqueue_state.command_lock); - - // start new threads - for (i = taskqueue_state.numthreads; i < numthreads; i++) - { - taskqueue_state.threads[i].thread_index = i; - taskqueue_state.threads[i].handle = Thread_CreateThread(TaskQueue_ThreadFunc, &taskqueue_state.threads[i]); - } - - // okay we're at the new state now - taskqueue_state.numthreads = numthreads; - } - - // just for good measure, distribute any pending tasks that span across frames - TaskQueue_DistributeTasks(); -} - -void TaskQueue_Setup(taskqueue_task_t *t, taskqueue_task_t *preceding, void(*func)(taskqueue_task_t *), size_t i0, size_t i1, void *p0, void *p1) -{ - memset(t, 0, sizeof(*t)); - t->preceding = preceding; - t->func = func; - t->i[0] = i0; - t->i[1] = i1; - t->p[0] = p0; - t->p[1] = p1; -} - -void TaskQueue_Task_CheckTasksDone(taskqueue_task_t *t) -{ - size_t numtasks = t->i[0]; - taskqueue_task_t *tasks = t->p[0]; - while (numtasks > 0) - { - // check the last task first as it's usually going to be the last to finish, so we do the least work by checking it first - if (!tasks[numtasks - 1].done) - { - // update our partial progress, then yield to another pending task. - t->i[0] = numtasks; - // set our preceding task to one of the ones we are watching for - t->preceding = &tasks[numtasks - 1]; - TaskQueue_Yield(t); - return; - } - numtasks--; - } - t->done = 1; -} +#include "quakedef.h" +#include "taskqueue.h" + +cvar_t taskqueue_minthreads = {CF_CLIENT | CF_SERVER | CF_ARCHIVE, "taskqueue_minthreads", "0", "minimum number of threads to keep active for executing tasks"}; +cvar_t taskqueue_maxthreads = {CF_CLIENT | CF_SERVER | CF_ARCHIVE, "taskqueue_maxthreads", "32", "maximum number of threads to start up as needed based on task count"}; +cvar_t taskqueue_tasksperthread = {CF_CLIENT | CF_SERVER | CF_ARCHIVE, "taskqueue_tasksperthread", "4000", "expected amount of work that a single thread can do in a frame - the number of threads being used depends on the average workload in recent frames"}; + +#define MAXTHREADS 1024 +#define RECENTFRAMES 64 // averaging thread activity over this many frames to decide how many threads we need +#define THREADTASKS 256 // thread can hold this many tasks in its own queue +#define THREADBATCH 64 // thread will run this many tasks before checking status again +#define THREADSLEEPCOUNT 1000 // thread will sleep for a little while if it checks this many times and has no work to do + +typedef struct taskqueue_state_thread_s +{ + void *handle; + unsigned int quit; + unsigned int thread_index; + unsigned int tasks_completed; + + unsigned int enqueueposition; + unsigned int dequeueposition; + taskqueue_task_t *queue[THREADTASKS]; +} +taskqueue_state_thread_t; + +typedef struct taskqueue_state_s +{ + // TaskQueue_DistributeTasks cycles through the threads when assigning, each has its own queue + unsigned int enqueuethread; + int numthreads; + taskqueue_state_thread_t threads[MAXTHREADS]; + + // synchronization point for enqueue and some other memory access + Thread_SpinLock command_lock; + + // distributor queue (not assigned to threads yet, or waiting on other tasks) + unsigned int queue_enqueueposition; + unsigned int queue_dequeueposition; + unsigned int queue_size; + taskqueue_task_t **queue_data; + + // metrics to balance workload vs cpu resources + unsigned int tasks_recentframesindex; + unsigned int tasks_recentframes[RECENTFRAMES]; + unsigned int tasks_thisframe; + unsigned int tasks_averageperframe; +} +taskqueue_state_t; + +static taskqueue_state_t taskqueue_state; + +void TaskQueue_Init(void) +{ + Cvar_RegisterVariable(&taskqueue_minthreads); + Cvar_RegisterVariable(&taskqueue_maxthreads); + Cvar_RegisterVariable(&taskqueue_tasksperthread); +} + +void TaskQueue_Shutdown(void) +{ + if (taskqueue_state.numthreads) + TaskQueue_Frame(true); +} + +static void TaskQueue_ExecuteTask(taskqueue_task_t *t) +{ + // see if t is waiting on something + if (t->preceding && t->preceding->done == 0) + TaskQueue_Yield(t); + else + t->func(t); +} + +// FIXME: don't use mutex +// FIXME: this is basically fibers but less featureful - context switching for yield is not implemented +static int TaskQueue_ThreadFunc(void *d) +{ + taskqueue_state_thread_t *s = (taskqueue_state_thread_t *)d; + unsigned int sleepcounter = 0; + for (;;) + { + qbool quit; + while (s->dequeueposition != s->enqueueposition) + { + taskqueue_task_t *t = s->queue[s->dequeueposition % THREADTASKS]; + TaskQueue_ExecuteTask(t); + // when we advance, also clear the pointer for good measure + s->queue[s->dequeueposition++ % THREADTASKS] = NULL; + sleepcounter = 0; + } + Thread_AtomicLock(&taskqueue_state.command_lock); + quit = s->quit != 0; + Thread_AtomicUnlock(&taskqueue_state.command_lock); + if (quit) + break; + sleepcounter++; + if (sleepcounter >= THREADSLEEPCOUNT) + Sys_Sleep(1000); + sleepcounter = 0; + } + return 0; +} + +void TaskQueue_Enqueue(int numtasks, taskqueue_task_t *tasks) +{ + int i; + Thread_AtomicLock(&taskqueue_state.command_lock); + if (taskqueue_state.queue_size < + (taskqueue_state.queue_enqueueposition < taskqueue_state.queue_dequeueposition ? taskqueue_state.queue_size : 0) + + taskqueue_state.queue_enqueueposition - taskqueue_state.queue_dequeueposition + numtasks) + { + // we have to grow the queue... + unsigned int newsize = (taskqueue_state.queue_size + numtasks) * 2; + if (newsize < 1024) + newsize = 1024; + taskqueue_state.queue_data = (taskqueue_task_t **)Mem_Realloc(zonemempool, taskqueue_state.queue_data, sizeof(*taskqueue_state.queue_data) * newsize); + taskqueue_state.queue_size = newsize; + } + for (i = 0; i < numtasks; i++) + { + if (tasks[i].yieldcount == 0) + taskqueue_state.tasks_thisframe++; + taskqueue_state.queue_data[taskqueue_state.queue_enqueueposition] = &tasks[i]; + taskqueue_state.queue_enqueueposition++; + if (taskqueue_state.queue_enqueueposition >= taskqueue_state.queue_size) + taskqueue_state.queue_enqueueposition = 0; + } + Thread_AtomicUnlock(&taskqueue_state.command_lock); +} + +// if the task can not be completed due yet to preconditions, just enqueue it again... +void TaskQueue_Yield(taskqueue_task_t *t) +{ + t->yieldcount++; + TaskQueue_Enqueue(1, t); +} + +qbool TaskQueue_IsDone(taskqueue_task_t *t) +{ + return !!t->done; +} + +static void TaskQueue_DistributeTasks(void) +{ + Thread_AtomicLock(&taskqueue_state.command_lock); + if (taskqueue_state.numthreads > 0) + { + unsigned int attempts = taskqueue_state.numthreads; + while (attempts-- > 0 && taskqueue_state.queue_enqueueposition != taskqueue_state.queue_dequeueposition) + { + taskqueue_task_t *t = taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition]; + if (t->preceding && t->preceding->done == 0) + { + // task is waiting on something + // first dequeue it properly + taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition] = NULL; + taskqueue_state.queue_dequeueposition++; + if (taskqueue_state.queue_dequeueposition >= taskqueue_state.queue_size) + taskqueue_state.queue_dequeueposition = 0; + // now put it back in the distributor queue - we know there is room because we just made room + taskqueue_state.queue_data[taskqueue_state.queue_enqueueposition] = t; + taskqueue_state.queue_enqueueposition++; + if (taskqueue_state.queue_enqueueposition >= taskqueue_state.queue_size) + taskqueue_state.queue_enqueueposition = 0; + // we do not refresh the attempt counter here to avoid deadlock - quite often the only things sitting in the distributor queue are waiting on other tasks + } + else + { + taskqueue_state_thread_t *s = &taskqueue_state.threads[taskqueue_state.enqueuethread]; + if (s->enqueueposition - s->dequeueposition < THREADTASKS) + { + // add the task to the thread's queue + s->queue[(s->enqueueposition++) % THREADTASKS] = t; + // since we succeeded in assigning the task, advance the distributor queue + taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition] = NULL; + taskqueue_state.queue_dequeueposition++; + if (taskqueue_state.queue_dequeueposition >= taskqueue_state.queue_size) + taskqueue_state.queue_dequeueposition = 0; + // refresh our attempt counter because we did manage to assign something to a thread + attempts = taskqueue_state.numthreads; + } + } + } + } + Thread_AtomicUnlock(&taskqueue_state.command_lock); + // execute one pending task on the distributor queue, this matters if numthreads is 0 + if (taskqueue_state.queue_dequeueposition != taskqueue_state.queue_enqueueposition) + { + taskqueue_task_t *t = taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition]; + taskqueue_state.queue_dequeueposition++; + if (taskqueue_state.queue_dequeueposition >= taskqueue_state.queue_size) + taskqueue_state.queue_dequeueposition = 0; + if (t) + TaskQueue_ExecuteTask(t); + } +} + +void TaskQueue_WaitForTaskDone(taskqueue_task_t *t) +{ + qbool done = false; + for (;;) + { + Thread_AtomicLock(&taskqueue_state.command_lock); + done = t->done != 0; + Thread_AtomicUnlock(&taskqueue_state.command_lock); + if (done) + break; + TaskQueue_DistributeTasks(); + } +} + +void TaskQueue_Frame(qbool shutdown) +{ + int i; + unsigned long long int avg; + int maxthreads = bound(0, taskqueue_maxthreads.integer, MAXTHREADS); + int numthreads = maxthreads; + int tasksperthread = bound(10, taskqueue_tasksperthread.integer, 100000); +#ifdef THREADDISABLE + numthreads = 0; +#endif + + Thread_AtomicLock(&taskqueue_state.command_lock); + taskqueue_state.tasks_recentframesindex = (taskqueue_state.tasks_recentframesindex + 1) % RECENTFRAMES; + taskqueue_state.tasks_recentframes[taskqueue_state.tasks_recentframesindex] = taskqueue_state.tasks_thisframe; + taskqueue_state.tasks_thisframe = 0; + avg = 0; + for (i = 0; i < RECENTFRAMES; i++) + avg += taskqueue_state.tasks_recentframes[i]; + taskqueue_state.tasks_averageperframe = avg / RECENTFRAMES; + Thread_AtomicUnlock(&taskqueue_state.command_lock); + + numthreads = taskqueue_state.tasks_averageperframe / tasksperthread; + numthreads = bound(taskqueue_minthreads.integer, numthreads, taskqueue_maxthreads.integer); + + if (shutdown) + numthreads = 0; + + // check if we need to close some threads + if (taskqueue_state.numthreads > numthreads) + { + // tell extra threads to quit + Thread_AtomicLock(&taskqueue_state.command_lock); + for (i = numthreads; i < taskqueue_state.numthreads; i++) + taskqueue_state.threads[i].quit = 1; + Thread_AtomicUnlock(&taskqueue_state.command_lock); + for (i = numthreads; i < taskqueue_state.numthreads; i++) + { + if (taskqueue_state.threads[i].handle) + Thread_WaitThread(taskqueue_state.threads[i].handle, 0); + taskqueue_state.threads[i].handle = NULL; + } + // okay we're at the new state now + taskqueue_state.numthreads = numthreads; + } + + // check if we need to start more threads + if (taskqueue_state.numthreads < numthreads) + { + // make sure we're not telling new threads to just quit on startup + Thread_AtomicLock(&taskqueue_state.command_lock); + for (i = taskqueue_state.numthreads; i < numthreads; i++) + taskqueue_state.threads[i].quit = 0; + Thread_AtomicUnlock(&taskqueue_state.command_lock); + + // start new threads + for (i = taskqueue_state.numthreads; i < numthreads; i++) + { + taskqueue_state.threads[i].thread_index = i; + taskqueue_state.threads[i].handle = Thread_CreateThread(TaskQueue_ThreadFunc, &taskqueue_state.threads[i]); + } + + // okay we're at the new state now + taskqueue_state.numthreads = numthreads; + } + + // just for good measure, distribute any pending tasks that span across frames + TaskQueue_DistributeTasks(); +} + +void TaskQueue_Setup(taskqueue_task_t *t, taskqueue_task_t *preceding, void(*func)(taskqueue_task_t *), size_t i0, size_t i1, void *p0, void *p1) +{ + memset(t, 0, sizeof(*t)); + t->preceding = preceding; + t->func = func; + t->i[0] = i0; + t->i[1] = i1; + t->p[0] = p0; + t->p[1] = p1; +} + +void TaskQueue_Task_CheckTasksDone(taskqueue_task_t *t) +{ + size_t numtasks = t->i[0]; + taskqueue_task_t *tasks = (taskqueue_task_t *)t->p[0]; + while (numtasks > 0) + { + // check the last task first as it's usually going to be the last to finish, so we do the least work by checking it first + if (!tasks[numtasks - 1].done) + { + // update our partial progress, then yield to another pending task. + t->i[0] = numtasks; + // set our preceding task to one of the ones we are watching for + t->preceding = &tasks[numtasks - 1]; + TaskQueue_Yield(t); + return; + } + numtasks--; + } + t->done = 1; +}