]> git.xonotic.org Git - xonotic/darkplaces.git/blob - taskqueue.c
Rename indent/dedent functions
[xonotic/darkplaces.git] / taskqueue.c
1 #include "quakedef.h"
2 #include "taskqueue.h"
3
// console variables that TaskQueue_Frame reads on every call to decide how many worker threads should be running
cvar_t taskqueue_minthreads = {CF_CLIENT | CF_SERVER | CF_ARCHIVE, "taskqueue_minthreads", "0", "minimum number of threads to keep active for executing tasks"};
cvar_t taskqueue_maxthreads = {CF_CLIENT | CF_SERVER | CF_ARCHIVE, "taskqueue_maxthreads", "32", "maximum number of threads to start up as needed based on task count"};
cvar_t taskqueue_tasksperthread = {CF_CLIENT | CF_SERVER | CF_ARCHIVE, "taskqueue_tasksperthread", "4000", "expected amount of work that a single thread can do in a frame - the number of threads being used depends on the average workload in recent frames"};
7
// compile-time limits of the task queue
#define MAXTHREADS 1024 // number of slots in taskqueue_state.threads[]
#define RECENTFRAMES 64 // averaging thread activity over this many frames to decide how many threads we need
#define THREADTASKS 256 // thread can hold this many tasks in its own queue
#define THREADBATCH 64 // thread will run this many tasks before checking status again - NOTE(review): not referenced by any code visible in this file
#define THREADSLEEPCOUNT 1000 // thread will sleep for a little while if it checks this many times and has no work to do
13
// per-worker-thread state: control fields plus the thread's private ring of assigned tasks
typedef struct taskqueue_state_thread_s
{
        // value returned by Thread_CreateThread, NULL once TaskQueue_Frame has waited for the thread to end
        void *handle;
        // set to 1 by TaskQueue_Frame (under command_lock) to ask the thread to exit, polled by TaskQueue_ThreadFunc
        unsigned int quit;
        // index of this entry within taskqueue_state.threads[]
        unsigned int thread_index;
        // NOTE(review): never written by the code visible in this file
        unsigned int tasks_completed;

        // free-running counters, taken modulo THREADTASKS when indexing queue[]
        // enqueueposition is advanced by TaskQueue_DistributeTasks, dequeueposition by TaskQueue_ThreadFunc
        unsigned int enqueueposition;
        unsigned int dequeueposition;
        // tasks assigned to this thread, slots are reset to NULL after the task has been executed
        taskqueue_task_t *queue[THREADTASKS];
}
taskqueue_state_thread_t;
26
// global task queue state: the worker threads, the distributor ring and the workload statistics
typedef struct taskqueue_state_s
{
        // TaskQueue_DistributeTasks cycles through the threads when assigning, each has its own queue
        unsigned int enqueuethread;
        // number of entries of threads[] that currently have a running worker (maintained by TaskQueue_Frame)
        int numthreads;
        taskqueue_state_thread_t threads[MAXTHREADS];

        // synchronization point for enqueue and some other memory access
        Thread_SpinLock command_lock;

        // distributor queue (not assigned to threads yet, or waiting on other tasks)
        // both positions are indices into queue_data and wrap back to 0 when they reach queue_size
        unsigned int queue_enqueueposition;
        unsigned int queue_dequeueposition;
        // number of entries allocated for queue_data (grown by TaskQueue_Enqueue)
        unsigned int queue_size;
        taskqueue_task_t **queue_data;

        // metrics to balance workload vs cpu resources
        // tasks_recentframes is a ring of per-frame task counts, tasks_recentframesindex is the slot written last
        unsigned int tasks_recentframesindex;
        unsigned int tasks_recentframes[RECENTFRAMES];
        // tasks enqueued for the first time (yieldcount == 0) since the last TaskQueue_Frame
        unsigned int tasks_thisframe;
        // mean of tasks_recentframes[], recomputed by TaskQueue_Frame
        unsigned int tasks_averageperframe;
}
taskqueue_state_t;
50
// the single task queue instance used by every function in this file (zero-initialized static storage)
static taskqueue_state_t taskqueue_state;
52
53 void TaskQueue_Init(void)
54 {
55         Cvar_RegisterVariable(&taskqueue_minthreads);
56         Cvar_RegisterVariable(&taskqueue_maxthreads);
57         Cvar_RegisterVariable(&taskqueue_tasksperthread);
58 }
59
60 void TaskQueue_Shutdown(void)
61 {
62         if (taskqueue_state.numthreads)
63                 TaskQueue_Frame(true);
64 }
65
66 static void TaskQueue_ExecuteTask(taskqueue_task_t *t)
67 {
68         // see if t is waiting on something
69         if (t->preceding && t->preceding->done == 0)
70                 TaskQueue_Yield(t);
71         else
72                 t->func(t);
73 }
74
75 // FIXME: don't use mutex
76 // FIXME: this is basically fibers but less featureful - context switching for yield is not implemented
77 static int TaskQueue_ThreadFunc(void *d)
78 {
79         taskqueue_state_thread_t *s = (taskqueue_state_thread_t *)d;
80         unsigned int sleepcounter = 0;
81         for (;;)
82         {
83                 qbool quit;
84                 while (s->dequeueposition != s->enqueueposition)
85                 {
86                         taskqueue_task_t *t = s->queue[s->dequeueposition % THREADTASKS];
87                         TaskQueue_ExecuteTask(t);
88                         // when we advance, also clear the pointer for good measure
89                         s->queue[s->dequeueposition++ % THREADTASKS] = NULL;
90                         sleepcounter = 0;
91                 }
92                 Thread_AtomicLock(&taskqueue_state.command_lock);
93                 quit = s->quit != 0;
94                 Thread_AtomicUnlock(&taskqueue_state.command_lock);
95                 if (quit)
96                         break;
97                 sleepcounter++;
98                 if (sleepcounter >= THREADSLEEPCOUNT)
99                         Sys_Sleep(1000);
100                 sleepcounter = 0;
101         }
102         return 0;
103 }
104
105 void TaskQueue_Enqueue(int numtasks, taskqueue_task_t *tasks)
106 {
107         int i;
108         Thread_AtomicLock(&taskqueue_state.command_lock);
109         if (taskqueue_state.queue_size <
110                 (taskqueue_state.queue_enqueueposition < taskqueue_state.queue_dequeueposition ? taskqueue_state.queue_size : 0) +
111                 taskqueue_state.queue_enqueueposition - taskqueue_state.queue_dequeueposition + numtasks)
112         {
113                 // we have to grow the queue...
114                 unsigned int newsize = (taskqueue_state.queue_size + numtasks) * 2;
115                 if (newsize < 1024)
116                         newsize = 1024;
117                 taskqueue_state.queue_data = (taskqueue_task_t **)Mem_Realloc(zonemempool, taskqueue_state.queue_data, sizeof(*taskqueue_state.queue_data) * newsize);
118                 taskqueue_state.queue_size = newsize;
119         }
120         for (i = 0; i < numtasks; i++)
121         {
122                 if (tasks[i].yieldcount == 0)
123                         taskqueue_state.tasks_thisframe++;
124                 taskqueue_state.queue_data[taskqueue_state.queue_enqueueposition] = &tasks[i];
125                 taskqueue_state.queue_enqueueposition++;
126                 if (taskqueue_state.queue_enqueueposition >= taskqueue_state.queue_size)
127                         taskqueue_state.queue_enqueueposition = 0;
128         }
129         Thread_AtomicUnlock(&taskqueue_state.command_lock);
130 }
131
132 // if the task can not be completed due yet to preconditions, just enqueue it again...
133 void TaskQueue_Yield(taskqueue_task_t *t)
134 {
135         t->yieldcount++;
136         TaskQueue_Enqueue(1, t);
137 }
138
139 qbool TaskQueue_IsDone(taskqueue_task_t *t)
140 {
141         return !!t->done;
142 }
143
144 static void TaskQueue_DistributeTasks(void)
145 {
146         Thread_AtomicLock(&taskqueue_state.command_lock);
147         if (taskqueue_state.numthreads > 0)
148         {
149                 unsigned int attempts = taskqueue_state.numthreads;
150                 while (attempts-- > 0 && taskqueue_state.queue_enqueueposition != taskqueue_state.queue_dequeueposition)
151                 {
152                         taskqueue_task_t *t = taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition];
153                         if (t->preceding && t->preceding->done == 0)
154                         {
155                                 // task is waiting on something
156                                 // first dequeue it properly
157                                 taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition] = NULL;
158                                 taskqueue_state.queue_dequeueposition++;
159                                 if (taskqueue_state.queue_dequeueposition >= taskqueue_state.queue_size)
160                                         taskqueue_state.queue_dequeueposition = 0;
161                                 // now put it back in the distributor queue - we know there is room because we just made room
162                                 taskqueue_state.queue_data[taskqueue_state.queue_enqueueposition] = t;
163                                 taskqueue_state.queue_enqueueposition++;
164                                 if (taskqueue_state.queue_enqueueposition >= taskqueue_state.queue_size)
165                                         taskqueue_state.queue_enqueueposition = 0;
166                                 // we do not refresh the attempt counter here to avoid deadlock - quite often the only things sitting in the distributor queue are waiting on other tasks
167                         }
168                         else
169                         {
170                                 taskqueue_state_thread_t *s = &taskqueue_state.threads[taskqueue_state.enqueuethread];
171                                 if (s->enqueueposition - s->dequeueposition < THREADTASKS)
172                                 {
173                                         // add the task to the thread's queue
174                                         s->queue[(s->enqueueposition++) % THREADTASKS] = t;
175                                         // since we succeeded in assigning the task, advance the distributor queue
176                                         taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition] = NULL;
177                                         taskqueue_state.queue_dequeueposition++;
178                                         if (taskqueue_state.queue_dequeueposition >= taskqueue_state.queue_size)
179                                                 taskqueue_state.queue_dequeueposition = 0;
180                                         // refresh our attempt counter because we did manage to assign something to a thread
181                                         attempts = taskqueue_state.numthreads;
182                                 }
183                         }
184                 }
185         }
186         Thread_AtomicUnlock(&taskqueue_state.command_lock);
187         // execute one pending task on the distributor queue, this matters if numthreads is 0
188         if (taskqueue_state.queue_dequeueposition != taskqueue_state.queue_enqueueposition)
189         {
190                 taskqueue_task_t *t = taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition];
191                 taskqueue_state.queue_dequeueposition++;
192                 if (taskqueue_state.queue_dequeueposition >= taskqueue_state.queue_size)
193                         taskqueue_state.queue_dequeueposition = 0;
194                 if (t)
195                         TaskQueue_ExecuteTask(t);
196         }
197 }
198
199 void TaskQueue_WaitForTaskDone(taskqueue_task_t *t)
200 {
201         qbool done = false;
202         for (;;)
203         {
204                 Thread_AtomicLock(&taskqueue_state.command_lock);
205                 done = t->done != 0;
206                 Thread_AtomicUnlock(&taskqueue_state.command_lock);
207                 if (done)
208                         break;
209                 TaskQueue_DistributeTasks();
210         }
211 }
212
213 void TaskQueue_Frame(qbool shutdown)
214 {
215         int i;
216         unsigned long long int avg;
217         int maxthreads = bound(0, taskqueue_maxthreads.integer, MAXTHREADS);
218         int numthreads = maxthreads;
219         int tasksperthread = bound(10, taskqueue_tasksperthread.integer, 100000);
220 #ifdef THREADDISABLE
221         numthreads = 0;
222 #endif
223
224         Thread_AtomicLock(&taskqueue_state.command_lock);
225         taskqueue_state.tasks_recentframesindex = (taskqueue_state.tasks_recentframesindex + 1) % RECENTFRAMES;
226         taskqueue_state.tasks_recentframes[taskqueue_state.tasks_recentframesindex] = taskqueue_state.tasks_thisframe;
227         taskqueue_state.tasks_thisframe = 0;
228         avg = 0;
229         for (i = 0; i < RECENTFRAMES; i++)
230                 avg += taskqueue_state.tasks_recentframes[i];
231         taskqueue_state.tasks_averageperframe = avg / RECENTFRAMES;
232         Thread_AtomicUnlock(&taskqueue_state.command_lock);
233
234         numthreads = taskqueue_state.tasks_averageperframe / tasksperthread;
235         numthreads = bound(taskqueue_minthreads.integer, numthreads, taskqueue_maxthreads.integer);
236
237         if (shutdown)
238                 numthreads = 0;
239
240         // check if we need to close some threads
241         if (taskqueue_state.numthreads > numthreads)
242         {
243                 // tell extra threads to quit
244                 Thread_AtomicLock(&taskqueue_state.command_lock);
245                 for (i = numthreads; i < taskqueue_state.numthreads; i++)
246                         taskqueue_state.threads[i].quit = 1;
247                 Thread_AtomicUnlock(&taskqueue_state.command_lock);
248                 for (i = numthreads; i < taskqueue_state.numthreads; i++)
249                 {
250                         if (taskqueue_state.threads[i].handle)
251                                 Thread_WaitThread(taskqueue_state.threads[i].handle, 0);
252                         taskqueue_state.threads[i].handle = NULL;
253                 }
254                 // okay we're at the new state now
255                 taskqueue_state.numthreads = numthreads;
256         }
257
258         // check if we need to start more threads
259         if (taskqueue_state.numthreads < numthreads)
260         {
261                 // make sure we're not telling new threads to just quit on startup
262                 Thread_AtomicLock(&taskqueue_state.command_lock);
263                 for (i = taskqueue_state.numthreads; i < numthreads; i++)
264                         taskqueue_state.threads[i].quit = 0;
265                 Thread_AtomicUnlock(&taskqueue_state.command_lock);
266
267                 // start new threads
268                 for (i = taskqueue_state.numthreads; i < numthreads; i++)
269                 {
270                         taskqueue_state.threads[i].thread_index = i;
271                         taskqueue_state.threads[i].handle = Thread_CreateThread(TaskQueue_ThreadFunc, &taskqueue_state.threads[i]);
272                 }
273
274                 // okay we're at the new state now
275                 taskqueue_state.numthreads = numthreads;
276         }
277
278         // just for good measure, distribute any pending tasks that span across frames
279         TaskQueue_DistributeTasks();
280 }
281
282 void TaskQueue_Setup(taskqueue_task_t *t, taskqueue_task_t *preceding, void(*func)(taskqueue_task_t *), size_t i0, size_t i1, void *p0, void *p1)
283 {
284         memset(t, 0, sizeof(*t));
285         t->preceding = preceding;
286         t->func = func;
287         t->i[0] = i0;
288         t->i[1] = i1;
289         t->p[0] = p0;
290         t->p[1] = p1;
291 }
292
293 void TaskQueue_Task_CheckTasksDone(taskqueue_task_t *t)
294 {
295         size_t numtasks = t->i[0];
296         taskqueue_task_t *tasks = (taskqueue_task_t *)t->p[0];
297         while (numtasks > 0)
298         {
299                 // check the last task first as it's usually going to be the last to finish, so we do the least work by checking it first
300                 if (!tasks[numtasks - 1].done)
301                 {
302                         // update our partial progress, then yield to another pending task.
303                         t->i[0] = numtasks;
304                         // set our preceding task to one of the ones we are watching for
305                         t->preceding = &tasks[numtasks - 1];
306                         TaskQueue_Yield(t);
307                         return;
308                 }
309                 numtasks--;
310         }
311         t->done = 1;
312 }