2 #include <SDL_thread.h>
// console variables for the task queue
// taskqueue_maxthreads: requested worker thread count; TaskQueue_Frame clamps it to the capacity of taskqueue_state.threads
// taskqueue_linkedlist: when nonzero, TaskQueue_Enqueue takes the linked-list path instead of the growable array
6 cvar_t taskqueue_maxthreads = {CVAR_SAVE, "taskqueue_maxthreads", "32", "how many threads to use for executing tasks"};
7 cvar_t taskqueue_linkedlist = {CVAR_SAVE, "taskqueue_linkedlist", "1", "whether to use a doubly linked list or an array for the FIFO queue"};
// per-worker-thread record; TaskQueue_Frame stores the result of Thread_CreateThread in its handle member
// NOTE(review): the member list is not visible in this excerpt
9 typedef struct taskqueue_state_thread_s
13 taskqueue_state_thread_t;
// all shared state of the task queue (a single static instance exists in this file)
15 typedef struct taskqueue_state_s
// fixed-capacity table of worker threads; its element count is the upper bound used by TaskQueue_Frame
18 taskqueue_state_thread_t threads[1024];
20 // we can enqueue this many tasks before execution of them must proceed
22 int queue_max; // size of queue array
// array-mode FIFO storage, grown with Mem_Realloc in TaskQueue_Enqueue and released in Thread_Shutdown
23 taskqueue_task_t **queue_tasks;
// spin lock held around queue access and around writes to threads_quit
26 Thread_SpinLock command_lock;
// set to 1 by TaskQueue_Frame before it waits for the workers, and back to 0 afterwards
28 volatile uint64_t threads_quit;
30 // doubly linked list - enqueue pushes to list.prev, dequeue pops from list.next
// sentinel node: the list is empty when list.next points back at list itself
31 taskqueue_task_t list;
// the one task queue instance used by every function in this file
35 static taskqueue_state_t taskqueue_state;
39 Cvar_RegisterVariable(&taskqueue_maxthreads);
40 Cvar_RegisterVariable(&taskqueue_linkedlist);
42 Con_Printf("Threading disabled in this build\n");
44 // initialize the doubly-linked list header
45 taskqueue_state.list.next = &taskqueue_state.list;
46 taskqueue_state.list.prev = &taskqueue_state.list;
// tears down the task queue state
50 void Thread_Shutdown(void)
// TaskQueue_Frame(true) selects a target of zero threads, so existing workers are asked to quit and waited for
52 if (taskqueue_state.numthreads)
53 TaskQueue_Frame(true);
// release the array-mode queue storage and clear the pointer so it is not reused
54 if (taskqueue_state.queue_tasks)
55 Mem_Free(taskqueue_state.queue_tasks);
56 taskqueue_state.queue_tasks = NULL;
59 qboolean Thread_HasThreads(void)
// creates a mutex with SDL_CreateMutex and holds it as an opaque void pointer
// filename/fileline identify the call site and appear in the trace message
68 void *_Thread_CreateMutex(const char *filename, int fileline)
70 void *mutex = SDL_CreateMutex();
72 Sys_PrintfToTerminal("%p mutex create %s:%i\n" , mutex, filename, fileline);
// destroys the given mutex (cast back to SDL_mutex) with SDL_DestroyMutex
// filename/fileline identify the call site and appear in the trace message
77 void _Thread_DestroyMutex(void *mutex, const char *filename, int fileline)
80 Sys_PrintfToTerminal("%p mutex destroy %s:%i\n", mutex, filename, fileline);
82 SDL_DestroyMutex((SDL_mutex *)mutex);
// locks the given mutex and returns the result of SDL_LockMutex unchanged
// filename/fileline identify the call site and appear in the trace message
85 int _Thread_LockMutex(void *mutex, const char *filename, int fileline)
88 Sys_PrintfToTerminal("%p mutex lock %s:%i\n" , mutex, filename, fileline);
90 return SDL_LockMutex((SDL_mutex *)mutex);
// unlocks the given mutex and returns the result of SDL_UnlockMutex unchanged
// filename/fileline identify the call site and appear in the trace message
93 int _Thread_UnlockMutex(void *mutex, const char *filename, int fileline)
96 Sys_PrintfToTerminal("%p mutex unlock %s:%i\n" , mutex, filename, fileline);
98 return SDL_UnlockMutex((SDL_mutex *)mutex);
// creates a condition variable with SDL_CreateCond and holds it as an opaque void pointer
// filename/fileline identify the call site and appear in the trace message
101 void *_Thread_CreateCond(const char *filename, int fileline)
103 void *cond = (void *)SDL_CreateCond();
105 Sys_PrintfToTerminal("%p cond create %s:%i\n" , cond, filename, fileline);
// destroys the given condition variable (cast back to SDL_cond) with SDL_DestroyCond
// filename/fileline identify the call site and appear in the trace message
110 void _Thread_DestroyCond(void *cond, const char *filename, int fileline)
113 Sys_PrintfToTerminal("%p cond destroy %s:%i\n" , cond, filename, fileline);
115 SDL_DestroyCond((SDL_cond *)cond);
// signals the given condition variable and returns the result of SDL_CondSignal unchanged
// filename/fileline identify the call site and appear in the trace message
118 int _Thread_CondSignal(void *cond, const char *filename, int fileline)
121 Sys_PrintfToTerminal("%p cond signal %s:%i\n" , cond, filename, fileline);
123 return SDL_CondSignal((SDL_cond *)cond);
// broadcasts on the given condition variable and returns the result of SDL_CondBroadcast unchanged
// filename/fileline identify the call site and appear in the trace message
126 int _Thread_CondBroadcast(void *cond, const char *filename, int fileline)
129 Sys_PrintfToTerminal("%p cond broadcast %s:%i\n" , cond, filename, fileline);
131 return SDL_CondBroadcast((SDL_cond *)cond);
// waits on cond using mutex via SDL_CondWait and returns its result unchanged
// filename/fileline identify the call site and appear in the trace message (only cond is printed, not mutex)
134 int _Thread_CondWait(void *cond, void *mutex, const char *filename, int fileline)
137 Sys_PrintfToTerminal("%p cond wait %s:%i\n" , cond, filename, fileline);
139 return SDL_CondWait((SDL_cond *)cond, (SDL_mutex *)mutex);
// starts a thread running fn(data) with SDL_CreateThread and holds the handle as an opaque void pointer
// filename is passed as the second argument of SDL_CreateThread (the thread name in the SDL2 signature)
142 void *_Thread_CreateThread(int (*fn)(void *), void *data, const char *filename, int fileline)
144 void *thread = (void *)SDL_CreateThread(fn, filename, data);
146 Sys_PrintfToTerminal("%p thread create %s:%i\n" , thread, filename, fileline);
// waits for the given thread to finish via SDL_WaitThread, which writes the thread's exit status into status
// NOTE(review): the declaration of status, the use of retval and the return statement are not visible in this excerpt
151 int _Thread_WaitThread(void *thread, int retval, const char *filename, int fileline)
155 Sys_PrintfToTerminal("%p thread wait %s:%i\n" , thread, filename, fileline);
157 SDL_WaitThread((SDL_Thread *)thread, &status);
161 // standard barrier implementation using conds and mutexes
162 // see: http://www.howforge.com/implementing-barrier-in-pthreads
// allocates a barrier_t with Z_Malloc and gives it its own mutex and condition variable
// count appears in the trace message; presumably it is also stored as the number of waiters needed - that assignment is not visible here
170 void *_Thread_CreateBarrier(unsigned int count, const char *filename, int fileline)
172 volatile barrier_t *b = (volatile barrier_t *) Z_Malloc(sizeof(barrier_t));
174 Sys_PrintfToTerminal("%p barrier create(%d) %s:%i\n", b, count, filename, fileline);
178 b->mutex = Thread_CreateMutex();
179 b->cond = Thread_CreateCond();
// destroys the mutex and condition variable owned by the barrier
// NOTE(review): no release of the barrier_t itself (allocated with Z_Malloc in _Thread_CreateBarrier) is visible here - confirm it is freed
183 void _Thread_DestroyBarrier(void *barrier, const char *filename, int fileline)
185 volatile barrier_t *b = (volatile barrier_t *) barrier;
187 Sys_PrintfToTerminal("%p barrier destroy %s:%i\n", b, filename, fileline);
189 Thread_DestroyMutex(b->mutex);
190 Thread_DestroyCond(b->cond);
// blocks the caller at the barrier; all bookkeeping happens while holding b->mutex
193 void _Thread_WaitBarrier(void *barrier, const char *filename, int fileline)
195 volatile barrier_t *b = (volatile barrier_t *) barrier;
197 Sys_PrintfToTerminal("%p barrier wait %s:%i\n", b, filename, fileline);
199 Thread_LockMutex(b->mutex);
// once the arrival count equals the required count, every waiter is woken with a broadcast
201 if (b->called == b->needed) {
203 Thread_CondBroadcast(b->cond);
// presumably the else branch: earlier arrivals sleep on the condition variable
// NOTE(review): only a single wait with no re-check loop is visible; confirm spurious wakeups are handled
206 Thread_CondWait(b->cond, b->mutex);
209 Thread_UnlockMutex(b->mutex);
// reads the atomic integer a with SDL_AtomicGet and returns the value
// filename/fileline identify the call site and appear in the trace message
212 int _Thread_AtomicGet(Thread_Atomic *a, const char *filename, int fileline)
// the format has exactly three conversions (%p, %s, %i); this function has no value parameter to print
215 Sys_PrintfToTerminal("%p atomic get at %s:%i\n", a, filename, fileline);
217 return SDL_AtomicGet((SDL_atomic_t *)a);
// stores v into the atomic integer a with SDL_AtomicSet and returns SDL's result unchanged
// filename/fileline identify the call site and appear in the trace message
220 int _Thread_AtomicSet(Thread_Atomic *a, int v, const char *filename, int fileline)
// v is an int, so it is printed with %i like fileline
223 Sys_PrintfToTerminal("%p atomic set %i at %s:%i\n", a, v, filename, fileline);
225 return SDL_AtomicSet((SDL_atomic_t *)a, v);
// adds v to the atomic integer a with SDL_AtomicAdd and returns SDL's result unchanged
// filename/fileline identify the call site and appear in the trace message
228 int _Thread_AtomicAdd(Thread_Atomic *a, int v, const char *filename, int fileline)
// v is an int, so it is printed with %i like fileline
231 Sys_PrintfToTerminal("%p atomic add %i at %s:%i\n", a, v, filename, fileline);
233 return SDL_AtomicAdd((SDL_atomic_t *)a, v);
// increments the atomic reference count a with SDL_AtomicIncRef
// filename/fileline identify the call site and appear in the trace message
236 void _Thread_AtomicIncRef(Thread_Atomic *a, const char *filename, int fileline)
// the object being traced is the parameter a (this function has no lock)
239 Sys_PrintfToTerminal("%p atomic incref %s:%i\n", a, filename, fileline);
241 SDL_AtomicIncRef((SDL_atomic_t *)a);
// decrements the atomic reference count a with SDL_AtomicDecRef
// returns true when SDL_AtomicDecRef reports anything other than SDL_FALSE
// filename/fileline identify the call site and appear in the trace message
244 qboolean _Thread_AtomicDecRef(Thread_Atomic *a, const char *filename, int fileline)
// the object being traced is the parameter a (this function has no lock)
247 Sys_PrintfToTerminal("%p atomic decref %s:%i\n", a, filename, fileline);
249 return SDL_AtomicDecRef((SDL_atomic_t *)a) != SDL_FALSE;
// makes one attempt on the spin lock via SDL_AtomicTryLock
// returns true when SDL_AtomicTryLock reports anything other than SDL_FALSE
// filename/fileline identify the call site and appear in the trace message
252 qboolean _Thread_AtomicTryLock(Thread_SpinLock *lock, const char *filename, int fileline)
255 Sys_PrintfToTerminal("%p atomic try lock %s:%i\n", lock, filename, fileline);
257 return SDL_AtomicTryLock(lock) != SDL_FALSE;
// acquires the spin lock with SDL_AtomicLock
// filename/fileline identify the call site and appear in the trace message
260 void _Thread_AtomicLock(Thread_SpinLock *lock, const char *filename, int fileline)
263 Sys_PrintfToTerminal("%p atomic lock %s:%i\n", lock, filename, fileline);
265 SDL_AtomicLock(lock);
// releases the spin lock with SDL_AtomicUnlock
// filename/fileline identify the call site and appear in the trace message
268 void _Thread_AtomicUnlock(Thread_SpinLock *lock, const char *filename, int fileline)
271 Sys_PrintfToTerminal("%p atomic unlock %s:%i\n", lock, filename, fileline);
273 SDL_AtomicUnlock(lock);
// removes the oldest pending task from the queue state; t stays NULL when nothing is found
// the callers visible in this file hold taskqueue_state.command_lock around this call
276 static taskqueue_task_t *TaskQueue_GetPending(void)
278 taskqueue_task_t *t = NULL;
// the linked list is non-empty when the sentinel's next pointer does not point back at the sentinel
279 if (taskqueue_state.list.next != &taskqueue_state.list)
281 // pop from list.next
282 t = taskqueue_state.list.next;
// unlink t from its neighbours, then clear its own links
283 t->next->prev = t->prev;
284 t->prev->next = t->next;
285 t->prev = t->next = NULL;
// array mode: take the first slot
289 if (taskqueue_state.queue_used > 0)
291 t = taskqueue_state.queue_tasks[0];
292 taskqueue_state.queue_used--;
// shift the remaining pointers down one slot (cost grows with queue length) and clear the vacated last slot
293 memmove(taskqueue_state.queue_tasks, taskqueue_state.queue_tasks + 1, taskqueue_state.queue_used * sizeof(taskqueue_task_t *));
294 taskqueue_state.queue_tasks[taskqueue_state.queue_used] = NULL;
// handles one dequeued task t
// NOTE(review): only the precondition check is visible in this excerpt; what happens in either branch is not shown
300 static void TaskQueue_ExecuteTask(taskqueue_task_t *t)
302 // see if t is waiting on something
// true when t has a preceding task whose done field is still 0
303 if (t->preceding && t->preceding->done == 0)
309 // FIXME: don't use mutex
310 // FIXME: this is basically fibers but less featureful - context switching for yield is not implemented
// worker thread entry point; TaskQueue_Frame passes it to Thread_CreateThread with d = &taskqueue_state.threads[i]
311 static int TaskQueue_ThreadFunc(void *d)
316 taskqueue_task_t *t = NULL;
// under command_lock: sample the quit request and dequeue at most one task
317 Thread_AtomicLock(&taskqueue_state.command_lock);
318 quit = taskqueue_state.threads_quit != 0;
319 t = TaskQueue_GetPending();
320 Thread_AtomicUnlock(&taskqueue_state.command_lock);
// the task is executed after the lock has been released
322 TaskQueue_ExecuteTask(t);
// executes pending tasks on the calling thread when there are no worker threads or when force is set
329 void TaskQueue_Execute(qboolean force)
331 // if we have no threads to run the tasks, just start executing them now
332 if (taskqueue_state.numthreads == 0 || force)
336 taskqueue_task_t *t = NULL;
// dequeue under command_lock, then execute after the lock has been released
337 Thread_AtomicLock(&taskqueue_state.command_lock);
338 t = TaskQueue_GetPending();
339 Thread_AtomicUnlock(&taskqueue_state.command_lock);
342 TaskQueue_ExecuteTask(t);
// appends numtasks tasks from the tasks array to the pending queue
// the queue stores pointers to the caller's task structs (&tasks[i]), not copies
347 void TaskQueue_Enqueue(int numtasks, taskqueue_task_t *tasks)
350 // try not to spinlock for a long time by breaking up large enqueues
// batches larger than 64 are handed to recursive calls in chunks of 64
351 while (numtasks > 64)
353 TaskQueue_Enqueue(64, tasks);
357 Thread_AtomicLock(&taskqueue_state.command_lock);
358 for (i = 0; i < numtasks; i++)
360 taskqueue_task_t *t = &tasks[i];
// linked-list mode: t is placed between the current tail (list.prev) and the sentinel
361 if (taskqueue_linkedlist.integer)
364 t->next = &taskqueue_state.list;
365 t->prev = taskqueue_state.list.prev;
// array mode: when full, double the capacity (at least 1024 entries) before storing
371 if (taskqueue_state.queue_used >= taskqueue_state.queue_max)
373 taskqueue_state.queue_max *= 2;
374 if (taskqueue_state.queue_max < 1024)
375 taskqueue_state.queue_max = 1024;
// note: this reallocation runs while command_lock is held
376 taskqueue_state.queue_tasks = (taskqueue_task_t **)Mem_Realloc(cls.permanentmempool, taskqueue_state.queue_tasks, taskqueue_state.queue_max * sizeof(taskqueue_task_t *));
378 taskqueue_state.queue_tasks[taskqueue_state.queue_used++] = t;
381 Thread_AtomicUnlock(&taskqueue_state.command_lock);
384 // if the task can not be completed yet due to preconditions, just enqueue it again...
385 void TaskQueue_Yield(taskqueue_task_t *t)
// puts the single task t back on the pending queue
388 TaskQueue_Enqueue(1, t);
// waits for task t, checking its state under command_lock
// NOTE(review): the loop structure and the statement that updates done are not visible in this excerpt
391 void TaskQueue_WaitForTaskDone(taskqueue_task_t *t)
393 qboolean done = false;
396 Thread_AtomicLock(&taskqueue_state.command_lock);
398 Thread_AtomicUnlock(&taskqueue_state.command_lock);
399 // if there are no threads, just execute the tasks immediately
// with no worker threads, the calling thread executes pending tasks itself
400 if (!done && taskqueue_state.numthreads == 0)
401 TaskQueue_Execute(true);
// brings the worker thread count in line with taskqueue_maxthreads, or with zero when shutdown is set
405 void TaskQueue_Frame(qboolean shutdown)
// target count: 0 on shutdown, otherwise the cvar clamped to [0, number of elements in taskqueue_state.threads]
407 int numthreads = shutdown ? 0 : bound(0, taskqueue_maxthreads.integer, sizeof(taskqueue_state.threads)/sizeof(taskqueue_state.threads[0]));
411 if (taskqueue_state.numthreads != numthreads)
// raise the quit flag under command_lock
414 Thread_AtomicLock(&taskqueue_state.command_lock);
415 taskqueue_state.threads_quit = 1;
416 Thread_AtomicUnlock(&taskqueue_state.command_lock);
// wait for every existing worker and clear its handle
417 for (i = 0; i < taskqueue_state.numthreads; i++)
419 if (taskqueue_state.threads[i].handle)
420 Thread_WaitThread(taskqueue_state.threads[i].handle, 0);
421 taskqueue_state.threads[i].handle = NULL;
// lower the quit flag again before new workers are started
423 Thread_AtomicLock(&taskqueue_state.command_lock);
424 taskqueue_state.threads_quit = 0;
425 Thread_AtomicUnlock(&taskqueue_state.command_lock);
// start the new set of workers; each one receives a pointer to its own threads[] entry
426 taskqueue_state.numthreads = numthreads;
427 for (i = 0; i < taskqueue_state.numthreads; i++)
428 taskqueue_state.threads[i].handle = Thread_CreateThread(TaskQueue_ThreadFunc, &taskqueue_state.threads[i]);
429 // if there are still pending tasks (e.g. no threads), execute them on main thread now
430 TaskQueue_Execute(true);
// initializes task t: zeroes the whole struct, then records the task it must wait for
// NOTE(review): the assignments of func, i0, i1, p0 and p1 are not visible in this excerpt
434 void TaskQueue_Setup(taskqueue_task_t *t, taskqueue_task_t *preceding, void(*func)(taskqueue_task_t *), size_t i0, size_t i1, void *p0, void *p1)
436 memset(t, 0, sizeof(*t));
437 t->preceding = preceding;
445 void TaskQueue_Task_CheckTasksDone(taskqueue_task_t *t)
447 size_t numtasks = t->i[0];
448 taskqueue_task_t *tasks = t->p[0];
451 // check the last task first as it's usually going to be the last to finish, so we do the least work by checking it first
452 if (!tasks[numtasks - 1].done)
454 // update our partial progress, then yield to another pending task.