-#include "quakedef.h"\r
-#include "taskqueue.h"\r
-\r
-cvar_t taskqueue_minthreads = {CVAR_CLIENT | CVAR_SERVER | CVAR_SAVE, "taskqueue_minthreads", "4", "minimum number of threads to keep active for executing tasks"};\r
-cvar_t taskqueue_maxthreads = {CVAR_CLIENT | CVAR_SERVER | CVAR_SAVE, "taskqueue_maxthreads", "32", "maximum number of threads to start up as needed based on task count"};\r
-cvar_t taskqueue_tasksperthread = {CVAR_CLIENT | CVAR_SERVER | CVAR_SAVE, "taskqueue_tasksperthread", "4000", "expected amount of work that a single thread can do in a frame - the number of threads being used depends on the average workload in recent frames"};\r
-\r
-#define MAXTHREADS 1024\r
-#define RECENTFRAMES 64 // averaging thread activity over this many frames to decide how many threads we need\r
-#define THREADTASKS 256 // thread can hold this many tasks in its own queue\r
-#define THREADBATCH 64 // thread will run this many tasks before checking status again\r
-#define THREADSLEEPCOUNT 1000 // thread will sleep for a little while if it checks this many times and has no work to do\r
-\r
-typedef struct taskqueue_state_thread_s\r
-{\r
- void *handle;\r
- unsigned int quit;\r
- unsigned int thread_index;\r
- unsigned int tasks_completed;\r
-\r
- unsigned int enqueueposition;\r
- unsigned int dequeueposition;\r
- taskqueue_task_t *queue[THREADTASKS];\r
-}\r
-taskqueue_state_thread_t;\r
-\r
-typedef struct taskqueue_state_s\r
-{\r
- // TaskQueue_DistributeTasks cycles through the threads when assigning, each has its own queue\r
- unsigned int enqueuethread;\r
- int numthreads;\r
- taskqueue_state_thread_t threads[MAXTHREADS];\r
-\r
- // synchronization point for enqueue and some other memory access\r
- Thread_SpinLock command_lock;\r
-\r
- // distributor queue (not assigned to threads yet, or waiting on other tasks)\r
- unsigned int queue_enqueueposition;\r
- unsigned int queue_dequeueposition;\r
- unsigned int queue_size;\r
- taskqueue_task_t **queue_data;\r
-\r
- // metrics to balance workload vs cpu resources\r
- unsigned int tasks_recentframesindex;\r
- unsigned int tasks_recentframes[RECENTFRAMES];\r
- unsigned int tasks_thisframe;\r
- unsigned int tasks_averageperframe;\r
-}\r
-taskqueue_state_t;\r
-\r
-static taskqueue_state_t taskqueue_state;\r
-\r
-void TaskQueue_Init(void)\r
-{\r
- Cvar_RegisterVariable(&taskqueue_minthreads);\r
- Cvar_RegisterVariable(&taskqueue_maxthreads);\r
- Cvar_RegisterVariable(&taskqueue_tasksperthread);\r
-}\r
-\r
-void TaskQueue_Shutdown(void)\r
-{\r
- if (taskqueue_state.numthreads)\r
- TaskQueue_Frame(true);\r
-}\r
-\r
-static void TaskQueue_ExecuteTask(taskqueue_task_t *t)\r
-{\r
- // see if t is waiting on something\r
- if (t->preceding && t->preceding->done == 0)\r
- TaskQueue_Yield(t);\r
- else\r
- t->func(t);\r
-}\r
-\r
-// FIXME: don't use mutex\r
-// FIXME: this is basically fibers but less featureful - context switching for yield is not implemented\r
-static int TaskQueue_ThreadFunc(void *d)\r
-{\r
- taskqueue_state_thread_t *s = (taskqueue_state_thread_t *)d;\r
- unsigned int sleepcounter = 0;\r
- for (;;)\r
- {\r
- qboolean quit;\r
- while (s->dequeueposition != s->enqueueposition)\r
- {\r
- taskqueue_task_t *t = s->queue[s->dequeueposition % THREADTASKS];\r
- TaskQueue_ExecuteTask(t);\r
- // when we advance, also clear the pointer for good measure\r
- s->queue[s->dequeueposition++ % THREADTASKS] = NULL;\r
- sleepcounter = 0;\r
- }\r
- Thread_AtomicLock(&taskqueue_state.command_lock);\r
- quit = s->quit != 0;\r
- Thread_AtomicUnlock(&taskqueue_state.command_lock);\r
- if (quit)\r
- break;\r
- sleepcounter++;\r
- if (sleepcounter >= THREADSLEEPCOUNT)\r
- Sys_Sleep(1000);\r
- sleepcounter = 0;\r
- }\r
- return 0;\r
-}\r
-\r
-void TaskQueue_Enqueue(int numtasks, taskqueue_task_t *tasks)\r
-{\r
- int i;\r
- Thread_AtomicLock(&taskqueue_state.command_lock);\r
- if (taskqueue_state.queue_size <\r
- (taskqueue_state.queue_enqueueposition < taskqueue_state.queue_dequeueposition ? taskqueue_state.queue_size : 0) +\r
- taskqueue_state.queue_enqueueposition - taskqueue_state.queue_dequeueposition + numtasks)\r
- {\r
- // we have to grow the queue...\r
- unsigned int newsize = (taskqueue_state.queue_size + numtasks) * 2;\r
- if (newsize < 1024)\r
- newsize = 1024;\r
- taskqueue_state.queue_data = Mem_Realloc(zonemempool, taskqueue_state.queue_data, sizeof(*taskqueue_state.queue_data) * newsize);\r
- taskqueue_state.queue_size = newsize;\r
- }\r
- for (i = 0; i < numtasks; i++)\r
- {\r
- if (tasks[i].yieldcount == 0)\r
- taskqueue_state.tasks_thisframe++;\r
- taskqueue_state.queue_data[taskqueue_state.queue_enqueueposition] = &tasks[i];\r
- taskqueue_state.queue_enqueueposition++;\r
- if (taskqueue_state.queue_enqueueposition >= taskqueue_state.queue_size)\r
- taskqueue_state.queue_enqueueposition = 0;\r
- }\r
- Thread_AtomicUnlock(&taskqueue_state.command_lock);\r
-}\r
-\r
-// if the task can not be completed due yet to preconditions, just enqueue it again...\r
-void TaskQueue_Yield(taskqueue_task_t *t)\r
-{\r
- t->yieldcount++;\r
- TaskQueue_Enqueue(1, t);\r
-}\r
-\r
-qboolean TaskQueue_IsDone(taskqueue_task_t *t)\r
-{\r
- return !t->done != 0;\r
-}\r
-\r
-void TaskQueue_DistributeTasks(void)\r
-{\r
- Thread_AtomicLock(&taskqueue_state.command_lock);\r
- if (taskqueue_state.numthreads > 0)\r
- {\r
- unsigned int attempts = taskqueue_state.numthreads;\r
- while (attempts-- > 0 && taskqueue_state.queue_enqueueposition != taskqueue_state.queue_dequeueposition)\r
- {\r
- taskqueue_task_t *t = taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition];\r
- if (t->preceding && t->preceding->done == 0)\r
- {\r
- // task is waiting on something\r
- // first dequeue it properly\r
- taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition] = NULL;\r
- taskqueue_state.queue_dequeueposition++;\r
- if (taskqueue_state.queue_dequeueposition >= taskqueue_state.queue_size)\r
- taskqueue_state.queue_dequeueposition = 0;\r
- // now put it back in the distributor queue - we know there is room because we just made room\r
- taskqueue_state.queue_data[taskqueue_state.queue_enqueueposition] = t;\r
- taskqueue_state.queue_enqueueposition++;\r
- if (taskqueue_state.queue_enqueueposition >= taskqueue_state.queue_size)\r
- taskqueue_state.queue_enqueueposition = 0;\r
- // we do not refresh the attempt counter here to avoid deadlock - quite often the only things sitting in the distributor queue are waiting on other tasks\r
- }\r
- else\r
- {\r
- taskqueue_state_thread_t *s = &taskqueue_state.threads[taskqueue_state.enqueuethread];\r
- if (s->enqueueposition - s->dequeueposition < THREADTASKS)\r
- {\r
- // add the task to the thread's queue\r
- s->queue[(s->enqueueposition++) % THREADTASKS] = t;\r
- // since we succeeded in assigning the task, advance the distributor queue\r
- taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition] = NULL;\r
- taskqueue_state.queue_dequeueposition++;\r
- if (taskqueue_state.queue_dequeueposition >= taskqueue_state.queue_size)\r
- taskqueue_state.queue_dequeueposition = 0;\r
- // refresh our attempt counter because we did manage to assign something to a thread\r
- attempts = taskqueue_state.numthreads;\r
- }\r
- }\r
- }\r
- }\r
- Thread_AtomicUnlock(&taskqueue_state.command_lock);\r
- // execute one pending task on the distributor queue, this matters if numthreads is 0\r
- if (taskqueue_state.queue_dequeueposition != taskqueue_state.queue_enqueueposition)\r
- {\r
- taskqueue_task_t *t = taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition];\r
- taskqueue_state.queue_dequeueposition++;\r
- if (taskqueue_state.queue_dequeueposition >= taskqueue_state.queue_size)\r
- taskqueue_state.queue_dequeueposition = 0;\r
- if (t)\r
- TaskQueue_ExecuteTask(t);\r
- }\r
-}\r
-\r
-void TaskQueue_WaitForTaskDone(taskqueue_task_t *t)\r
-{\r
- qboolean done = false;\r
- for (;;)\r
- {\r
- Thread_AtomicLock(&taskqueue_state.command_lock);\r
- done = t->done != 0;\r
- Thread_AtomicUnlock(&taskqueue_state.command_lock);\r
- if (done)\r
- break;\r
- TaskQueue_DistributeTasks();\r
- }\r
-}\r
-\r
-void TaskQueue_Frame(qboolean shutdown)\r
-{\r
- int i;\r
- unsigned long long int avg;\r
- int maxthreads = bound(0, taskqueue_maxthreads.integer, MAXTHREADS);\r
- int numthreads = maxthreads;\r
- int tasksperthread = bound(10, taskqueue_tasksperthread.integer, 100000);\r
-#ifdef THREADDISABLE\r
- numthreads = 0;\r
-#endif\r
-\r
- Thread_AtomicLock(&taskqueue_state.command_lock);\r
- taskqueue_state.tasks_recentframesindex = (taskqueue_state.tasks_recentframesindex + 1) % RECENTFRAMES;\r
- taskqueue_state.tasks_recentframes[taskqueue_state.tasks_recentframesindex] = taskqueue_state.tasks_thisframe;\r
- taskqueue_state.tasks_thisframe = 0;\r
- avg = 0;\r
- for (i = 0; i < RECENTFRAMES; i++)\r
- avg += taskqueue_state.tasks_recentframes[i];\r
- taskqueue_state.tasks_averageperframe = avg / RECENTFRAMES;\r
- Thread_AtomicUnlock(&taskqueue_state.command_lock);\r
-\r
- numthreads = taskqueue_state.tasks_averageperframe / tasksperthread;\r
- numthreads = bound(taskqueue_minthreads.integer, numthreads, taskqueue_maxthreads.integer);\r
-\r
- if (shutdown)\r
- numthreads = 0;\r
-\r
- // check if we need to close some threads\r
- if (taskqueue_state.numthreads > numthreads)\r
- {\r
- // tell extra threads to quit\r
- Thread_AtomicLock(&taskqueue_state.command_lock);\r
- for (i = numthreads; i < taskqueue_state.numthreads; i++)\r
- taskqueue_state.threads[i].quit = 1;\r
- Thread_AtomicUnlock(&taskqueue_state.command_lock);\r
- for (i = numthreads; i < taskqueue_state.numthreads; i++)\r
- {\r
- if (taskqueue_state.threads[i].handle)\r
- Thread_WaitThread(taskqueue_state.threads[i].handle, 0);\r
- taskqueue_state.threads[i].handle = NULL;\r
- }\r
- // okay we're at the new state now\r
- taskqueue_state.numthreads = numthreads;\r
- }\r
-\r
- // check if we need to start more threads\r
- if (taskqueue_state.numthreads < numthreads)\r
- {\r
- // make sure we're not telling new threads to just quit on startup\r
- Thread_AtomicLock(&taskqueue_state.command_lock);\r
- for (i = taskqueue_state.numthreads; i < numthreads; i++)\r
- taskqueue_state.threads[i].quit = 0;\r
- Thread_AtomicUnlock(&taskqueue_state.command_lock);\r
-\r
- // start new threads\r
- for (i = taskqueue_state.numthreads; i < numthreads; i++)\r
- {\r
- taskqueue_state.threads[i].thread_index = i;\r
- taskqueue_state.threads[i].handle = Thread_CreateThread(TaskQueue_ThreadFunc, &taskqueue_state.threads[i]);\r
- }\r
-\r
- // okay we're at the new state now\r
- taskqueue_state.numthreads = numthreads;\r
- }\r
-\r
- // just for good measure, distribute any pending tasks that span across frames\r
- TaskQueue_DistributeTasks();\r
-}\r
-\r
-void TaskQueue_Setup(taskqueue_task_t *t, taskqueue_task_t *preceding, void(*func)(taskqueue_task_t *), size_t i0, size_t i1, void *p0, void *p1)\r
-{\r
- memset(t, 0, sizeof(*t));\r
- t->preceding = preceding;\r
- t->func = func;\r
- t->i[0] = i0;\r
- t->i[1] = i1;\r
- t->p[0] = p0;\r
- t->p[1] = p1;\r
-}\r
-\r
-void TaskQueue_Task_CheckTasksDone(taskqueue_task_t *t)\r
-{\r
- size_t numtasks = t->i[0];\r
- taskqueue_task_t *tasks = t->p[0];\r
- while (numtasks > 0)\r
- {\r
- // check the last task first as it's usually going to be the last to finish, so we do the least work by checking it first\r
- if (!tasks[numtasks - 1].done)\r
- {\r
- // update our partial progress, then yield to another pending task.\r
- t->i[0] = numtasks;\r
- // set our preceding task to one of the ones we are watching for\r
- t->preceding = &tasks[numtasks - 1];\r
- TaskQueue_Yield(t);\r
- return;\r
- }\r
- numtasks--;\r
- }\r
- t->done = 1;\r
-}\r
+#include "quakedef.h"
+#include "taskqueue.h"
+
+cvar_t taskqueue_minthreads = {CVAR_CLIENT | CVAR_SERVER | CVAR_SAVE, "taskqueue_minthreads", "4", "minimum number of threads to keep active for executing tasks"};
+cvar_t taskqueue_maxthreads = {CVAR_CLIENT | CVAR_SERVER | CVAR_SAVE, "taskqueue_maxthreads", "32", "maximum number of threads to start up as needed based on task count"};
+cvar_t taskqueue_tasksperthread = {CVAR_CLIENT | CVAR_SERVER | CVAR_SAVE, "taskqueue_tasksperthread", "4000", "expected amount of work that a single thread can do in a frame - the number of threads being used depends on the average workload in recent frames"};
+
+#define MAXTHREADS 1024
+#define RECENTFRAMES 64 // averaging thread activity over this many frames to decide how many threads we need
+#define THREADTASKS 256 // thread can hold this many tasks in its own queue
+#define THREADBATCH 64 // thread will run roughly this many tasks per batch (reserved - the loop in TaskQueue_ThreadFunc currently drains its whole queue before re-checking quit)
+#define THREADSLEEPCOUNT 1000 // thread will sleep for a little while if it checks this many times and has no work to do
+
+typedef struct taskqueue_state_thread_s
+{
+ void *handle;
+ unsigned int quit;
+ unsigned int thread_index;
+ unsigned int tasks_completed;
+
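+	// ring buffer of tasks assigned to this thread: positions increase monotonically and
+	// are wrapped with % THREADTASKS on access, so enqueueposition - dequeueposition is
+	// the number of tasks currently waiting in this thread's queue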
+ unsigned int enqueueposition;
+ unsigned int dequeueposition;
+ taskqueue_task_t *queue[THREADTASKS];
+}
+taskqueue_state_thread_t;
+
+typedef struct taskqueue_state_s
+{
+ // TaskQueue_DistributeTasks cycles through the threads when assigning, each has its own queue
+ unsigned int enqueuethread;
+ int numthreads;
+ taskqueue_state_thread_t threads[MAXTHREADS];
+
+ // synchronization point for enqueue and some other memory access
+ Thread_SpinLock command_lock;
+
+ // distributor queue (not assigned to threads yet, or waiting on other tasks)
+ unsigned int queue_enqueueposition;
+ unsigned int queue_dequeueposition;
+ unsigned int queue_size;
+ taskqueue_task_t **queue_data;
+
+ // metrics to balance workload vs cpu resources
+ unsigned int tasks_recentframesindex;
+ unsigned int tasks_recentframes[RECENTFRAMES];
+ unsigned int tasks_thisframe;
+ unsigned int tasks_averageperframe;
+}
+taskqueue_state_t;
+
+static taskqueue_state_t taskqueue_state;
+
+void TaskQueue_Init(void)
+{
+ Cvar_RegisterVariable(&taskqueue_minthreads);
+ Cvar_RegisterVariable(&taskqueue_maxthreads);
+ Cvar_RegisterVariable(&taskqueue_tasksperthread);
+}
+
+void TaskQueue_Shutdown(void)
+{
+ if (taskqueue_state.numthreads)
+ TaskQueue_Frame(true);
+}
+
+static void TaskQueue_ExecuteTask(taskqueue_task_t *t)
+{
+ // see if t is waiting on something
+ if (t->preceding && t->preceding->done == 0)
+ TaskQueue_Yield(t);
+ else
+ t->func(t);
+}
+
+// FIXME: don't use mutex
+// FIXME: this is basically fibers but less featureful - context switching for yield is not implemented
+static int TaskQueue_ThreadFunc(void *d)
+{
+ taskqueue_state_thread_t *s = (taskqueue_state_thread_t *)d;
+ unsigned int sleepcounter = 0;
+ for (;;)
+ {
+ qboolean quit;
+ while (s->dequeueposition != s->enqueueposition)
+ {
+ taskqueue_task_t *t = s->queue[s->dequeueposition % THREADTASKS];
+ TaskQueue_ExecuteTask(t);
+ // when we advance, also clear the pointer for good measure
+ s->queue[s->dequeueposition++ % THREADTASKS] = NULL;
+ sleepcounter = 0;
+ }
+ Thread_AtomicLock(&taskqueue_state.command_lock);
+ quit = s->quit != 0;
+ Thread_AtomicUnlock(&taskqueue_state.command_lock);
+ if (quit)
+ break;
+		sleepcounter++;
+		if (sleepcounter >= THREADSLEEPCOUNT)
+		{
+			// idle for a long time - sleep briefly so a workless thread does not spin at full cpu
+			Sys_Sleep(1000);
+			sleepcounter = 0;
+		}
+ }
+ return 0;
+}
+
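+// append tasks to the distributor queue under command_lock, growing its ring buffer as
+// needed; tasks are not handed to worker threads until TaskQueue_DistributeTasks runs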
+void TaskQueue_Enqueue(int numtasks, taskqueue_task_t *tasks)
+{
+ int i;
+ Thread_AtomicLock(&taskqueue_state.command_lock);
+	// grow the queue if the tasks already in it plus the new ones would fill it - one
+	// slot is kept free so that enqueueposition == dequeueposition always means "empty"
+	if (taskqueue_state.queue_size <=
+		(taskqueue_state.queue_enqueueposition < taskqueue_state.queue_dequeueposition ? taskqueue_state.queue_size : 0) +
+		taskqueue_state.queue_enqueueposition - taskqueue_state.queue_dequeueposition + numtasks)
+ {
+		// we have to grow the queue...
+		unsigned int newsize = (taskqueue_state.queue_size + numtasks) * 2;
+		if (newsize < 1024)
+			newsize = 1024;
+		taskqueue_state.queue_data = Mem_Realloc(zonemempool, taskqueue_state.queue_data, sizeof(*taskqueue_state.queue_data) * newsize);
+		// if the ring was wrapped, move the tail segment to the end of the grown buffer
+		// so the entries remain in dequeue order relative to the new wrap point
+		if (taskqueue_state.queue_enqueueposition < taskqueue_state.queue_dequeueposition)
+		{
+			unsigned int tail = taskqueue_state.queue_size - taskqueue_state.queue_dequeueposition;
+			memmove(taskqueue_state.queue_data + newsize - tail, taskqueue_state.queue_data + taskqueue_state.queue_dequeueposition, sizeof(*taskqueue_state.queue_data) * tail);
+			taskqueue_state.queue_dequeueposition = newsize - tail;
+		}
+		taskqueue_state.queue_size = newsize;
+ }
+ for (i = 0; i < numtasks; i++)
+ {
+ if (tasks[i].yieldcount == 0)
+ taskqueue_state.tasks_thisframe++;
+ taskqueue_state.queue_data[taskqueue_state.queue_enqueueposition] = &tasks[i];
+ taskqueue_state.queue_enqueueposition++;
+ if (taskqueue_state.queue_enqueueposition >= taskqueue_state.queue_size)
+ taskqueue_state.queue_enqueueposition = 0;
+ }
+ Thread_AtomicUnlock(&taskqueue_state.command_lock);
+}
+
+// if the task can not yet be completed due to preconditions, just enqueue it again...
+void TaskQueue_Yield(taskqueue_task_t *t)
+{
+ t->yieldcount++;
+ TaskQueue_Enqueue(1, t);
+}
+
+qboolean TaskQueue_IsDone(taskqueue_task_t *t)
+{
+	return t->done != 0;
+}
+
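+// hand queued tasks to the worker threads, re-queueing any task whose preceding task
+// has not completed yet; afterwards one pending task is executed inline, which is what
+// keeps the queue moving when numthreads is 0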
+void TaskQueue_DistributeTasks(void)
+{
+ Thread_AtomicLock(&taskqueue_state.command_lock);
+ if (taskqueue_state.numthreads > 0)
+ {
+ unsigned int attempts = taskqueue_state.numthreads;
+ while (attempts-- > 0 && taskqueue_state.queue_enqueueposition != taskqueue_state.queue_dequeueposition)
+ {
+ taskqueue_task_t *t = taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition];
+ if (t->preceding && t->preceding->done == 0)
+ {
+ // task is waiting on something
+ // first dequeue it properly
+ taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition] = NULL;
+ taskqueue_state.queue_dequeueposition++;
+ if (taskqueue_state.queue_dequeueposition >= taskqueue_state.queue_size)
+ taskqueue_state.queue_dequeueposition = 0;
+ // now put it back in the distributor queue - we know there is room because we just made room
+ taskqueue_state.queue_data[taskqueue_state.queue_enqueueposition] = t;
+ taskqueue_state.queue_enqueueposition++;
+ if (taskqueue_state.queue_enqueueposition >= taskqueue_state.queue_size)
+ taskqueue_state.queue_enqueueposition = 0;
+ // we do not refresh the attempt counter here to avoid deadlock - quite often the only things sitting in the distributor queue are waiting on other tasks
+ }
+ else
+ {
+				taskqueue_state_thread_t *s = &taskqueue_state.threads[taskqueue_state.enqueuethread % taskqueue_state.numthreads];
+				// advance so the next task goes to the next thread - this is the cycling
+				// described in taskqueue_state_t, without it every task would land on the
+				// first thread's queue
+				taskqueue_state.enqueuethread = (taskqueue_state.enqueuethread + 1) % taskqueue_state.numthreads;
+ if (s->enqueueposition - s->dequeueposition < THREADTASKS)
+ {
+ // add the task to the thread's queue
+ s->queue[(s->enqueueposition++) % THREADTASKS] = t;
+ // since we succeeded in assigning the task, advance the distributor queue
+ taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition] = NULL;
+ taskqueue_state.queue_dequeueposition++;
+ if (taskqueue_state.queue_dequeueposition >= taskqueue_state.queue_size)
+ taskqueue_state.queue_dequeueposition = 0;
+ // refresh our attempt counter because we did manage to assign something to a thread
+ attempts = taskqueue_state.numthreads;
+ }
+ }
+ }
+ }
+ Thread_AtomicUnlock(&taskqueue_state.command_lock);
+ // execute one pending task on the distributor queue, this matters if numthreads is 0
+ if (taskqueue_state.queue_dequeueposition != taskqueue_state.queue_enqueueposition)
+ {
+ taskqueue_task_t *t = taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition];
+ taskqueue_state.queue_dequeueposition++;
+ if (taskqueue_state.queue_dequeueposition >= taskqueue_state.queue_size)
+ taskqueue_state.queue_dequeueposition = 0;
+ if (t)
+ TaskQueue_ExecuteTask(t);
+ }
+}
+
+void TaskQueue_WaitForTaskDone(taskqueue_task_t *t)
+{
+ qboolean done = false;
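+	// poll until the task completes, distributing queued tasks while we wait so that
+	// progress is made even when numthreads is 0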
+ for (;;)
+ {
+ Thread_AtomicLock(&taskqueue_state.command_lock);
+ done = t->done != 0;
+ Thread_AtomicUnlock(&taskqueue_state.command_lock);
+ if (done)
+ break;
+ TaskQueue_DistributeTasks();
+ }
+}
+
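+// called once per frame (and by TaskQueue_Shutdown with shutdown = true): updates the
+// recent-workload metrics and grows or shrinks the pool of worker threads to match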
+void TaskQueue_Frame(qboolean shutdown)
+{
+ int i;
+ unsigned long long int avg;
+	int maxthreads = bound(0, taskqueue_maxthreads.integer, MAXTHREADS);
+	int numthreads;
+	int tasksperthread = bound(10, taskqueue_tasksperthread.integer, 100000);
+
+ Thread_AtomicLock(&taskqueue_state.command_lock);
+ taskqueue_state.tasks_recentframesindex = (taskqueue_state.tasks_recentframesindex + 1) % RECENTFRAMES;
+ taskqueue_state.tasks_recentframes[taskqueue_state.tasks_recentframesindex] = taskqueue_state.tasks_thisframe;
+ taskqueue_state.tasks_thisframe = 0;
+ avg = 0;
+ for (i = 0; i < RECENTFRAMES; i++)
+ avg += taskqueue_state.tasks_recentframes[i];
+ taskqueue_state.tasks_averageperframe = avg / RECENTFRAMES;
+ Thread_AtomicUnlock(&taskqueue_state.command_lock);
+
+	numthreads = taskqueue_state.tasks_averageperframe / tasksperthread;
+	// clamp to maxthreads (itself limited to MAXTHREADS) so a large taskqueue_maxthreads
+	// cvar value can not overrun the threads[] array
+	numthreads = bound(taskqueue_minthreads.integer, numthreads, maxthreads);
+#ifdef THREADDISABLE
+	// threading is compiled out - make sure the recalculation above does not re-enable it
+	numthreads = 0;
+#endif
+
+ if (shutdown)
+ numthreads = 0;
+
+ // check if we need to close some threads
+ if (taskqueue_state.numthreads > numthreads)
+ {
+ // tell extra threads to quit
+ Thread_AtomicLock(&taskqueue_state.command_lock);
+ for (i = numthreads; i < taskqueue_state.numthreads; i++)
+ taskqueue_state.threads[i].quit = 1;
+ Thread_AtomicUnlock(&taskqueue_state.command_lock);
+ for (i = numthreads; i < taskqueue_state.numthreads; i++)
+ {
+ if (taskqueue_state.threads[i].handle)
+ Thread_WaitThread(taskqueue_state.threads[i].handle, 0);
+ taskqueue_state.threads[i].handle = NULL;
+ }
+ // okay we're at the new state now
+ taskqueue_state.numthreads = numthreads;
+ }
+
+ // check if we need to start more threads
+ if (taskqueue_state.numthreads < numthreads)
+ {
+ // make sure we're not telling new threads to just quit on startup
+ Thread_AtomicLock(&taskqueue_state.command_lock);
+ for (i = taskqueue_state.numthreads; i < numthreads; i++)
+ taskqueue_state.threads[i].quit = 0;
+ Thread_AtomicUnlock(&taskqueue_state.command_lock);
+
+ // start new threads
+ for (i = taskqueue_state.numthreads; i < numthreads; i++)
+ {
+ taskqueue_state.threads[i].thread_index = i;
+ taskqueue_state.threads[i].handle = Thread_CreateThread(TaskQueue_ThreadFunc, &taskqueue_state.threads[i]);
+ }
+
+ // okay we're at the new state now
+ taskqueue_state.numthreads = numthreads;
+ }
+
+ // just for good measure, distribute any pending tasks that span across frames
+ TaskQueue_DistributeTasks();
+}
+
+void TaskQueue_Setup(taskqueue_task_t *t, taskqueue_task_t *preceding, void(*func)(taskqueue_task_t *), size_t i0, size_t i1, void *p0, void *p1)
+{
+ memset(t, 0, sizeof(*t));
+ t->preceding = preceding;
+ t->func = func;
+ t->i[0] = i0;
+ t->i[1] = i1;
+ t->p[0] = p0;
+ t->p[1] = p1;
+}
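+
+// illustrative usage sketch, not part of this file - Task_Example and exampledata are
+// hypothetical, and a task function is expected to set t->done = 1 when it finishes:
+//   taskqueue_task_t task;
+//   TaskQueue_Setup(&task, NULL, Task_Example, 0, 0, exampledata, NULL);
+//   TaskQueue_Enqueue(1, &task);
+//   TaskQueue_DistributeTasks();
+//   TaskQueue_WaitForTaskDone(&task);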
+
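+// task function that serves as a join point for a batch of tasks: i[0] holds the count
+// and p[0] the task array (as passed via TaskQueue_Setup); it re-yields itself until
+// every watched task is done, and only then marks itself done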
+void TaskQueue_Task_CheckTasksDone(taskqueue_task_t *t)
+{
+ size_t numtasks = t->i[0];
+ taskqueue_task_t *tasks = t->p[0];
+ while (numtasks > 0)
+ {
+ // check the last task first as it's usually going to be the last to finish, so we do the least work by checking it first
+ if (!tasks[numtasks - 1].done)
+ {
+ // update our partial progress, then yield to another pending task.
+ t->i[0] = numtasks;
+ // set our preceding task to one of the ones we are watching for
+ t->preceding = &tasks[numtasks - 1];
+ TaskQueue_Yield(t);
+ return;
+ }
+ numtasks--;
+ }
+ t->done = 1;
+}