git.xonotic.org Git - xonotic/darkplaces.git/commitdiff
Added taskqueue.[ch].
author havoc <havoc@d7cf8633-e32d-0410-b094-e92efae38249>
Sat, 18 Jan 2020 05:14:06 +0000 (05:14 +0000)
committer havoc <havoc@d7cf8633-e32d-0410-b094-e92efae38249>
Sat, 18 Jan 2020 05:14:06 +0000 (05:14 +0000)
git-svn-id: svn://svn.icculus.org/twilight/trunk/darkplaces@12496 d7cf8633-e32d-0410-b094-e92efae38249

taskqueue.c [new file with mode: 0644]
taskqueue.h [new file with mode: 0644]

diff --git a/taskqueue.c b/taskqueue.c
new file mode 100644 (file)
index 0000000..9ef40a6
--- /dev/null
@@ -0,0 +1,246 @@
+#include "quakedef.h"\r
+#include "taskqueue.h"\r
+\r
+cvar_t taskqueue_maxthreads = { CVAR_SAVE, "taskqueue_maxthreads", "32", "how many threads to use for executing tasks" };\r
+cvar_t taskqueue_linkedlist = { CVAR_SAVE, "taskqueue_linkedlist", "1", "whether to use a doubly linked list or an array for the FIFO queue" };\r
+\r
+typedef struct taskqueue_state_thread_s\r
+{\r
+       void *handle;\r
+}\r
+taskqueue_state_thread_t;\r
+\r
+typedef struct taskqueue_state_s\r
+{\r
+       int numthreads;\r
+       taskqueue_state_thread_t threads[1024];\r
+\r
+       // array-based FIFO queue of pending tasks (used when taskqueue_linkedlist is 0); grows as needed\r
+       int queue_used;\r
+       int queue_max; // size of queue array\r
+       taskqueue_task_t **queue_tasks;\r
+\r
+       // spinlock guarding the queue state (array queue and linked list)\r
+       Thread_SpinLock command_lock;\r
+\r
+       int threads_quit;\r
+\r
+       // doubly linked list - enqueue pushes to list.prev, dequeue pops from list.next\r
+       taskqueue_task_t list;\r
+}\r
+taskqueue_state_t;\r
+\r
+static taskqueue_state_t taskqueue_state;\r
+\r
+int TaskQueue_Init(void)\r
+{\r
+       Cvar_RegisterVariable(&taskqueue_maxthreads);\r
+       Cvar_RegisterVariable(&taskqueue_linkedlist);\r
+       // initialize the doubly-linked list header\r
+       taskqueue_state.list.next = &taskqueue_state.list;\r
+       taskqueue_state.list.prev = &taskqueue_state.list;\r
+       return 0;\r
+}\r
+\r
+void TaskQueue_Shutdown(void)\r
+{\r
+       if (taskqueue_state.numthreads)\r
+               TaskQueue_Frame(true);\r
+       if (taskqueue_state.queue_tasks)\r
+               Mem_Free(taskqueue_state.queue_tasks);\r
+       taskqueue_state.queue_tasks = NULL;\r
+}\r
+\r
+static taskqueue_task_t *TaskQueue_GetPending(void)\r
+{\r
+       taskqueue_task_t *t = NULL;\r
+       if (taskqueue_state.list.next != &taskqueue_state.list)\r
+       {\r
+               // pop from list.next\r
+               t = taskqueue_state.list.next;\r
+               t->next->prev = t->prev;\r
+               t->prev->next = t->next;\r
+               t->prev = t->next = NULL;\r
+       }\r
+       if (t == NULL)\r
+       {\r
+               if (taskqueue_state.queue_used > 0)\r
+               {\r
+                       t = taskqueue_state.queue_tasks[0];\r
+                       taskqueue_state.queue_used--;\r
+                       memmove(taskqueue_state.queue_tasks, taskqueue_state.queue_tasks + 1, taskqueue_state.queue_used * sizeof(taskqueue_task_t *));\r
+                       taskqueue_state.queue_tasks[taskqueue_state.queue_used] = NULL;\r
+               }\r
+       }\r
+       return t;\r
+}\r
+\r
+static void TaskQueue_ExecuteTask(taskqueue_task_t *t)\r
+{\r
+       // see if t is waiting on something\r
+       if (t->preceding && t->preceding->done == 0)\r
+               TaskQueue_Yield(t);\r
+       else\r
+               t->func(t);\r
+}\r
+\r
+// FIXME: don't use mutex\r
+// FIXME: this is basically fibers but less featureful - context switching for yield is not implemented\r
+static int TaskQueue_ThreadFunc(void *d)\r
+{\r
+       for (;;)\r
+       {\r
+               qboolean quit;\r
+               taskqueue_task_t *t = NULL;\r
+               Thread_AtomicLock(&taskqueue_state.command_lock);\r
+               quit = taskqueue_state.threads_quit != 0;\r
+               t = TaskQueue_GetPending();\r
+               Thread_AtomicUnlock(&taskqueue_state.command_lock);\r
+               if (t)\r
+                       TaskQueue_ExecuteTask(t);\r
+               else if (quit)\r
+                       break;\r
+       }\r
+       return 0;\r
+}\r
+\r
+void TaskQueue_Execute(qboolean force)\r
+{\r
+       // if we have no threads to run the tasks, just start executing them now\r
+       if (taskqueue_state.numthreads == 0 || force)\r
+       {\r
+               for (;;)\r
+               {\r
+                       taskqueue_task_t *t = NULL;\r
+                       Thread_AtomicLock(&taskqueue_state.command_lock);\r
+                       t = TaskQueue_GetPending();\r
+                       Thread_AtomicUnlock(&taskqueue_state.command_lock);\r
+                       if (!t)\r
+                               break;\r
+                       TaskQueue_ExecuteTask(t);\r
+               }\r
+       }\r
+}\r
+\r
+void TaskQueue_Enqueue(int numtasks, taskqueue_task_t *tasks)\r
+{\r
+       int i;\r
+       // try not to spinlock for a long time by breaking up large enqueues\r
+       while (numtasks > 64)\r
+       {\r
+               TaskQueue_Enqueue(64, tasks);\r
+               tasks += 64;\r
+               numtasks -= 64;\r
+       }\r
+       Thread_AtomicLock(&taskqueue_state.command_lock);\r
+       for (i = 0; i < numtasks; i++)\r
+       {\r
+               taskqueue_task_t *t = &tasks[i];\r
+               if (taskqueue_linkedlist.integer)\r
+               {\r
+                       // push to list.prev\r
+                       t->next = &taskqueue_state.list;\r
+                       t->prev = taskqueue_state.list.prev;\r
+                       t->next->prev = t;\r
+                       t->prev->next = t;\r
+               }\r
+               else\r
+               {\r
+                       if (taskqueue_state.queue_used >= taskqueue_state.queue_max)\r
+                       {\r
+                               taskqueue_state.queue_max *= 2;\r
+                               if (taskqueue_state.queue_max < 1024)\r
+                                       taskqueue_state.queue_max = 1024;\r
+                               taskqueue_state.queue_tasks = (taskqueue_task_t **)Mem_Realloc(cls.permanentmempool, taskqueue_state.queue_tasks, taskqueue_state.queue_max * sizeof(taskqueue_task_t *));\r
+                       }\r
+                       taskqueue_state.queue_tasks[taskqueue_state.queue_used++] = t;\r
+               }\r
+       }\r
+       Thread_AtomicUnlock(&taskqueue_state.command_lock);\r
+}\r
+\r
+// if the task cannot be completed yet due to preconditions, just enqueue it again...\r
+void TaskQueue_Yield(taskqueue_task_t *t)\r
+{\r
+       t->yieldcount++;\r
+       TaskQueue_Enqueue(1, t);\r
+}\r
+\r
+qboolean TaskQueue_IsDone(taskqueue_task_t *t)\r
+{\r
+       return t->done != 0;\r
+}\r
+\r
+void TaskQueue_WaitForTaskDone(taskqueue_task_t *t)\r
+{\r
+       qboolean done = false;\r
+       while (!done)\r
+       {\r
+               Thread_AtomicLock(&taskqueue_state.command_lock);\r
+               done = t->done != 0;\r
+               Thread_AtomicUnlock(&taskqueue_state.command_lock);\r
+               // if there are no threads, just execute the tasks immediately\r
+               if (!done && taskqueue_state.numthreads == 0)\r
+                       TaskQueue_Execute(true);\r
+       }\r
+}\r
+\r
+void TaskQueue_Frame(qboolean shutdown)\r
+{\r
+       int numthreads = shutdown ? 0 : bound(0, taskqueue_maxthreads.integer, sizeof(taskqueue_state.threads) / sizeof(taskqueue_state.threads[0]));\r
+#ifdef THREADDISABLE\r
+       numthreads = 0;\r
+#endif\r
+       if (taskqueue_state.numthreads != numthreads)\r
+       {\r
+               int i;\r
+               Thread_AtomicLock(&taskqueue_state.command_lock);\r
+               taskqueue_state.threads_quit = 1;\r
+               Thread_AtomicUnlock(&taskqueue_state.command_lock);\r
+               for (i = 0; i < taskqueue_state.numthreads; i++)\r
+               {\r
+                       if (taskqueue_state.threads[i].handle)\r
+                               Thread_WaitThread(taskqueue_state.threads[i].handle, 0);\r
+                       taskqueue_state.threads[i].handle = NULL;\r
+               }\r
+               Thread_AtomicLock(&taskqueue_state.command_lock);\r
+               taskqueue_state.threads_quit = 0;\r
+               Thread_AtomicUnlock(&taskqueue_state.command_lock);\r
+               taskqueue_state.numthreads = numthreads;\r
+               for (i = 0; i < taskqueue_state.numthreads; i++)\r
+                       taskqueue_state.threads[i].handle = Thread_CreateThread(TaskQueue_ThreadFunc, &taskqueue_state.threads[i]);\r
+               // if there are still pending tasks (e.g. the new thread count is 0), execute them on the main thread now\r
+               TaskQueue_Execute(true);\r
+       }\r
+}\r
+\r
+void TaskQueue_Setup(taskqueue_task_t *t, taskqueue_task_t *preceding, void(*func)(taskqueue_task_t *), size_t i0, size_t i1, void *p0, void *p1)\r
+{\r
+       memset(t, 0, sizeof(*t));\r
+       t->preceding = preceding;\r
+       t->func = func;\r
+       t->i[0] = i0;\r
+       t->i[1] = i1;\r
+       t->p[0] = p0;\r
+       t->p[1] = p1;\r
+}\r
+\r
+void TaskQueue_Task_CheckTasksDone(taskqueue_task_t *t)\r
+{\r
+       size_t numtasks = t->i[0];\r
+       taskqueue_task_t *tasks = t->p[0];\r
+       while (numtasks > 0)\r
+       {\r
+               // check the last task first as it's usually going to be the last to finish, so we do the least work by checking it first\r
+               if (!tasks[numtasks - 1].done)\r
+               {\r
+                       // update our partial progress, then yield to another pending task.\r
+                       t->i[0] = numtasks;\r
+                       TaskQueue_Yield(t);\r
+                       return;\r
+               }\r
+               numtasks--;\r
+       }\r
+       t->started = 1;\r
+       t->done = 1;\r
+}\r
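
As a quick orientation aid (not part of this commit), the sketch below shows the shape of code that would drive taskqueue.c: TaskQueue_Init() registers the cvars at startup, TaskQueue_Frame(false) runs once per frame so the worker thread count follows taskqueue_maxthreads, and TaskQueue_Shutdown() winds the workers down. Task functions mark themselves done, the same way TaskQueue_Task_CheckTasksDone does above. MyWork_Task, Example_Startup, Example_EachFrame and Example_Shutdown are hypothetical names used only for illustration.

#include "quakedef.h"
#include "taskqueue.h"

// hypothetical task function: clears a float buffer passed through the p[]/i[] slots
void MyWork_Task(taskqueue_task_t *t)
{
        float *buffer = (float *)t->p[0];
        size_t count = t->i[0];
        size_t index;
        t->started = 1;
        for (index = 0; index < count; index++)
                buffer[index] = 0.0f;
        // nothing in taskqueue.c marks a task done on the caller's behalf,
        // so the task function sets done itself, as TaskQueue_Task_CheckTasksDone does
        t->done = 1;
}

// hypothetical engine hook points
void Example_Startup(void)   { TaskQueue_Init(); }
void Example_EachFrame(void) { TaskQueue_Frame(false); } // worker count tracks taskqueue_maxthreads
void Example_Shutdown(void)  { TaskQueue_Shutdown(); }
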
diff --git a/taskqueue.h b/taskqueue.h
new file mode 100644 (file)
index 0000000..e233032
--- /dev/null
@@ -0,0 +1,58 @@
+\r
+#ifndef TASKQUEUE_H\r
+#define TASKQUEUE_H\r
+\r
+#include "qtypes.h"\r
+#include "thread.h"\r
+\r
+typedef struct taskqueue_task_s\r
+{\r
+       // doubly linked list\r
+       struct taskqueue_task_s * volatile prev;\r
+       struct taskqueue_task_s * volatile next;\r
+\r
+       // if not NULL, the preceding task must be done before this one executes (cheaper than having the task itself Yield immediately)\r
+       struct taskqueue_task_s *preceding;\r
+\r
+       // use TaskQueue_IsDone() to poll done status with proper atomics rather than reading these directly\r
+       volatile int started;\r
+       volatile int done;\r
+\r
+       // function to call, and parameters for it to use\r
+       void(*func)(struct taskqueue_task_s *task);\r
+       void *p[4];\r
+       size_t i[4];\r
+\r
+       // stats:\r
+       unsigned int yieldcount; // number of times this task has been requeued\r
+}\r
+taskqueue_task_t;\r
+\r
+// immediately execute any pending tasks if threading is disabled (or if force is true)\r
+// TRY NOT TO USE THIS IF POSSIBLE - poll with TaskQueue_IsDone() instead.\r
+void TaskQueue_Execute(qboolean force);\r
+\r
+// queues the tasks to be executed; when threading is disabled they are run on the main thread by TaskQueue_Execute or TaskQueue_WaitForTaskDone.\r
+void TaskQueue_Enqueue(int numtasks, taskqueue_task_t *tasks);\r
+\r
+// if the task cannot be completed yet due to preconditions, just enqueue it again...\r
+void TaskQueue_Yield(taskqueue_task_t *t);\r
+\r
+// polls for status of task and returns the result immediately - use this instead of checking ->done directly, as this uses atomics\r
+qboolean TaskQueue_IsDone(taskqueue_task_t *t);\r
+\r
+// polls for status of task and waits for it to be done\r
+void TaskQueue_WaitForTaskDone(taskqueue_task_t *t);\r
+\r
+// updates thread count based on the cvar.\r
+void TaskQueue_Frame(qboolean shutdown);\r
+\r
+// convenience function for setting up a task structure.  Does not do the Enqueue, just fills in the struct.\r
+void TaskQueue_Setup(taskqueue_task_t *t, taskqueue_task_t *preceding, void(*func)(taskqueue_task_t *), size_t i0, size_t i1, void *p0, void *p1);\r
+\r
+// general purpose tasks\r
+// t->i[0] = number of tasks in array\r
+// t->p[0] = array of taskqueue_task_t to check\r
+void TaskQueue_Task_CheckTasksDone(taskqueue_task_t *t);\r
+\r
+#endif\r
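
To round out the header, a hedged usage sketch (again not part of this commit): each work task is filled in with TaskQueue_Setup(), the batch is handed to TaskQueue_Enqueue(), and completion is awaited through a gather task that runs TaskQueue_Task_CheckTasksDone(). MyWork_Task is the hypothetical task function sketched after taskqueue.c above; WORK_COUNT, Example_RunBatch and the buffers are likewise made up for illustration.

#include "quakedef.h"
#include "taskqueue.h"

void MyWork_Task(taskqueue_task_t *t); // hypothetical task function, sketched after taskqueue.c above

#define WORK_COUNT 16 // illustrative batch size

static taskqueue_task_t work_tasks[WORK_COUNT];
static taskqueue_task_t gather_task;

// hypothetical driver: enqueue a batch of independent tasks, then block on a gather task
void Example_RunBatch(float *buffers[WORK_COUNT], size_t elements)
{
        int i;
        // i0/i1 land in t->i[0..1] and p0/p1 in t->p[0..1], as filled in by TaskQueue_Setup
        for (i = 0; i < WORK_COUNT; i++)
                TaskQueue_Setup(&work_tasks[i], NULL, MyWork_Task, elements, 0, buffers[i], NULL);
        // the gather task becomes done only once every task in work_tasks[] is done
        TaskQueue_Setup(&gather_task, NULL, TaskQueue_Task_CheckTasksDone, WORK_COUNT, 0, work_tasks, NULL);
        TaskQueue_Enqueue(WORK_COUNT, work_tasks);
        TaskQueue_Enqueue(1, &gather_task);
        // blocks until the gather task reports done; with no worker threads the
        // pending tasks are executed on this thread via TaskQueue_Execute(true)
        TaskQueue_WaitForTaskDone(&gather_task);
}

Whether the array or the doubly linked list backs the FIFO is chosen by the taskqueue_linkedlist cvar; the calling code above is the same either way.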