#include "quakedef.h"\r
#include "taskqueue.h"\r
\r
-cvar_t taskqueue_maxthreads = {CVAR_CLIENT | CVAR_SERVER | CVAR_SAVE, "taskqueue_maxthreads", "4", "how many threads to use for executing tasks"};
+cvar_t taskqueue_minthreads = {CVAR_CLIENT | CVAR_SERVER | CVAR_SAVE, "taskqueue_minthreads", "4", "minimum number of threads to keep active for executing tasks"};
+cvar_t taskqueue_maxthreads = {CVAR_CLIENT | CVAR_SERVER | CVAR_SAVE, "taskqueue_maxthreads", "32", "maximum number of threads to start up as needed based on task count"};
+cvar_t taskqueue_tasksperthread = {CVAR_CLIENT | CVAR_SERVER | CVAR_SAVE, "taskqueue_tasksperthread", "4000", "expected amount of work that a single thread can do in a frame - the number of threads being used depends on the average workload in recent frames"};
+
+#define MAXTHREADS 1024 // capacity of taskqueue_state.threads[]; TaskQueue_Frame clamps taskqueue_maxthreads against it
+#define RECENTFRAMES 64 // TaskQueue_Frame averages the per-frame task counts over this many frames to decide how many threads we need
+#define THREADTASKS 256 // capacity of each thread's private queue[]; positions are used modulo this value
+#define THREADBATCH 64 // thread will run this many tasks before checking status again - NOTE(review): not referenced in the visible code, confirm it is still used
+#define THREADSLEEPCOUNT 1000 // number of consecutive idle polls after which TaskQueue_ThreadFunc is meant to call Sys_Sleep
\r
typedef struct taskqueue_state_thread_s
{
void *handle;
+ unsigned int quit; // set to 1 by TaskQueue_Frame (under command_lock) to ask this thread to exit; polled by TaskQueue_ThreadFunc
+ unsigned int thread_index; // index of this entry in taskqueue_state.threads[], assigned just before the thread is created
+ unsigned int tasks_completed; // NOTE(review): not written anywhere in the visible code - confirm whether it is maintained
+
+ unsigned int enqueueposition; // free-running write cursor, advanced by TaskQueue_DistributeTasks; slot is position % THREADTASKS
+ unsigned int dequeueposition; // free-running read cursor, advanced by the worker thread; queue is empty when equal to enqueueposition
+ taskqueue_task_t *queue[THREADTASKS]; // this thread's private ring of task pointers; a slot is reset to NULL once its task has run
}
taskqueue_state_thread_t;
\r
typedef struct taskqueue_state_s
{
+ // TaskQueue_DistributeTasks cycles through the threads when assigning, each has its own queue
+ unsigned int enqueuethread; // index into threads[] of the thread that receives the next assigned task
int numthreads;
- taskqueue_state_thread_t threads[1024];
+ taskqueue_state_thread_t threads[MAXTHREADS]; // only the first numthreads entries have a running thread
 
- // command 
+ // synchronization point for enqueue and some other memory access
Thread_SpinLock command_lock;
 
- int threads_quit;
+ // distributor queue (not assigned to threads yet, or waiting on other tasks)
+ unsigned int queue_enqueueposition; // index in queue_data of the next slot to write; wraps to 0 at queue_size
+ unsigned int queue_dequeueposition; // index in queue_data of the next slot to read; equal to queue_enqueueposition when empty
+ unsigned int queue_size; // number of slots allocated in queue_data; grown on demand by TaskQueue_Enqueue
+ taskqueue_task_t **queue_data; // ring of task pointers, (re)allocated from zonemempool by TaskQueue_Enqueue
 
- // doubly linked list - enqueue pushes to list.prev, dequeue pops from list.next
- taskqueue_task_t list;
+ // metrics to balance workload vs cpu resources
+ unsigned int tasks_recentframesindex; // slot of tasks_recentframes most recently written by TaskQueue_Frame
+ unsigned int tasks_recentframes[RECENTFRAMES]; // tasks_thisframe totals of the most recent frames
+ unsigned int tasks_thisframe; // tasks enqueued with yieldcount == 0 since the last TaskQueue_Frame
+ unsigned int tasks_averageperframe; // mean of tasks_recentframes, recomputed by every TaskQueue_Frame
}
taskqueue_state_t;
\r
\r
void TaskQueue_Init(void)
{
+ Cvar_RegisterVariable(&taskqueue_minthreads); // lower bound TaskQueue_Frame applies to the thread count
Cvar_RegisterVariable(&taskqueue_maxthreads);
- // initialize the doubly-linked list header
- taskqueue_state.list.next = &taskqueue_state.list;
- taskqueue_state.list.prev = &taskqueue_state.list;
+ Cvar_RegisterVariable(&taskqueue_tasksperthread); // divisor TaskQueue_Frame applies to the recent average task count
}
\r
void TaskQueue_Shutdown(void)\r
TaskQueue_Frame(true);\r
}\r
\r
-static taskqueue_task_t *TaskQueue_GetPending(void)\r
-{\r
- taskqueue_task_t *t = NULL;\r
- if (taskqueue_state.list.next != &taskqueue_state.list)\r
- {\r
- // pop from list.next\r
- t = taskqueue_state.list.next;\r
- t->next->prev = t->prev;\r
- t->prev->next = t->next;\r
- t->prev = t->next = NULL;\r
- }\r
- return t;\r
-}\r
-\r
static void TaskQueue_ExecuteTask(taskqueue_task_t *t)\r
{\r
// see if t is waiting on something\r
// FIXME: this is basically fibers but less featureful - context switching for yield is not implemented\r
static int TaskQueue_ThreadFunc(void *d)
{
+	// Worker thread entry point. d points at this thread's taskqueue_state_thread_t.
+	// Runs every task found in the thread's private queue, then polls the quit
+	// flag under command_lock; returns 0 once asked to quit.
+	taskqueue_state_thread_t *s = (taskqueue_state_thread_t *)d;
+	// consecutive polls that found no work; cleared whenever a task is executed
+	unsigned int sleepcounter = 0;
for (;;)
{
qboolean quit;
- taskqueue_task_t *t = NULL;
+		// drain the private queue before looking at the quit flag
+		while (s->dequeueposition != s->enqueueposition)
+		{
+			taskqueue_task_t *t = s->queue[s->dequeueposition % THREADTASKS];
+			TaskQueue_ExecuteTask(t);
+			// when we advance, also clear the pointer for good measure
+			s->queue[s->dequeueposition++ % THREADTASKS] = NULL;
+			sleepcounter = 0;
+		}
Thread_AtomicLock(&taskqueue_state.command_lock);
- quit = taskqueue_state.threads_quit != 0;
- t = TaskQueue_GetPending();
+		quit = s->quit != 0;
Thread_AtomicUnlock(&taskqueue_state.command_lock);
- if (t)
- TaskQueue_ExecuteTask(t);
- else if (quit)
+		if (quit)
break;
+		// back off after THREADSLEEPCOUNT idle polls in a row. The reset must
+		// live inside the branch: when it followed the un-braced if, the counter
+		// was cleared on every pass and Sys_Sleep could never be reached, so an
+		// idle worker spun at full speed.
+		sleepcounter++;
+		if (sleepcounter >= THREADSLEEPCOUNT)
+		{
+			Sys_Sleep(1000);
+			sleepcounter = 0;
+		}
}
return 0;
}
\r
-void TaskQueue_Execute(qboolean force)\r
-{\r
- // if we have no threads to run the tasks, just start executing them now\r
- if (taskqueue_state.numthreads == 0 || force)\r
- {\r
- for (;;)\r
- {\r
- taskqueue_task_t *t = NULL;\r
- Thread_AtomicLock(&taskqueue_state.command_lock);\r
- t = TaskQueue_GetPending();\r
- Thread_AtomicUnlock(&taskqueue_state.command_lock);\r
- if (!t)\r
- break;\r
- TaskQueue_ExecuteTask(t);\r
- }\r
- }\r
-}\r
-\r
void TaskQueue_Enqueue(int numtasks, taskqueue_task_t *tasks)
{
int i;
- // try not to spinlock for a long time by breaking up large enqueues
- while (numtasks > 64)
+	// Appends tasks[0..numtasks-1] to the distributor queue under command_lock,
+	// growing the ring buffer first when it cannot hold them. Only pointers are
+	// stored, so the tasks array must stay valid until the tasks have run.
+	// Tasks with yieldcount == 0 are counted in tasks_thisframe.
+	if (numtasks <= 0)
+		return;
+	Thread_AtomicLock(&taskqueue_state.command_lock);
+	// enqueue == dequeue means "empty", so one slot must always stay free:
+	// grow when used + numtasks would reach queue_size, not only when it exceeds it
+	if (taskqueue_state.queue_size <=
+		(taskqueue_state.queue_enqueueposition < taskqueue_state.queue_dequeueposition ? taskqueue_state.queue_size : 0) +
+		taskqueue_state.queue_enqueueposition - taskqueue_state.queue_dequeueposition + numtasks)
{
- TaskQueue_Enqueue(64, tasks);
- tasks += 64;
- numtasks -= 64;
+		// we have to grow the queue...
+		unsigned int oldsize = taskqueue_state.queue_size;
+		unsigned int newsize = (oldsize + numtasks) * 2;
+		unsigned int j;
+		if (newsize < 1024)
+			newsize = 1024;
+		taskqueue_state.queue_data = Mem_Realloc(zonemempool, taskqueue_state.queue_data, sizeof(*taskqueue_state.queue_data) * newsize);
+		taskqueue_state.queue_size = newsize;
+		// if the live entries wrapped around the old end, the segment stored at
+		// [0, enqueueposition) has to follow the one ending at oldsize, otherwise
+		// the dequeue side would walk through the freshly added slots first.
+		// newsize >= 2 * oldsize, so the relocated segment always fits.
+		if (taskqueue_state.queue_enqueueposition < taskqueue_state.queue_dequeueposition)
+		{
+			for (j = 0; j < taskqueue_state.queue_enqueueposition; j++)
+			{
+				taskqueue_state.queue_data[oldsize + j] = taskqueue_state.queue_data[j];
+				taskqueue_state.queue_data[j] = NULL;
+			}
+			taskqueue_state.queue_enqueueposition += oldsize;
+		}
}
- Thread_AtomicLock(&taskqueue_state.command_lock);
for (i = 0; i < numtasks; i++)
{
- taskqueue_task_t *t = &tasks[i];
- // push to list.prev
- t->next = &taskqueue_state.list;
- t->prev = taskqueue_state.list.prev;
- t->next->prev = t;
- t->prev->next = t;
+		// only count a task the first time it is queued, not on re-queue after a yield
+		if (tasks[i].yieldcount == 0)
+			taskqueue_state.tasks_thisframe++;
+		taskqueue_state.queue_data[taskqueue_state.queue_enqueueposition] = &tasks[i];
+		taskqueue_state.queue_enqueueposition++;
+		if (taskqueue_state.queue_enqueueposition >= taskqueue_state.queue_size)
+			taskqueue_state.queue_enqueueposition = 0;
}
Thread_AtomicUnlock(&taskqueue_state.command_lock);
}
return !t->done != 0;\r
}\r
\r
+void TaskQueue_DistributeTasks(void)
+{
+	// Moves tasks from the distributor queue into the worker threads' private
+	// queues, then runs at most one remaining task on the calling thread (the
+	// only way tasks make progress when numthreads is 0).
+	// Tasks whose preceding task is not done yet are rotated to the back of
+	// the distributor queue instead of being assigned.
+	taskqueue_task_t *pending = NULL;
+	Thread_AtomicLock(&taskqueue_state.command_lock);
+	if (taskqueue_state.numthreads > 0)
+	{
+		unsigned int attempts = taskqueue_state.numthreads;
+		while (attempts-- > 0 && taskqueue_state.queue_enqueueposition != taskqueue_state.queue_dequeueposition)
+		{
+			taskqueue_task_t *t = taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition];
+			if (t->preceding && t->preceding->done == 0)
+			{
+				// task is waiting on something
+				// first dequeue it properly
+				taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition] = NULL;
+				taskqueue_state.queue_dequeueposition++;
+				if (taskqueue_state.queue_dequeueposition >= taskqueue_state.queue_size)
+					taskqueue_state.queue_dequeueposition = 0;
+				// now put it back in the distributor queue - we know there is room because we just made room
+				taskqueue_state.queue_data[taskqueue_state.queue_enqueueposition] = t;
+				taskqueue_state.queue_enqueueposition++;
+				if (taskqueue_state.queue_enqueueposition >= taskqueue_state.queue_size)
+					taskqueue_state.queue_enqueueposition = 0;
+				// we do not refresh the attempt counter here to avoid deadlock - quite often the only things sitting in the distributor queue are waiting on other tasks
+			}
+			else
+			{
+				taskqueue_state_thread_t *s;
+				// keep the cursor inside the running threads (the count may have shrunk since last time)
+				if (taskqueue_state.enqueuethread >= (unsigned int)taskqueue_state.numthreads)
+					taskqueue_state.enqueuethread = 0;
+				s = &taskqueue_state.threads[taskqueue_state.enqueuethread];
+				if (s->enqueueposition - s->dequeueposition < THREADTASKS)
+				{
+					// add the task to the thread's queue
+					s->queue[(s->enqueueposition++) % THREADTASKS] = t;
+					// since we succeeded in assigning the task, advance the distributor queue
+					taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition] = NULL;
+					taskqueue_state.queue_dequeueposition++;
+					if (taskqueue_state.queue_dequeueposition >= taskqueue_state.queue_size)
+						taskqueue_state.queue_dequeueposition = 0;
+					// refresh our attempt counter because we did manage to assign something to a thread
+					attempts = taskqueue_state.numthreads;
+				}
+				// move on to the next thread whether or not this one had room, so work
+				// is spread over all threads and a full queue does not block the rest
+				taskqueue_state.enqueuethread++;
+			}
+		}
+	}
+	// take one pending task off the distributor queue while we still hold the lock:
+	// TaskQueue_Enqueue may move or reallocate queue_data under the same lock
+	if (taskqueue_state.queue_dequeueposition != taskqueue_state.queue_enqueueposition)
+	{
+		pending = taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition];
+		taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition] = NULL;
+		taskqueue_state.queue_dequeueposition++;
+		if (taskqueue_state.queue_dequeueposition >= taskqueue_state.queue_size)
+			taskqueue_state.queue_dequeueposition = 0;
+	}
+	Thread_AtomicUnlock(&taskqueue_state.command_lock);
+	// run it outside the lock (TaskQueue_Enqueue takes command_lock itself), this matters if numthreads is 0
+	if (pending)
+		TaskQueue_ExecuteTask(pending);
+}
+\r
void TaskQueue_WaitForTaskDone(taskqueue_task_t *t)
{
qboolean done = false;
- while (!done)
+ for (;;) // spin until t->done is observed non-zero
{
Thread_AtomicLock(&taskqueue_state.command_lock);
done = t->done != 0;
Thread_AtomicUnlock(&taskqueue_state.command_lock);
- // if there are no threads, just execute the tasks immediately
- if (!done && taskqueue_state.numthreads == 0)
- TaskQueue_Execute(true);
+ if (done)
+ break;
+ TaskQueue_DistributeTasks(); // keep feeding the worker threads and run one queued task on this thread while waiting
}
}
\r
void TaskQueue_Frame(qboolean shutdown)
{
- int numthreads = shutdown ? 0 : bound(0, taskqueue_maxthreads.integer, sizeof(taskqueue_state.threads) / sizeof(taskqueue_state.threads[0]));
+	// Per-frame housekeeping: records this frame's task count, recomputes the
+	// recent average, resizes the worker thread pool to match it, then
+	// distributes any pending tasks. With shutdown set, all threads are stopped.
+	int i;
+	unsigned long long int avg;
+	int minthreads;
+	int maxthreads = bound(0, taskqueue_maxthreads.integer, MAXTHREADS);
+	// holds the thread limit for now; THREADDISABLE forces it to 0 just below
+	int numthreads = maxthreads;
+	int tasksperthread = bound(10, taskqueue_tasksperthread.integer, 100000);
#ifdef THREADDISABLE
numthreads = 0;
#endif
- if (taskqueue_state.numthreads != numthreads)
+	// carry the (possibly disabled) limit forward so the workload-based count
+	// below can never exceed MAXTHREADS or undo THREADDISABLE, and keep the
+	// minimum from exceeding the maximum
+	maxthreads = numthreads;
+	minthreads = bound(0, taskqueue_minthreads.integer, maxthreads);
+
+	Thread_AtomicLock(&taskqueue_state.command_lock);
+	taskqueue_state.tasks_recentframesindex = (taskqueue_state.tasks_recentframesindex + 1) % RECENTFRAMES;
+	taskqueue_state.tasks_recentframes[taskqueue_state.tasks_recentframesindex] = taskqueue_state.tasks_thisframe;
+	taskqueue_state.tasks_thisframe = 0;
+	avg = 0;
+	for (i = 0; i < RECENTFRAMES; i++)
+		avg += taskqueue_state.tasks_recentframes[i];
+	taskqueue_state.tasks_averageperframe = avg / RECENTFRAMES;
+	Thread_AtomicUnlock(&taskqueue_state.command_lock);
+
+	// one thread per tasksperthread tasks of average workload, within [minthreads, maxthreads]
+	numthreads = taskqueue_state.tasks_averageperframe / tasksperthread;
+	numthreads = bound(minthreads, numthreads, maxthreads);
+
+	if (shutdown)
+		numthreads = 0;
+
+	// check if we need to close some threads
+	if (taskqueue_state.numthreads > numthreads)
{
- int i;
+		// tell extra threads to quit
Thread_AtomicLock(&taskqueue_state.command_lock);
- taskqueue_state.threads_quit = 1;
+		for (i = numthreads; i < taskqueue_state.numthreads; i++)
+			taskqueue_state.threads[i].quit = 1;
Thread_AtomicUnlock(&taskqueue_state.command_lock);
for (i = numthreads; i < taskqueue_state.numthreads; i++)
{
Thread_WaitThread(taskqueue_state.threads[i].handle, 0);
taskqueue_state.threads[i].handle = NULL;
}
+		// okay we're at the new state now
+		taskqueue_state.numthreads = numthreads;
+	}
+
+	// check if we need to start more threads
+	if (taskqueue_state.numthreads < numthreads)
+	{
+		// make sure we're not telling new threads to just quit on startup
Thread_AtomicLock(&taskqueue_state.command_lock);
- taskqueue_state.threads_quit = 0;
+		for (i = taskqueue_state.numthreads; i < numthreads; i++)
+			taskqueue_state.threads[i].quit = 0;
Thread_AtomicUnlock(&taskqueue_state.command_lock);
+
+		// start new threads
for (i = taskqueue_state.numthreads; i < numthreads; i++)
+		{
+			taskqueue_state.threads[i].thread_index = i;
taskqueue_state.threads[i].handle = Thread_CreateThread(TaskQueue_ThreadFunc, &taskqueue_state.threads[i]);
+		}
+
+		// okay we're at the new state now
taskqueue_state.numthreads = numthreads;
- // if there are still pending tasks (e.g. no threads), execute them on main thread now
- TaskQueue_Execute(true);
}
+
+	// just for good measure, distribute any pending tasks that span across frames
+	TaskQueue_DistributeTasks();
}
\r
void TaskQueue_Setup(taskqueue_task_t *t, taskqueue_task_t *preceding, void(*func)(taskqueue_task_t *), size_t i0, size_t i1, void *p0, void *p1)\r
{\r
// update our partial progress, then yield to another pending task.\r
t->i[0] = numtasks;\r
+ // set our preceding task to one of the ones we are watching for\r
+ t->preceding = &tasks[numtasks - 1];\r
TaskQueue_Yield(t);\r
return;\r
}\r
numtasks--;\r
}\r
- t->started = 1;\r
t->done = 1;\r
}\r