1 #include "quakedef.h"
\r
2 #include "taskqueue.h"
\r
// Console variables that size the worker-thread pool; registered in TaskQueue_Init
// and read every frame by TaskQueue_Frame.
4 cvar_t taskqueue_minthreads = {CVAR_CLIENT | CVAR_SERVER | CVAR_SAVE, "taskqueue_minthreads", "4", "minimum number of threads to keep active for executing tasks"};
\r
5 cvar_t taskqueue_maxthreads = {CVAR_CLIENT | CVAR_SERVER | CVAR_SAVE, "taskqueue_maxthreads", "32", "maximum number of threads to start up as needed based on task count"};
\r
6 cvar_t taskqueue_tasksperthread = {CVAR_CLIENT | CVAR_SERVER | CVAR_SAVE, "taskqueue_tasksperthread", "4000", "expected amount of work that a single thread can do in a frame - the number of threads being used depends on the average workload in recent frames"};
\r
// Hard upper bound on worker threads: sizes taskqueue_state.threads[] and is the
// ceiling applied to taskqueue_maxthreads in TaskQueue_Frame.
8 #define MAXTHREADS 1024
\r
9 #define RECENTFRAMES 64 // averaging thread activity over this many frames to decide how many threads we need
\r
10 #define THREADTASKS 256 // thread can hold this many tasks in its own queue
\r
11 #define THREADBATCH 64 // thread will run this many tasks before checking status again
\r
12 #define THREADSLEEPCOUNT 1000 // thread will sleep for a little while if it checks this many times and has no work to do
\r
// Per-worker-thread state. Each worker owns a fixed ring of THREADTASKS task
// pointers. enqueueposition and dequeueposition are never reset; they are reduced
// modulo THREADTASKS at every access, so (enqueueposition - dequeueposition) is the
// number of tasks waiting in this thread's ring.
// NOTE(review): the `handle` and `quit` members used by TaskQueue_Frame and
// TaskQueue_ThreadFunc are not present in this copy of the struct.
14 typedef struct taskqueue_state_thread_s
\r
// index of this thread within taskqueue_state.threads[] (assigned by TaskQueue_Frame)
18 unsigned int thread_index;
\r
19 unsigned int tasks_completed;
\r
// advanced by TaskQueue_DistributeTasks while it holds command_lock
21 unsigned int enqueueposition;
\r
// advanced by the owning worker in TaskQueue_ThreadFunc without taking command_lock
22 unsigned int dequeueposition;
\r
23 taskqueue_task_t *queue[THREADTASKS];
\r
25 taskqueue_state_thread_t;
\r
// Global task-queue state: the worker threads, the shared distributor queue that
// TaskQueue_Enqueue fills, and the per-frame workload statistics TaskQueue_Frame
// uses to size the thread pool.
// NOTE(review): `numthreads`, read and written throughout this file, is not present
// in this copy of the struct.
27 typedef struct taskqueue_state_s
\r
29 // TaskQueue_DistributeTasks cycles through the threads when assigning, each has its own queue
\r
30 unsigned int enqueuethread;
\r
32 taskqueue_state_thread_t threads[MAXTHREADS];
\r
34 // synchronization point for enqueue and some other memory access
\r
35 Thread_SpinLock command_lock;
\r
37 // distributor queue (not assigned to threads yet, or waiting on other tasks)
\r
// queue_data is a ring of queue_size slots, grown with Mem_Realloc in TaskQueue_Enqueue.
// Unlike the per-thread rings, both positions here are slot indices that wrap back
// to 0 on reaching queue_size; equal positions mean the queue is empty.
38 unsigned int queue_enqueueposition;
\r
39 unsigned int queue_dequeueposition;
\r
40 unsigned int queue_size;
\r
41 taskqueue_task_t **queue_data;
\r
43 // metrics to balance workload vs cpu resources
\r
// tasks_recentframes holds the last RECENTFRAMES per-frame task counts, written at
// tasks_recentframesindex. tasks_thisframe counts tasks enqueued with yieldcount 0
// since the last TaskQueue_Frame. tasks_averageperframe is the mean of the history.
44 unsigned int tasks_recentframesindex;
\r
45 unsigned int tasks_recentframes[RECENTFRAMES];
\r
46 unsigned int tasks_thisframe;
\r
47 unsigned int tasks_averageperframe;
\r
// single zero-initialized instance shared by every function in this file
51 static taskqueue_state_t taskqueue_state;
\r
53 void TaskQueue_Init(void)
\r
55 Cvar_RegisterVariable(&taskqueue_minthreads);
\r
56 Cvar_RegisterVariable(&taskqueue_maxthreads);
\r
57 Cvar_RegisterVariable(&taskqueue_tasksperthread);
\r
60 void TaskQueue_Shutdown(void)
\r
62 if (taskqueue_state.numthreads)
\r
63 TaskQueue_Frame(true);
\r
// Executes task t on the calling thread. It first checks whether t->preceding names
// a task that has not finished yet.
// NOTE(review): the statements handling the not-ready case and invoking the task are
// not present in this copy. Given the comment on TaskQueue_Yield, the not-ready
// branch presumably re-enqueues t — confirm.
66 static void TaskQueue_ExecuteTask(taskqueue_task_t *t)
\r
68 // see if t is waiting on something
\r
69 if (t->preceding && t->preceding->done == 0)
\r
75 // FIXME: don't use mutex
\r
76 // FIXME: this is basically fibers but less featureful - context switching for yield is not implemented
\r
// Worker thread entry point, started by TaskQueue_Frame via Thread_CreateThread.
// d points at this worker's taskqueue_state_thread_t. The worker:
// - drains its own ring through TaskQueue_ExecuteTask;
// - reads its quit flag under command_lock;
// - compares sleepcounter against THREADSLEEPCOUNT to decide when to sleep.
// NOTE(review): enqueueposition/dequeueposition are plain unsigned ints. This
// function reads and writes them without command_lock, while
// TaskQueue_DistributeTasks updates enqueueposition (and reads dequeueposition)
// under the lock. Without atomics or barriers that is a data race — confirm the
// intended synchronization.
77 static int TaskQueue_ThreadFunc(void *d)
\r
79 taskqueue_state_thread_t *s = (taskqueue_state_thread_t *)d;
\r
80 unsigned int sleepcounter = 0;
\r
84 while (s->dequeueposition != s->enqueueposition)
\r
86 taskqueue_task_t *t = s->queue[s->dequeueposition % THREADTASKS];
\r
87 TaskQueue_ExecuteTask(t);
\r
88 // when we advance, also clear the pointer for good measure
\r
89 s->queue[s->dequeueposition++ % THREADTASKS] = NULL;
\r
92 Thread_AtomicLock(&taskqueue_state.command_lock);
\r
93 quit = s->quit != 0;
\r
94 Thread_AtomicUnlock(&taskqueue_state.command_lock);
\r
98 if (sleepcounter >= THREADSLEEPCOUNT)
\r
// Appends tasks[0..numtasks-1] to the shared distributor queue under command_lock,
// growing queue_data first if needed. Tasks whose yieldcount is 0 are counted into
// tasks_thisframe, the workload metric TaskQueue_Frame uses. Tasks re-enqueued
// after yielding presumably carry a nonzero yieldcount.
105 void TaskQueue_Enqueue(int numtasks, taskqueue_task_t *tasks)
\r
108 Thread_AtomicLock(&taskqueue_state.command_lock);
\r
// The right-hand side is the number of entries already queued (plus queue_size when
// the write position has wrapped below the read position), plus numtasks.
// NOTE(review): the queue grows only when that total would EXCEED queue_size. If it
// lands exactly on queue_size, queue_enqueueposition wraps onto
// queue_dequeueposition and the whole queue then reads as empty, losing every
// queued task. Growing when queue_size <= total keeps one slot free.
109 if (taskqueue_state.queue_size <
\r
110 (taskqueue_state.queue_enqueueposition < taskqueue_state.queue_dequeueposition ? taskqueue_state.queue_size : 0) +
\r
111 taskqueue_state.queue_enqueueposition - taskqueue_state.queue_dequeueposition + numtasks)
\r
// NOTE(review): if the ring has wrapped when it is grown, the live entries are
// [queue_dequeueposition, old size) followed by [0, queue_enqueueposition).
// Mem_Realloc leaves them in place, so the reader next walks into the new,
// never-written slots [old size, newsize) before wrapping. The wrapped segment must
// be relocated after growing, e.g. copy [0, queue_enqueueposition) to the old end
// and advance queue_enqueueposition by the old size.
113 // we have to grow the queue...
\r
114 unsigned int newsize = (taskqueue_state.queue_size + numtasks) * 2;
\r
115 if (newsize < 1024)
\r
117 taskqueue_state.queue_data = Mem_Realloc(zonemempool, taskqueue_state.queue_data, sizeof(*taskqueue_state.queue_data) * newsize);
\r
118 taskqueue_state.queue_size = newsize;
\r
120 for (i = 0; i < numtasks; i++)
\r
122 if (tasks[i].yieldcount == 0)
\r
123 taskqueue_state.tasks_thisframe++;
\r
124 taskqueue_state.queue_data[taskqueue_state.queue_enqueueposition] = &tasks[i];
\r
125 taskqueue_state.queue_enqueueposition++;
\r
126 if (taskqueue_state.queue_enqueueposition >= taskqueue_state.queue_size)
\r
127 taskqueue_state.queue_enqueueposition = 0;
\r
129 Thread_AtomicUnlock(&taskqueue_state.command_lock);
\r
132 // if the task cannot be completed yet due to unmet preconditions, just enqueue it again...
\r
// Defers task t by putting it back on the shared distributor queue.
// NOTE(review): TaskQueue_Enqueue counts only tasks with yieldcount == 0 toward the
// per-frame workload, so yieldcount is presumably incremented before re-enqueueing.
// That statement is not present in this copy — confirm.
133 void TaskQueue_Yield(taskqueue_task_t *t)
\r
136 TaskQueue_Enqueue(1, t);
\r
139 qboolean TaskQueue_IsDone(taskqueue_task_t *t)
\r
141 return !t->done != 0;
\r
// Moves tasks from the shared distributor queue into worker rings under command_lock:
// - a task whose preceding task is not done is rotated to the back of the
//   distributor queue;
// - any other task is placed in threads[enqueuethread] if that ring has room;
// - assignment stops after numthreads consecutive attempts without a successful
//   assignment.
// Afterwards, one pending distributor task (if any) is executed on the calling
// thread. This is how work progresses when numthreads is 0.
// NOTE(review): no statement in this copy ever changes taskqueue_state.enqueuethread.
// As far as visible, every task therefore lands in the same worker (threads[0],
// since taskqueue_state is zero-initialized), despite the "cycles through the
// threads" comment on the field. It likely needs advancing modulo numthreads after
// each assignment attempt.
144 void TaskQueue_DistributeTasks(void)
\r
146 Thread_AtomicLock(&taskqueue_state.command_lock);
\r
147 if (taskqueue_state.numthreads > 0)
\r
149 unsigned int attempts = taskqueue_state.numthreads;
\r
150 while (attempts-- > 0 && taskqueue_state.queue_enqueueposition != taskqueue_state.queue_dequeueposition)
\r
152 taskqueue_task_t *t = taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition];
\r
153 if (t->preceding && t->preceding->done == 0)
\r
155 // task is waiting on something
\r
156 // first dequeue it properly
\r
157 taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition] = NULL;
\r
158 taskqueue_state.queue_dequeueposition++;
\r
159 if (taskqueue_state.queue_dequeueposition >= taskqueue_state.queue_size)
\r
160 taskqueue_state.queue_dequeueposition = 0;
\r
161 // now put it back in the distributor queue - we know there is room because we just made room
\r
162 taskqueue_state.queue_data[taskqueue_state.queue_enqueueposition] = t;
\r
163 taskqueue_state.queue_enqueueposition++;
\r
164 if (taskqueue_state.queue_enqueueposition >= taskqueue_state.queue_size)
\r
165 taskqueue_state.queue_enqueueposition = 0;
\r
166 // we do not refresh the attempt counter here to avoid deadlock - quite often the only things sitting in the distributor queue are waiting on other tasks
\r
170 taskqueue_state_thread_t *s = &taskqueue_state.threads[taskqueue_state.enqueuethread];
\r
171 if (s->enqueueposition - s->dequeueposition < THREADTASKS)
\r
173 // add the task to the thread's queue
\r
174 s->queue[(s->enqueueposition++) % THREADTASKS] = t;
\r
175 // since we succeeded in assigning the task, advance the distributor queue
\r
176 taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition] = NULL;
\r
177 taskqueue_state.queue_dequeueposition++;
\r
178 if (taskqueue_state.queue_dequeueposition >= taskqueue_state.queue_size)
\r
179 taskqueue_state.queue_dequeueposition = 0;
\r
180 // refresh our attempt counter because we did manage to assign something to a thread
\r
181 attempts = taskqueue_state.numthreads;
\r
186 Thread_AtomicUnlock(&taskqueue_state.command_lock);
\r
187 // execute one pending task on the distributor queue, this matters if numthreads is 0
\r
// NOTE(review): this section runs after command_lock has been released, yet it reads
// and advances queue_dequeueposition/queue_data. TaskQueue_Enqueue modifies those
// under the lock and is reachable from worker threads via TaskQueue_Yield. Dequeue
// the task while still holding the lock, and call TaskQueue_ExecuteTask only after
// unlocking.
188 if (taskqueue_state.queue_dequeueposition != taskqueue_state.queue_enqueueposition)
\r
190 taskqueue_task_t *t = taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition];
\r
191 taskqueue_state.queue_dequeueposition++;
\r
192 if (taskqueue_state.queue_dequeueposition >= taskqueue_state.queue_size)
\r
193 taskqueue_state.queue_dequeueposition = 0;
\r
195 TaskQueue_ExecuteTask(t);
\r
// Waits for task t: reads t->done under command_lock and calls
// TaskQueue_DistributeTasks. That call also executes one pending distributor task on
// this thread, so waiting makes progress even with no worker threads.
// NOTE(review): the loop and exit statements are not present in this copy;
// presumably this repeats until done is set.
199 void TaskQueue_WaitForTaskDone(taskqueue_task_t *t)
\r
201 qboolean done = false;
\r
204 Thread_AtomicLock(&taskqueue_state.command_lock);
\r
205 done = t->done != 0;
\r
206 Thread_AtomicUnlock(&taskqueue_state.command_lock);
\r
209 TaskQueue_DistributeTasks();
\r
// Per-frame maintenance; TaskQueue_Shutdown calls it with shutdown=true. It:
// - records this frame's task count in the RECENTFRAMES history;
// - recomputes tasks_averageperframe;
// - derives the desired worker count from that average and taskqueue_tasksperthread;
// - stops or starts worker threads to match;
// - finally distributes pending tasks.
// NOTE(review): no visible line reads `shutdown`. The statements that use it, and the
// THREADDISABLE branch body, are not present in this copy.
213 void TaskQueue_Frame(qboolean shutdown)
\r
216 unsigned long long int avg;
\r
217 int maxthreads = bound(0, taskqueue_maxthreads.integer, MAXTHREADS);
\r
218 int numthreads = maxthreads;
\r
219 int tasksperthread = bound(10, taskqueue_tasksperthread.integer, 100000);
\r
220 #ifdef THREADDISABLE
\r
224 Thread_AtomicLock(&taskqueue_state.command_lock);
\r
225 taskqueue_state.tasks_recentframesindex = (taskqueue_state.tasks_recentframesindex + 1) % RECENTFRAMES;
\r
226 taskqueue_state.tasks_recentframes[taskqueue_state.tasks_recentframesindex] = taskqueue_state.tasks_thisframe;
\r
227 taskqueue_state.tasks_thisframe = 0;
\r
// NOTE(review): avg must be zero before this loop; its initialization is not present
// in this copy.
229 for (i = 0; i < RECENTFRAMES; i++)
\r
230 avg += taskqueue_state.tasks_recentframes[i];
\r
231 taskqueue_state.tasks_averageperframe = avg / RECENTFRAMES;
\r
232 Thread_AtomicUnlock(&taskqueue_state.command_lock);
\r
234 numthreads = taskqueue_state.tasks_averageperframe / tasksperthread;
\r
// NOTE(review): this clamps with the raw cvar values instead of the already-clamped
// maxthreads, assuming bound(min, x, max) is the usual clamp macro. Consequences:
// - taskqueue_maxthreads or taskqueue_minthreads above MAXTHREADS lets numthreads
//   exceed MAXTHREADS, and the loops below index threads[] out of bounds;
// - a negative taskqueue_maxthreads can make numthreads negative.
// Suggested fix: bound(bound(0, taskqueue_minthreads.integer, maxthreads), numthreads, maxthreads).
235 numthreads = bound(taskqueue_minthreads.integer, numthreads, taskqueue_maxthreads.integer);
\r
240 // check if we need to close some threads
\r
241 if (taskqueue_state.numthreads > numthreads)
\r
243 // tell extra threads to quit
\r
244 Thread_AtomicLock(&taskqueue_state.command_lock);
\r
245 for (i = numthreads; i < taskqueue_state.numthreads; i++)
\r
246 taskqueue_state.threads[i].quit = 1;
\r
247 Thread_AtomicUnlock(&taskqueue_state.command_lock);
\r
248 for (i = numthreads; i < taskqueue_state.numthreads; i++)
\r
250 if (taskqueue_state.threads[i].handle)
\r
251 Thread_WaitThread(taskqueue_state.threads[i].handle, 0);
\r
252 taskqueue_state.threads[i].handle = NULL;
\r
254 // okay we're at the new state now
\r
255 taskqueue_state.numthreads = numthreads;
\r
258 // check if we need to start more threads
\r
259 if (taskqueue_state.numthreads < numthreads)
\r
261 // make sure we're not telling new threads to just quit on startup
\r
262 Thread_AtomicLock(&taskqueue_state.command_lock);
\r
263 for (i = taskqueue_state.numthreads; i < numthreads; i++)
\r
264 taskqueue_state.threads[i].quit = 0;
\r
265 Thread_AtomicUnlock(&taskqueue_state.command_lock);
\r
267 // start new threads
\r
268 for (i = taskqueue_state.numthreads; i < numthreads; i++)
\r
270 taskqueue_state.threads[i].thread_index = i;
\r
271 taskqueue_state.threads[i].handle = Thread_CreateThread(TaskQueue_ThreadFunc, &taskqueue_state.threads[i]);
\r
274 // okay we're at the new state now
\r
275 taskqueue_state.numthreads = numthreads;
\r
278 // just for good measure, distribute any pending tasks that span across frames
\r
279 TaskQueue_DistributeTasks();
\r
// Initializes task t for later enqueueing. It zeroes the whole struct, so done and
// yieldcount start at 0, then records the task t must wait for (preceding; may be
// NULL, since the task-running code checks t->preceding before dereferencing it).
// NOTE(review): the statements storing func, i0, i1, p0 and p1 are not present in this
// copy. TaskQueue_Task_CheckTasksDone reads t->i[0] and t->p[0], so they presumably
// go into t->func, t->i[] and t->p[] — confirm.
282 void TaskQueue_Setup(taskqueue_task_t *t, taskqueue_task_t *preceding, void(*func)(taskqueue_task_t *), size_t i0, size_t i1, void *p0, void *p1)
\r
284 memset(t, 0, sizeof(*t));
\r
285 t->preceding = preceding;
\r
293 void TaskQueue_Task_CheckTasksDone(taskqueue_task_t *t)
\r
295 size_t numtasks = t->i[0];
\r
296 taskqueue_task_t *tasks = t->p[0];
\r
297 while (numtasks > 0)
\r
299 // check the last task first as it's usually going to be the last to finish, so we do the least work by checking it first
\r
300 if (!tasks[numtasks - 1].done)
\r
302 // update our partial progress, then yield to another pending task.
\r
303 t->i[0] = numtasks;
\r
304 // set our preceding task to one of the ones we are watching for
\r
305 t->preceding = &tasks[numtasks - 1];
\r
306 TaskQueue_Yield(t);
\r