]> git.xonotic.org Git - xonotic/darkplaces.git/blob - taskqueue.c
Added taskqueue.[ch].
[xonotic/darkplaces.git] / taskqueue.c
1 #include "quakedef.h"\r
2 #include "taskqueue.h"\r
3 \r
4 cvar_t taskqueue_maxthreads = { CVAR_SAVE, "taskqueue_maxthreads", "32", "how many threads to use for executing tasks" };\r
5 cvar_t taskqueue_linkedlist = { CVAR_SAVE, "taskqueue_linkedlist", "1", "whether to use a doubly linked list or an array for the FIFO queue" };\r
6 \r
7 typedef struct taskqueue_state_thread_s\r
8 {\r
9         void *handle;\r
10 }\r
11 taskqueue_state_thread_t;\r
12 \r
13 typedef struct taskqueue_state_s\r
14 {\r
15         int numthreads;\r
16         taskqueue_state_thread_t threads[1024];\r
17 \r
18         // we can enqueue this many tasks before execution of them must proceed\r
19         int queue_used;\r
20         int queue_max; // size of queue array\r
21         taskqueue_task_t **queue_tasks;\r
22 \r
23         // command \r
24         Thread_SpinLock command_lock;\r
25 \r
26         int threads_quit;\r
27 \r
28         // doubly linked list - enqueue pushes to list.prev, dequeue pops from list.next\r
29         taskqueue_task_t list;\r
30 }\r
31 taskqueue_state_t;\r
32 \r
33 static taskqueue_state_t taskqueue_state;\r
34 \r
35 int TaskQueue_Init(void)\r
36 {\r
37         Cvar_RegisterVariable(&taskqueue_maxthreads);\r
38         Cvar_RegisterVariable(&taskqueue_linkedlist);\r
39         // initialize the doubly-linked list header\r
40         taskqueue_state.list.next = &taskqueue_state.list;\r
41         taskqueue_state.list.prev = &taskqueue_state.list;\r
42         return 0;\r
43 }\r
44 \r
45 void TaskQueue_Shutdown(void)\r
46 {\r
47         if (taskqueue_state.numthreads)\r
48                 TaskQueue_Frame(true);\r
49         if (taskqueue_state.queue_tasks)\r
50                 Mem_Free(taskqueue_state.queue_tasks);\r
51         taskqueue_state.queue_tasks = NULL;\r
52 }\r
53 \r
54 static taskqueue_task_t *TaskQueue_GetPending(void)\r
55 {\r
56         taskqueue_task_t *t = NULL;\r
57         if (taskqueue_state.list.next != &taskqueue_state.list)\r
58         {\r
59                 // pop from list.next\r
60                 t = taskqueue_state.list.next;\r
61                 t->next->prev = t->prev;\r
62                 t->prev->next = t->next;\r
63                 t->prev = t->next = NULL;\r
64         }\r
65         if (t == NULL)\r
66         {\r
67                 if (taskqueue_state.queue_used > 0)\r
68                 {\r
69                         t = taskqueue_state.queue_tasks[0];\r
70                         taskqueue_state.queue_used--;\r
71                         memmove(taskqueue_state.queue_tasks, taskqueue_state.queue_tasks + 1, taskqueue_state.queue_used * sizeof(taskqueue_task_t *));\r
72                         taskqueue_state.queue_tasks[taskqueue_state.queue_used] = NULL;\r
73                 }\r
74         }\r
75         return t;\r
76 }\r
77 \r
78 static void TaskQueue_ExecuteTask(taskqueue_task_t *t)\r
79 {\r
80         // see if t is waiting on something\r
81         if (t->preceding && t->preceding->done == 0)\r
82                 TaskQueue_Yield(t);\r
83         else\r
84                 t->func(t);\r
85 }\r
86 \r
87 // FIXME: don't use mutex\r
88 // FIXME: this is basically fibers but less featureful - context switching for yield is not implemented\r
89 static int TaskQueue_ThreadFunc(void *d)\r
90 {\r
91         for (;;)\r
92         {\r
93                 qboolean quit;\r
94                 taskqueue_task_t *t = NULL;\r
95                 Thread_AtomicLock(&taskqueue_state.command_lock);\r
96                 quit = taskqueue_state.threads_quit != 0;\r
97                 t = TaskQueue_GetPending();\r
98                 Thread_AtomicUnlock(&taskqueue_state.command_lock);\r
99                 if (t)\r
100                         TaskQueue_ExecuteTask(t);\r
101                 else if (quit)\r
102                         break;\r
103         }\r
104         return 0;\r
105 }\r
106 \r
107 void TaskQueue_Execute(qboolean force)\r
108 {\r
109         // if we have no threads to run the tasks, just start executing them now\r
110         if (taskqueue_state.numthreads == 0 || force)\r
111         {\r
112                 for (;;)\r
113                 {\r
114                         taskqueue_task_t *t = NULL;\r
115                         Thread_AtomicLock(&taskqueue_state.command_lock);\r
116                         t = TaskQueue_GetPending();\r
117                         Thread_AtomicUnlock(&taskqueue_state.command_lock);\r
118                         if (!t)\r
119                                 break;\r
120                         TaskQueue_ExecuteTask(t);\r
121                 }\r
122         }\r
123 }\r
124 \r
125 void TaskQueue_Enqueue(int numtasks, taskqueue_task_t *tasks)\r
126 {\r
127         int i;\r
128         // try not to spinlock for a long time by breaking up large enqueues\r
129         while (numtasks > 64)\r
130         {\r
131                 TaskQueue_Enqueue(64, tasks);\r
132                 tasks += 64;\r
133                 numtasks -= 64;\r
134         }\r
135         Thread_AtomicLock(&taskqueue_state.command_lock);\r
136         for (i = 0; i < numtasks; i++)\r
137         {\r
138                 taskqueue_task_t *t = &tasks[i];\r
139                 if (taskqueue_linkedlist.integer)\r
140                 {\r
141                         // push to list.prev\r
142                         t->next = &taskqueue_state.list;\r
143                         t->prev = taskqueue_state.list.prev;\r
144                         t->next->prev = t;\r
145                         t->prev->next = t;\r
146                 }\r
147                 else\r
148                 {\r
149                         if (taskqueue_state.queue_used >= taskqueue_state.queue_max)\r
150                         {\r
151                                 taskqueue_state.queue_max *= 2;\r
152                                 if (taskqueue_state.queue_max < 1024)\r
153                                         taskqueue_state.queue_max = 1024;\r
154                                 taskqueue_state.queue_tasks = (taskqueue_task_t **)Mem_Realloc(cls.permanentmempool, taskqueue_state.queue_tasks, taskqueue_state.queue_max * sizeof(taskqueue_task_t *));\r
155                         }\r
156                         taskqueue_state.queue_tasks[taskqueue_state.queue_used++] = t;\r
157                 }\r
158         }\r
159         Thread_AtomicUnlock(&taskqueue_state.command_lock);\r
160 }\r
161 \r
162 // if the task can not be completed due yet to preconditions, just enqueue it again...\r
163 void TaskQueue_Yield(taskqueue_task_t *t)\r
164 {\r
165         t->yieldcount++;\r
166         TaskQueue_Enqueue(1, t);\r
167 }\r
168 \r
169 qboolean TaskQueue_IsDone(taskqueue_task_t *t)\r
170 {\r
171         return !t->done != 0;\r
172 }\r
173 \r
174 void TaskQueue_WaitForTaskDone(taskqueue_task_t *t)\r
175 {\r
176         qboolean done = false;\r
177         while (!done)\r
178         {\r
179                 Thread_AtomicLock(&taskqueue_state.command_lock);\r
180                 done = t->done != 0;\r
181                 Thread_AtomicUnlock(&taskqueue_state.command_lock);\r
182                 // if there are no threads, just execute the tasks immediately\r
183                 if (!done && taskqueue_state.numthreads == 0)\r
184                         TaskQueue_Execute(true);\r
185         }\r
186 }\r
187 \r
188 void TaskQueue_Frame(qboolean shutdown)\r
189 {\r
190         int numthreads = shutdown ? 0 : bound(0, taskqueue_maxthreads.integer, sizeof(taskqueue_state.threads) / sizeof(taskqueue_state.threads[0]));\r
191 #ifdef THREADDISABLE\r
192         numthreads = 0;\r
193 #endif\r
194         if (taskqueue_state.numthreads != numthreads)\r
195         {\r
196                 int i;\r
197                 Thread_AtomicLock(&taskqueue_state.command_lock);\r
198                 taskqueue_state.threads_quit = 1;\r
199                 Thread_AtomicUnlock(&taskqueue_state.command_lock);\r
200                 for (i = 0; i < taskqueue_state.numthreads; i++)\r
201                 {\r
202                         if (taskqueue_state.threads[i].handle)\r
203                                 Thread_WaitThread(taskqueue_state.threads[i].handle, 0);\r
204                         taskqueue_state.threads[i].handle = NULL;\r
205                 }\r
206                 Thread_AtomicLock(&taskqueue_state.command_lock);\r
207                 taskqueue_state.threads_quit = 0;\r
208                 Thread_AtomicUnlock(&taskqueue_state.command_lock);\r
209                 taskqueue_state.numthreads = numthreads;\r
210                 for (i = 0; i < taskqueue_state.numthreads; i++)\r
211                         taskqueue_state.threads[i].handle = Thread_CreateThread(TaskQueue_ThreadFunc, &taskqueue_state.threads[i]);\r
212                 // if there are still pending tasks (e.g. no threads), execute them on main thread now\r
213                 TaskQueue_Execute(true);\r
214         }\r
215 }\r
216 \r
217 void TaskQueue_Setup(taskqueue_task_t *t, taskqueue_task_t *preceding, void(*func)(taskqueue_task_t *), size_t i0, size_t i1, void *p0, void *p1)\r
218 {\r
219         memset(t, 0, sizeof(*t));\r
220         t->preceding = preceding;\r
221         t->func = func;\r
222         t->i[0] = i0;\r
223         t->i[1] = i1;\r
224         t->p[0] = p0;\r
225         t->p[1] = p1;\r
226 }\r
227 \r
228 void TaskQueue_Task_CheckTasksDone(taskqueue_task_t *t)\r
229 {\r
230         size_t numtasks = t->i[0];\r
231         taskqueue_task_t *tasks = t->p[0];\r
232         while (numtasks > 0)\r
233         {\r
234                 // check the last task first as it's usually going to be the last to finish, so we do the least work by checking it first\r
235                 if (!tasks[numtasks - 1].done)\r
236                 {\r
237                         // update our partial progress, then yield to another pending task.\r
238                         t->i[0] = numtasks;\r
239                         TaskQueue_Yield(t);\r
240                         return;\r
241                 }\r
242                 numtasks--;\r
243         }\r
244         t->started = 1;\r
245         t->done = 1;\r
246 }\r