4 cvar_t taskqueue_minthreads = {CF_CLIENT | CF_SERVER | CF_ARCHIVE, "taskqueue_minthreads", "0", "minimum number of threads to keep active for executing tasks"};
5 cvar_t taskqueue_maxthreads = {CF_CLIENT | CF_SERVER | CF_ARCHIVE, "taskqueue_maxthreads", "32", "maximum number of threads to start up as needed based on task count"};
6 cvar_t taskqueue_tasksperthread = {CF_CLIENT | CF_SERVER | CF_ARCHIVE, "taskqueue_tasksperthread", "4000", "expected amount of work that a single thread can do in a frame - the number of threads being used depends on the average workload in recent frames"};
8 #define MAXTHREADS 1024
9 #define RECENTFRAMES 64 // averaging thread activity over this many frames to decide how many threads we need
10 #define THREADTASKS 256 // thread can hold this many tasks in its own queue
11 #define THREADBATCH 64 // thread will run this many tasks before checking status again
12 #define THREADSLEEPCOUNT 1000 // thread will sleep for a little while if it checks this many times and has no work to do
14 typedef struct taskqueue_state_thread_s
18 unsigned int thread_index;
19 unsigned int tasks_completed;
21 unsigned int enqueueposition;
22 unsigned int dequeueposition;
23 taskqueue_task_t *queue[THREADTASKS];
25 taskqueue_state_thread_t;
27 typedef struct taskqueue_state_s
29 // TaskQueue_DistributeTasks cycles through the threads when assigning, each has its own queue
30 unsigned int enqueuethread;
32 taskqueue_state_thread_t threads[MAXTHREADS];
34 // synchronization point for enqueue and some other memory access
35 Thread_SpinLock command_lock;
37 // distributor queue (not assigned to threads yet, or waiting on other tasks)
38 unsigned int queue_enqueueposition;
39 unsigned int queue_dequeueposition;
40 unsigned int queue_size;
41 taskqueue_task_t **queue_data;
43 // metrics to balance workload vs cpu resources
44 unsigned int tasks_recentframesindex;
45 unsigned int tasks_recentframes[RECENTFRAMES];
46 unsigned int tasks_thisframe;
47 unsigned int tasks_averageperframe;
51 static taskqueue_state_t taskqueue_state;
53 void TaskQueue_Init(void)
55 Cvar_RegisterVariable(&taskqueue_minthreads);
56 Cvar_RegisterVariable(&taskqueue_maxthreads);
57 Cvar_RegisterVariable(&taskqueue_tasksperthread);
60 void TaskQueue_Shutdown(void)
62 if (taskqueue_state.numthreads)
63 TaskQueue_Frame(true);
66 static void TaskQueue_ExecuteTask(taskqueue_task_t *t)
68 // see if t is waiting on something
69 if (t->preceding && t->preceding->done == 0)
75 // FIXME: don't use mutex
76 // FIXME: this is basically fibers but less featureful - context switching for yield is not implemented
77 static int TaskQueue_ThreadFunc(void *d)
79 taskqueue_state_thread_t *s = (taskqueue_state_thread_t *)d;
80 unsigned int sleepcounter = 0;
84 while (s->dequeueposition != s->enqueueposition)
86 taskqueue_task_t *t = s->queue[s->dequeueposition % THREADTASKS];
87 TaskQueue_ExecuteTask(t);
88 // when we advance, also clear the pointer for good measure
89 s->queue[s->dequeueposition++ % THREADTASKS] = NULL;
92 Thread_AtomicLock(&taskqueue_state.command_lock);
94 Thread_AtomicUnlock(&taskqueue_state.command_lock);
98 if (sleepcounter >= THREADSLEEPCOUNT)
105 void TaskQueue_Enqueue(int numtasks, taskqueue_task_t *tasks)
108 Thread_AtomicLock(&taskqueue_state.command_lock);
109 if (taskqueue_state.queue_size <
110 (taskqueue_state.queue_enqueueposition < taskqueue_state.queue_dequeueposition ? taskqueue_state.queue_size : 0) +
111 taskqueue_state.queue_enqueueposition - taskqueue_state.queue_dequeueposition + numtasks)
113 // we have to grow the queue...
114 unsigned int newsize = (taskqueue_state.queue_size + numtasks) * 2;
117 taskqueue_state.queue_data = (taskqueue_task_t **)Mem_Realloc(zonemempool, taskqueue_state.queue_data, sizeof(*taskqueue_state.queue_data) * newsize);
118 taskqueue_state.queue_size = newsize;
120 for (i = 0; i < numtasks; i++)
122 if (tasks[i].yieldcount == 0)
123 taskqueue_state.tasks_thisframe++;
124 taskqueue_state.queue_data[taskqueue_state.queue_enqueueposition] = &tasks[i];
125 taskqueue_state.queue_enqueueposition++;
126 if (taskqueue_state.queue_enqueueposition >= taskqueue_state.queue_size)
127 taskqueue_state.queue_enqueueposition = 0;
129 Thread_AtomicUnlock(&taskqueue_state.command_lock);
132 // if the task can not be completed due yet to preconditions, just enqueue it again...
133 void TaskQueue_Yield(taskqueue_task_t *t)
136 TaskQueue_Enqueue(1, t);
139 qbool TaskQueue_IsDone(taskqueue_task_t *t)
144 static void TaskQueue_DistributeTasks(void)
146 Thread_AtomicLock(&taskqueue_state.command_lock);
147 if (taskqueue_state.numthreads > 0)
149 unsigned int attempts = taskqueue_state.numthreads;
150 while (attempts-- > 0 && taskqueue_state.queue_enqueueposition != taskqueue_state.queue_dequeueposition)
152 taskqueue_task_t *t = taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition];
153 if (t->preceding && t->preceding->done == 0)
155 // task is waiting on something
156 // first dequeue it properly
157 taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition] = NULL;
158 taskqueue_state.queue_dequeueposition++;
159 if (taskqueue_state.queue_dequeueposition >= taskqueue_state.queue_size)
160 taskqueue_state.queue_dequeueposition = 0;
161 // now put it back in the distributor queue - we know there is room because we just made room
162 taskqueue_state.queue_data[taskqueue_state.queue_enqueueposition] = t;
163 taskqueue_state.queue_enqueueposition++;
164 if (taskqueue_state.queue_enqueueposition >= taskqueue_state.queue_size)
165 taskqueue_state.queue_enqueueposition = 0;
166 // we do not refresh the attempt counter here to avoid deadlock - quite often the only things sitting in the distributor queue are waiting on other tasks
170 taskqueue_state_thread_t *s = &taskqueue_state.threads[taskqueue_state.enqueuethread];
171 if (s->enqueueposition - s->dequeueposition < THREADTASKS)
173 // add the task to the thread's queue
174 s->queue[(s->enqueueposition++) % THREADTASKS] = t;
175 // since we succeeded in assigning the task, advance the distributor queue
176 taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition] = NULL;
177 taskqueue_state.queue_dequeueposition++;
178 if (taskqueue_state.queue_dequeueposition >= taskqueue_state.queue_size)
179 taskqueue_state.queue_dequeueposition = 0;
180 // refresh our attempt counter because we did manage to assign something to a thread
181 attempts = taskqueue_state.numthreads;
186 Thread_AtomicUnlock(&taskqueue_state.command_lock);
187 // execute one pending task on the distributor queue, this matters if numthreads is 0
188 if (taskqueue_state.queue_dequeueposition != taskqueue_state.queue_enqueueposition)
190 taskqueue_task_t *t = taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition];
191 taskqueue_state.queue_dequeueposition++;
192 if (taskqueue_state.queue_dequeueposition >= taskqueue_state.queue_size)
193 taskqueue_state.queue_dequeueposition = 0;
195 TaskQueue_ExecuteTask(t);
199 void TaskQueue_WaitForTaskDone(taskqueue_task_t *t)
204 Thread_AtomicLock(&taskqueue_state.command_lock);
206 Thread_AtomicUnlock(&taskqueue_state.command_lock);
209 TaskQueue_DistributeTasks();
213 void TaskQueue_Frame(qbool shutdown)
216 unsigned long long int avg;
217 int maxthreads = bound(0, taskqueue_maxthreads.integer, MAXTHREADS);
218 int numthreads = maxthreads;
219 int tasksperthread = bound(10, taskqueue_tasksperthread.integer, 100000);
224 Thread_AtomicLock(&taskqueue_state.command_lock);
225 taskqueue_state.tasks_recentframesindex = (taskqueue_state.tasks_recentframesindex + 1) % RECENTFRAMES;
226 taskqueue_state.tasks_recentframes[taskqueue_state.tasks_recentframesindex] = taskqueue_state.tasks_thisframe;
227 taskqueue_state.tasks_thisframe = 0;
229 for (i = 0; i < RECENTFRAMES; i++)
230 avg += taskqueue_state.tasks_recentframes[i];
231 taskqueue_state.tasks_averageperframe = avg / RECENTFRAMES;
232 Thread_AtomicUnlock(&taskqueue_state.command_lock);
234 numthreads = taskqueue_state.tasks_averageperframe / tasksperthread;
235 numthreads = bound(taskqueue_minthreads.integer, numthreads, taskqueue_maxthreads.integer);
240 // check if we need to close some threads
241 if (taskqueue_state.numthreads > numthreads)
243 // tell extra threads to quit
244 Thread_AtomicLock(&taskqueue_state.command_lock);
245 for (i = numthreads; i < taskqueue_state.numthreads; i++)
246 taskqueue_state.threads[i].quit = 1;
247 Thread_AtomicUnlock(&taskqueue_state.command_lock);
248 for (i = numthreads; i < taskqueue_state.numthreads; i++)
250 if (taskqueue_state.threads[i].handle)
251 Thread_WaitThread(taskqueue_state.threads[i].handle, 0);
252 taskqueue_state.threads[i].handle = NULL;
254 // okay we're at the new state now
255 taskqueue_state.numthreads = numthreads;
258 // check if we need to start more threads
259 if (taskqueue_state.numthreads < numthreads)
261 // make sure we're not telling new threads to just quit on startup
262 Thread_AtomicLock(&taskqueue_state.command_lock);
263 for (i = taskqueue_state.numthreads; i < numthreads; i++)
264 taskqueue_state.threads[i].quit = 0;
265 Thread_AtomicUnlock(&taskqueue_state.command_lock);
268 for (i = taskqueue_state.numthreads; i < numthreads; i++)
270 taskqueue_state.threads[i].thread_index = i;
271 taskqueue_state.threads[i].handle = Thread_CreateThread(TaskQueue_ThreadFunc, &taskqueue_state.threads[i]);
274 // okay we're at the new state now
275 taskqueue_state.numthreads = numthreads;
278 // just for good measure, distribute any pending tasks that span across frames
279 TaskQueue_DistributeTasks();
282 void TaskQueue_Setup(taskqueue_task_t *t, taskqueue_task_t *preceding, void(*func)(taskqueue_task_t *), size_t i0, size_t i1, void *p0, void *p1)
284 memset(t, 0, sizeof(*t));
285 t->preceding = preceding;
293 void TaskQueue_Task_CheckTasksDone(taskqueue_task_t *t)
295 size_t numtasks = t->i[0];
296 taskqueue_task_t *tasks = (taskqueue_task_t *)t->p[0];
299 // check the last task first as it's usually going to be the last to finish, so we do the least work by checking it first
300 if (!tasks[numtasks - 1].done)
302 // update our partial progress, then yield to another pending task.
304 // set our preceding task to one of the ones we are watching for
305 t->preceding = &tasks[numtasks - 1];