]> git.rm.cloudns.org Git - xonotic/darkplaces.git/commitdiff
Fix line endings on taskqueue.[ch]
authorhavoc <havoc@d7cf8633-e32d-0410-b094-e92efae38249>
Wed, 29 Jan 2020 22:41:19 +0000 (22:41 +0000)
committerhavoc <havoc@d7cf8633-e32d-0410-b094-e92efae38249>
Wed, 29 Jan 2020 22:41:19 +0000 (22:41 +0000)
git-svn-id: svn://svn.icculus.org/twilight/trunk/darkplaces@12517 d7cf8633-e32d-0410-b094-e92efae38249

taskqueue.c
taskqueue.h

index 35a6c753d691378a87c30aba381ecf891399a152..b43f8d46cce25a89ed97ddd8db8a408b0e171807 100644 (file)
-#include "quakedef.h"\r
-#include "taskqueue.h"\r
-\r
-cvar_t taskqueue_minthreads = {CVAR_CLIENT | CVAR_SERVER | CVAR_SAVE, "taskqueue_minthreads", "4", "minimum number of threads to keep active for executing tasks"};\r
-cvar_t taskqueue_maxthreads = {CVAR_CLIENT | CVAR_SERVER | CVAR_SAVE, "taskqueue_maxthreads", "32", "maximum number of threads to start up as needed based on task count"};\r
-cvar_t taskqueue_tasksperthread = {CVAR_CLIENT | CVAR_SERVER | CVAR_SAVE, "taskqueue_tasksperthread", "4000", "expected amount of work that a single thread can do in a frame - the number of threads being used depends on the average workload in recent frames"};\r
-\r
-#define MAXTHREADS 1024\r
-#define RECENTFRAMES 64 // averaging thread activity over this many frames to decide how many threads we need\r
-#define THREADTASKS 256 // thread can hold this many tasks in its own queue\r
-#define THREADBATCH 64 // thread will run this many tasks before checking status again\r
-#define THREADSLEEPCOUNT 1000 // thread will sleep for a little while if it checks this many times and has no work to do\r
-\r
-typedef struct taskqueue_state_thread_s\r
-{\r
-       void *handle;\r
-       unsigned int quit;\r
-       unsigned int thread_index;\r
-       unsigned int tasks_completed;\r
-\r
-       unsigned int enqueueposition;\r
-       unsigned int dequeueposition;\r
-       taskqueue_task_t *queue[THREADTASKS];\r
-}\r
-taskqueue_state_thread_t;\r
-\r
-typedef struct taskqueue_state_s\r
-{\r
-       // TaskQueue_DistributeTasks cycles through the threads when assigning, each has its own queue\r
-       unsigned int enqueuethread;\r
-       int numthreads;\r
-       taskqueue_state_thread_t threads[MAXTHREADS];\r
-\r
-       // synchronization point for enqueue and some other memory access\r
-       Thread_SpinLock command_lock;\r
-\r
-       // distributor queue (not assigned to threads yet, or waiting on other tasks)\r
-       unsigned int queue_enqueueposition;\r
-       unsigned int queue_dequeueposition;\r
-       unsigned int queue_size;\r
-       taskqueue_task_t **queue_data;\r
-\r
-       // metrics to balance workload vs cpu resources\r
-       unsigned int tasks_recentframesindex;\r
-       unsigned int tasks_recentframes[RECENTFRAMES];\r
-       unsigned int tasks_thisframe;\r
-       unsigned int tasks_averageperframe;\r
-}\r
-taskqueue_state_t;\r
-\r
-static taskqueue_state_t taskqueue_state;\r
-\r
-void TaskQueue_Init(void)\r
-{\r
-       Cvar_RegisterVariable(&taskqueue_minthreads);\r
-       Cvar_RegisterVariable(&taskqueue_maxthreads);\r
-       Cvar_RegisterVariable(&taskqueue_tasksperthread);\r
-}\r
-\r
-void TaskQueue_Shutdown(void)\r
-{\r
-       if (taskqueue_state.numthreads)\r
-               TaskQueue_Frame(true);\r
-}\r
-\r
-static void TaskQueue_ExecuteTask(taskqueue_task_t *t)\r
-{\r
-       // see if t is waiting on something\r
-       if (t->preceding && t->preceding->done == 0)\r
-               TaskQueue_Yield(t);\r
-       else\r
-               t->func(t);\r
-}\r
-\r
-// FIXME: don't use mutex\r
-// FIXME: this is basically fibers but less featureful - context switching for yield is not implemented\r
-static int TaskQueue_ThreadFunc(void *d)\r
-{\r
-       taskqueue_state_thread_t *s = (taskqueue_state_thread_t *)d;\r
-       unsigned int sleepcounter = 0;\r
-       for (;;)\r
-       {\r
-               qboolean quit;\r
-               while (s->dequeueposition != s->enqueueposition)\r
-               {\r
-                       taskqueue_task_t *t = s->queue[s->dequeueposition % THREADTASKS];\r
-                       TaskQueue_ExecuteTask(t);\r
-                       // when we advance, also clear the pointer for good measure\r
-                       s->queue[s->dequeueposition++ % THREADTASKS] = NULL;\r
-                       sleepcounter = 0;\r
-               }\r
-               Thread_AtomicLock(&taskqueue_state.command_lock);\r
-               quit = s->quit != 0;\r
-               Thread_AtomicUnlock(&taskqueue_state.command_lock);\r
-               if (quit)\r
-                       break;\r
-               sleepcounter++;\r
-               if (sleepcounter >= THREADSLEEPCOUNT)\r
-                       Sys_Sleep(1000);\r
-               sleepcounter = 0;\r
-       }\r
-       return 0;\r
-}\r
-\r
-void TaskQueue_Enqueue(int numtasks, taskqueue_task_t *tasks)\r
-{\r
-       int i;\r
-       Thread_AtomicLock(&taskqueue_state.command_lock);\r
-       if (taskqueue_state.queue_size <\r
-               (taskqueue_state.queue_enqueueposition < taskqueue_state.queue_dequeueposition ? taskqueue_state.queue_size : 0) +\r
-               taskqueue_state.queue_enqueueposition - taskqueue_state.queue_dequeueposition + numtasks)\r
-       {\r
-               // we have to grow the queue...\r
-               unsigned int newsize = (taskqueue_state.queue_size + numtasks) * 2;\r
-               if (newsize < 1024)\r
-                       newsize = 1024;\r
-               taskqueue_state.queue_data = Mem_Realloc(zonemempool, taskqueue_state.queue_data, sizeof(*taskqueue_state.queue_data) * newsize);\r
-               taskqueue_state.queue_size = newsize;\r
-       }\r
-       for (i = 0; i < numtasks; i++)\r
-       {\r
-               if (tasks[i].yieldcount == 0)\r
-                       taskqueue_state.tasks_thisframe++;\r
-               taskqueue_state.queue_data[taskqueue_state.queue_enqueueposition] = &tasks[i];\r
-               taskqueue_state.queue_enqueueposition++;\r
-               if (taskqueue_state.queue_enqueueposition >= taskqueue_state.queue_size)\r
-                       taskqueue_state.queue_enqueueposition = 0;\r
-       }\r
-       Thread_AtomicUnlock(&taskqueue_state.command_lock);\r
-}\r
-\r
-// if the task can not be completed due yet to preconditions, just enqueue it again...\r
-void TaskQueue_Yield(taskqueue_task_t *t)\r
-{\r
-       t->yieldcount++;\r
-       TaskQueue_Enqueue(1, t);\r
-}\r
-\r
-qboolean TaskQueue_IsDone(taskqueue_task_t *t)\r
-{\r
-       return !t->done != 0;\r
-}\r
-\r
-void TaskQueue_DistributeTasks(void)\r
-{\r
-       Thread_AtomicLock(&taskqueue_state.command_lock);\r
-       if (taskqueue_state.numthreads > 0)\r
-       {\r
-               unsigned int attempts = taskqueue_state.numthreads;\r
-               while (attempts-- > 0 && taskqueue_state.queue_enqueueposition != taskqueue_state.queue_dequeueposition)\r
-               {\r
-                       taskqueue_task_t *t = taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition];\r
-                       if (t->preceding && t->preceding->done == 0)\r
-                       {\r
-                               // task is waiting on something\r
-                               // first dequeue it properly\r
-                               taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition] = NULL;\r
-                               taskqueue_state.queue_dequeueposition++;\r
-                               if (taskqueue_state.queue_dequeueposition >= taskqueue_state.queue_size)\r
-                                       taskqueue_state.queue_dequeueposition = 0;\r
-                               // now put it back in the distributor queue - we know there is room because we just made room\r
-                               taskqueue_state.queue_data[taskqueue_state.queue_enqueueposition] = t;\r
-                               taskqueue_state.queue_enqueueposition++;\r
-                               if (taskqueue_state.queue_enqueueposition >= taskqueue_state.queue_size)\r
-                                       taskqueue_state.queue_enqueueposition = 0;\r
-                               // we do not refresh the attempt counter here to avoid deadlock - quite often the only things sitting in the distributor queue are waiting on other tasks\r
-                       }\r
-                       else\r
-                       {\r
-                               taskqueue_state_thread_t *s = &taskqueue_state.threads[taskqueue_state.enqueuethread];\r
-                               if (s->enqueueposition - s->dequeueposition < THREADTASKS)\r
-                               {\r
-                                       // add the task to the thread's queue\r
-                                       s->queue[(s->enqueueposition++) % THREADTASKS] = t;\r
-                                       // since we succeeded in assigning the task, advance the distributor queue\r
-                                       taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition] = NULL;\r
-                                       taskqueue_state.queue_dequeueposition++;\r
-                                       if (taskqueue_state.queue_dequeueposition >= taskqueue_state.queue_size)\r
-                                               taskqueue_state.queue_dequeueposition = 0;\r
-                                       // refresh our attempt counter because we did manage to assign something to a thread\r
-                                       attempts = taskqueue_state.numthreads;\r
-                               }\r
-                       }\r
-               }\r
-       }\r
-       Thread_AtomicUnlock(&taskqueue_state.command_lock);\r
-       // execute one pending task on the distributor queue, this matters if numthreads is 0\r
-       if (taskqueue_state.queue_dequeueposition != taskqueue_state.queue_enqueueposition)\r
-       {\r
-               taskqueue_task_t *t = taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition];\r
-               taskqueue_state.queue_dequeueposition++;\r
-               if (taskqueue_state.queue_dequeueposition >= taskqueue_state.queue_size)\r
-                       taskqueue_state.queue_dequeueposition = 0;\r
-               if (t)\r
-                       TaskQueue_ExecuteTask(t);\r
-       }\r
-}\r
-\r
-void TaskQueue_WaitForTaskDone(taskqueue_task_t *t)\r
-{\r
-       qboolean done = false;\r
-       for (;;)\r
-       {\r
-               Thread_AtomicLock(&taskqueue_state.command_lock);\r
-               done = t->done != 0;\r
-               Thread_AtomicUnlock(&taskqueue_state.command_lock);\r
-               if (done)\r
-                       break;\r
-               TaskQueue_DistributeTasks();\r
-       }\r
-}\r
-\r
-void TaskQueue_Frame(qboolean shutdown)\r
-{\r
-       int i;\r
-       unsigned long long int avg;\r
-       int maxthreads = bound(0, taskqueue_maxthreads.integer, MAXTHREADS);\r
-       int numthreads = maxthreads;\r
-       int tasksperthread = bound(10, taskqueue_tasksperthread.integer, 100000);\r
-#ifdef THREADDISABLE\r
-       numthreads = 0;\r
-#endif\r
-\r
-       Thread_AtomicLock(&taskqueue_state.command_lock);\r
-       taskqueue_state.tasks_recentframesindex = (taskqueue_state.tasks_recentframesindex + 1) % RECENTFRAMES;\r
-       taskqueue_state.tasks_recentframes[taskqueue_state.tasks_recentframesindex] = taskqueue_state.tasks_thisframe;\r
-       taskqueue_state.tasks_thisframe = 0;\r
-       avg = 0;\r
-       for (i = 0; i < RECENTFRAMES; i++)\r
-               avg += taskqueue_state.tasks_recentframes[i];\r
-       taskqueue_state.tasks_averageperframe = avg / RECENTFRAMES;\r
-       Thread_AtomicUnlock(&taskqueue_state.command_lock);\r
-\r
-       numthreads = taskqueue_state.tasks_averageperframe / tasksperthread;\r
-       numthreads = bound(taskqueue_minthreads.integer, numthreads, taskqueue_maxthreads.integer);\r
-\r
-       if (shutdown)\r
-               numthreads = 0;\r
-\r
-       // check if we need to close some threads\r
-       if (taskqueue_state.numthreads > numthreads)\r
-       {\r
-               // tell extra threads to quit\r
-               Thread_AtomicLock(&taskqueue_state.command_lock);\r
-               for (i = numthreads; i < taskqueue_state.numthreads; i++)\r
-                       taskqueue_state.threads[i].quit = 1;\r
-               Thread_AtomicUnlock(&taskqueue_state.command_lock);\r
-               for (i = numthreads; i < taskqueue_state.numthreads; i++)\r
-               {\r
-                       if (taskqueue_state.threads[i].handle)\r
-                               Thread_WaitThread(taskqueue_state.threads[i].handle, 0);\r
-                       taskqueue_state.threads[i].handle = NULL;\r
-               }\r
-               // okay we're at the new state now\r
-               taskqueue_state.numthreads = numthreads;\r
-       }\r
-\r
-       // check if we need to start more threads\r
-       if (taskqueue_state.numthreads < numthreads)\r
-       {\r
-               // make sure we're not telling new threads to just quit on startup\r
-               Thread_AtomicLock(&taskqueue_state.command_lock);\r
-               for (i = taskqueue_state.numthreads; i < numthreads; i++)\r
-                       taskqueue_state.threads[i].quit = 0;\r
-               Thread_AtomicUnlock(&taskqueue_state.command_lock);\r
-\r
-               // start new threads\r
-               for (i = taskqueue_state.numthreads; i < numthreads; i++)\r
-               {\r
-                       taskqueue_state.threads[i].thread_index = i;\r
-                       taskqueue_state.threads[i].handle = Thread_CreateThread(TaskQueue_ThreadFunc, &taskqueue_state.threads[i]);\r
-               }\r
-\r
-               // okay we're at the new state now\r
-               taskqueue_state.numthreads = numthreads;\r
-       }\r
-\r
-       // just for good measure, distribute any pending tasks that span across frames\r
-       TaskQueue_DistributeTasks();\r
-}\r
-\r
-void TaskQueue_Setup(taskqueue_task_t *t, taskqueue_task_t *preceding, void(*func)(taskqueue_task_t *), size_t i0, size_t i1, void *p0, void *p1)\r
-{\r
-       memset(t, 0, sizeof(*t));\r
-       t->preceding = preceding;\r
-       t->func = func;\r
-       t->i[0] = i0;\r
-       t->i[1] = i1;\r
-       t->p[0] = p0;\r
-       t->p[1] = p1;\r
-}\r
-\r
-void TaskQueue_Task_CheckTasksDone(taskqueue_task_t *t)\r
-{\r
-       size_t numtasks = t->i[0];\r
-       taskqueue_task_t *tasks = t->p[0];\r
-       while (numtasks > 0)\r
-       {\r
-               // check the last task first as it's usually going to be the last to finish, so we do the least work by checking it first\r
-               if (!tasks[numtasks - 1].done)\r
-               {\r
-                       // update our partial progress, then yield to another pending task.\r
-                       t->i[0] = numtasks;\r
-                       // set our preceding task to one of the ones we are watching for\r
-                       t->preceding = &tasks[numtasks - 1];\r
-                       TaskQueue_Yield(t);\r
-                       return;\r
-               }\r
-               numtasks--;\r
-       }\r
-       t->done = 1;\r
-}\r
+#include "quakedef.h"
+#include "taskqueue.h"
+
+cvar_t taskqueue_minthreads = {CVAR_CLIENT | CVAR_SERVER | CVAR_SAVE, "taskqueue_minthreads", "4", "minimum number of threads to keep active for executing tasks"};
+cvar_t taskqueue_maxthreads = {CVAR_CLIENT | CVAR_SERVER | CVAR_SAVE, "taskqueue_maxthreads", "32", "maximum number of threads to start up as needed based on task count"};
+cvar_t taskqueue_tasksperthread = {CVAR_CLIENT | CVAR_SERVER | CVAR_SAVE, "taskqueue_tasksperthread", "4000", "expected amount of work that a single thread can do in a frame - the number of threads being used depends on the average workload in recent frames"};
+
+#define MAXTHREADS 1024
+#define RECENTFRAMES 64 // averaging thread activity over this many frames to decide how many threads we need
+#define THREADTASKS 256 // thread can hold this many tasks in its own queue
+#define THREADBATCH 64 // thread will run this many tasks before checking status again
+#define THREADSLEEPCOUNT 1000 // thread will sleep for a little while if it checks this many times and has no work to do
+
+typedef struct taskqueue_state_thread_s
+{
+       void *handle;
+       unsigned int quit;
+       unsigned int thread_index;
+       unsigned int tasks_completed;
+
+       unsigned int enqueueposition;
+       unsigned int dequeueposition;
+       taskqueue_task_t *queue[THREADTASKS];
+}
+taskqueue_state_thread_t;
+
+typedef struct taskqueue_state_s
+{
+       // TaskQueue_DistributeTasks cycles through the threads when assigning, each has its own queue
+       unsigned int enqueuethread;
+       int numthreads;
+       taskqueue_state_thread_t threads[MAXTHREADS];
+
+       // synchronization point for enqueue and some other memory access
+       Thread_SpinLock command_lock;
+
+       // distributor queue (not assigned to threads yet, or waiting on other tasks)
+       unsigned int queue_enqueueposition;
+       unsigned int queue_dequeueposition;
+       unsigned int queue_size;
+       taskqueue_task_t **queue_data;
+
+       // metrics to balance workload vs cpu resources
+       unsigned int tasks_recentframesindex;
+       unsigned int tasks_recentframes[RECENTFRAMES];
+       unsigned int tasks_thisframe;
+       unsigned int tasks_averageperframe;
+}
+taskqueue_state_t;
+
+static taskqueue_state_t taskqueue_state;
+
+void TaskQueue_Init(void)
+{
+       Cvar_RegisterVariable(&taskqueue_minthreads);
+       Cvar_RegisterVariable(&taskqueue_maxthreads);
+       Cvar_RegisterVariable(&taskqueue_tasksperthread);
+}
+
+void TaskQueue_Shutdown(void)
+{
+       if (taskqueue_state.numthreads)
+               TaskQueue_Frame(true);
+}
+
+static void TaskQueue_ExecuteTask(taskqueue_task_t *t)
+{
+       // see if t is waiting on something
+       if (t->preceding && t->preceding->done == 0)
+               TaskQueue_Yield(t);
+       else
+               t->func(t);
+}
+
+// FIXME: don't use mutex
+// FIXME: this is basically fibers but less featureful - context switching for yield is not implemented
+static int TaskQueue_ThreadFunc(void *d)
+{
+       taskqueue_state_thread_t *s = (taskqueue_state_thread_t *)d;
+       unsigned int sleepcounter = 0;
+       for (;;)
+       {
+               qboolean quit;
+               while (s->dequeueposition != s->enqueueposition)
+               {
+                       taskqueue_task_t *t = s->queue[s->dequeueposition % THREADTASKS];
+                       TaskQueue_ExecuteTask(t);
+                       // when we advance, also clear the pointer for good measure
+                       s->queue[s->dequeueposition++ % THREADTASKS] = NULL;
+                       sleepcounter = 0;
+               }
+               Thread_AtomicLock(&taskqueue_state.command_lock);
+               quit = s->quit != 0;
+               Thread_AtomicUnlock(&taskqueue_state.command_lock);
+               if (quit)
+                       break;
+               sleepcounter++;
+               if (sleepcounter >= THREADSLEEPCOUNT)
+                       Sys_Sleep(1000);
+               sleepcounter = 0;
+       }
+       return 0;
+}
+
+void TaskQueue_Enqueue(int numtasks, taskqueue_task_t *tasks)
+{
+       int i;
+       Thread_AtomicLock(&taskqueue_state.command_lock);
+       if (taskqueue_state.queue_size <
+               (taskqueue_state.queue_enqueueposition < taskqueue_state.queue_dequeueposition ? taskqueue_state.queue_size : 0) +
+               taskqueue_state.queue_enqueueposition - taskqueue_state.queue_dequeueposition + numtasks)
+       {
+               // we have to grow the queue...
+               unsigned int newsize = (taskqueue_state.queue_size + numtasks) * 2;
+               if (newsize < 1024)
+                       newsize = 1024;
+               taskqueue_state.queue_data = Mem_Realloc(zonemempool, taskqueue_state.queue_data, sizeof(*taskqueue_state.queue_data) * newsize);
+               taskqueue_state.queue_size = newsize;
+       }
+       for (i = 0; i < numtasks; i++)
+       {
+               if (tasks[i].yieldcount == 0)
+                       taskqueue_state.tasks_thisframe++;
+               taskqueue_state.queue_data[taskqueue_state.queue_enqueueposition] = &tasks[i];
+               taskqueue_state.queue_enqueueposition++;
+               if (taskqueue_state.queue_enqueueposition >= taskqueue_state.queue_size)
+                       taskqueue_state.queue_enqueueposition = 0;
+       }
+       Thread_AtomicUnlock(&taskqueue_state.command_lock);
+}
+
+// if the task can not be completed due yet to preconditions, just enqueue it again...
+void TaskQueue_Yield(taskqueue_task_t *t)
+{
+       t->yieldcount++;
+       TaskQueue_Enqueue(1, t);
+}
+
+qboolean TaskQueue_IsDone(taskqueue_task_t *t)
+{
+       return !t->done != 0;
+}
+
+void TaskQueue_DistributeTasks(void)
+{
+       Thread_AtomicLock(&taskqueue_state.command_lock);
+       if (taskqueue_state.numthreads > 0)
+       {
+               unsigned int attempts = taskqueue_state.numthreads;
+               while (attempts-- > 0 && taskqueue_state.queue_enqueueposition != taskqueue_state.queue_dequeueposition)
+               {
+                       taskqueue_task_t *t = taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition];
+                       if (t->preceding && t->preceding->done == 0)
+                       {
+                               // task is waiting on something
+                               // first dequeue it properly
+                               taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition] = NULL;
+                               taskqueue_state.queue_dequeueposition++;
+                               if (taskqueue_state.queue_dequeueposition >= taskqueue_state.queue_size)
+                                       taskqueue_state.queue_dequeueposition = 0;
+                               // now put it back in the distributor queue - we know there is room because we just made room
+                               taskqueue_state.queue_data[taskqueue_state.queue_enqueueposition] = t;
+                               taskqueue_state.queue_enqueueposition++;
+                               if (taskqueue_state.queue_enqueueposition >= taskqueue_state.queue_size)
+                                       taskqueue_state.queue_enqueueposition = 0;
+                               // we do not refresh the attempt counter here to avoid deadlock - quite often the only things sitting in the distributor queue are waiting on other tasks
+                       }
+                       else
+                       {
+                               taskqueue_state_thread_t *s = &taskqueue_state.threads[taskqueue_state.enqueuethread];
+                               if (s->enqueueposition - s->dequeueposition < THREADTASKS)
+                               {
+                                       // add the task to the thread's queue
+                                       s->queue[(s->enqueueposition++) % THREADTASKS] = t;
+                                       // since we succeeded in assigning the task, advance the distributor queue
+                                       taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition] = NULL;
+                                       taskqueue_state.queue_dequeueposition++;
+                                       if (taskqueue_state.queue_dequeueposition >= taskqueue_state.queue_size)
+                                               taskqueue_state.queue_dequeueposition = 0;
+                                       // refresh our attempt counter because we did manage to assign something to a thread
+                                       attempts = taskqueue_state.numthreads;
+                               }
+                       }
+               }
+       }
+       Thread_AtomicUnlock(&taskqueue_state.command_lock);
+       // execute one pending task on the distributor queue, this matters if numthreads is 0
+       if (taskqueue_state.queue_dequeueposition != taskqueue_state.queue_enqueueposition)
+       {
+               taskqueue_task_t *t = taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition];
+               taskqueue_state.queue_dequeueposition++;
+               if (taskqueue_state.queue_dequeueposition >= taskqueue_state.queue_size)
+                       taskqueue_state.queue_dequeueposition = 0;
+               if (t)
+                       TaskQueue_ExecuteTask(t);
+       }
+}
+
+void TaskQueue_WaitForTaskDone(taskqueue_task_t *t)
+{
+       qboolean done = false;
+       for (;;)
+       {
+               Thread_AtomicLock(&taskqueue_state.command_lock);
+               done = t->done != 0;
+               Thread_AtomicUnlock(&taskqueue_state.command_lock);
+               if (done)
+                       break;
+               TaskQueue_DistributeTasks();
+       }
+}
+
+void TaskQueue_Frame(qboolean shutdown)
+{
+       int i;
+       unsigned long long int avg;
+       int maxthreads = bound(0, taskqueue_maxthreads.integer, MAXTHREADS);
+       int numthreads = maxthreads;
+       int tasksperthread = bound(10, taskqueue_tasksperthread.integer, 100000);
+#ifdef THREADDISABLE
+       numthreads = 0;
+#endif
+
+       Thread_AtomicLock(&taskqueue_state.command_lock);
+       taskqueue_state.tasks_recentframesindex = (taskqueue_state.tasks_recentframesindex + 1) % RECENTFRAMES;
+       taskqueue_state.tasks_recentframes[taskqueue_state.tasks_recentframesindex] = taskqueue_state.tasks_thisframe;
+       taskqueue_state.tasks_thisframe = 0;
+       avg = 0;
+       for (i = 0; i < RECENTFRAMES; i++)
+               avg += taskqueue_state.tasks_recentframes[i];
+       taskqueue_state.tasks_averageperframe = avg / RECENTFRAMES;
+       Thread_AtomicUnlock(&taskqueue_state.command_lock);
+
+       numthreads = taskqueue_state.tasks_averageperframe / tasksperthread;
+       numthreads = bound(taskqueue_minthreads.integer, numthreads, taskqueue_maxthreads.integer);
+
+       if (shutdown)
+               numthreads = 0;
+
+       // check if we need to close some threads
+       if (taskqueue_state.numthreads > numthreads)
+       {
+               // tell extra threads to quit
+               Thread_AtomicLock(&taskqueue_state.command_lock);
+               for (i = numthreads; i < taskqueue_state.numthreads; i++)
+                       taskqueue_state.threads[i].quit = 1;
+               Thread_AtomicUnlock(&taskqueue_state.command_lock);
+               for (i = numthreads; i < taskqueue_state.numthreads; i++)
+               {
+                       if (taskqueue_state.threads[i].handle)
+                               Thread_WaitThread(taskqueue_state.threads[i].handle, 0);
+                       taskqueue_state.threads[i].handle = NULL;
+               }
+               // okay we're at the new state now
+               taskqueue_state.numthreads = numthreads;
+       }
+
+       // check if we need to start more threads
+       if (taskqueue_state.numthreads < numthreads)
+       {
+               // make sure we're not telling new threads to just quit on startup
+               Thread_AtomicLock(&taskqueue_state.command_lock);
+               for (i = taskqueue_state.numthreads; i < numthreads; i++)
+                       taskqueue_state.threads[i].quit = 0;
+               Thread_AtomicUnlock(&taskqueue_state.command_lock);
+
+               // start new threads
+               for (i = taskqueue_state.numthreads; i < numthreads; i++)
+               {
+                       taskqueue_state.threads[i].thread_index = i;
+                       taskqueue_state.threads[i].handle = Thread_CreateThread(TaskQueue_ThreadFunc, &taskqueue_state.threads[i]);
+               }
+
+               // okay we're at the new state now
+               taskqueue_state.numthreads = numthreads;
+       }
+
+       // just for good measure, distribute any pending tasks that span across frames
+       TaskQueue_DistributeTasks();
+}
+
+void TaskQueue_Setup(taskqueue_task_t *t, taskqueue_task_t *preceding, void(*func)(taskqueue_task_t *), size_t i0, size_t i1, void *p0, void *p1)
+{
+       memset(t, 0, sizeof(*t));
+       t->preceding = preceding;
+       t->func = func;
+       t->i[0] = i0;
+       t->i[1] = i1;
+       t->p[0] = p0;
+       t->p[1] = p1;
+}
+
+void TaskQueue_Task_CheckTasksDone(taskqueue_task_t *t)
+{
+       size_t numtasks = t->i[0];
+       taskqueue_task_t *tasks = t->p[0];
+       while (numtasks > 0)
+       {
+               // check the last task first as it's usually going to be the last to finish, so we do the least work by checking it first
+               if (!tasks[numtasks - 1].done)
+               {
+                       // update our partial progress, then yield to another pending task.
+                       t->i[0] = numtasks;
+                       // set our preceding task to one of the ones we are watching for
+                       t->preceding = &tasks[numtasks - 1];
+                       TaskQueue_Yield(t);
+                       return;
+               }
+               numtasks--;
+       }
+       t->done = 1;
+}
index c7a53d981e6b0766b3e9da2c55c6b3d6f0e0309f..9e8741f4407894349e70496a00e3156125b89cc5 100644 (file)
@@ -1,50 +1,50 @@
-\r
-#ifndef TASKQUEUE_H\r
-#define TASKQUEUE_H\r
-\r
-#include "qtypes.h"\r
-#include "thread.h"\r
-\r
-typedef struct taskqueue_task_s\r
-{\r
-       // if not NULL, this task must be done before this one will dequeue (faster than simply calling TaskQueue_Yield immediately)\r
-       struct taskqueue_task_s *preceding;\r
-\r
-       // use TaskQueue_IsDone() to poll done status\r
-       volatile int done;\r
-\r
-       // function to call, and parameters for it to use\r
-       void(*func)(struct taskqueue_task_s *task);\r
-       // general purpose parameters\r
-       void *p[2];\r
-       size_t i[2];\r
-\r
-       unsigned int yieldcount; // number of times this task has been requeued - each task counts only once for purposes of tasksperthread averaging\r
-}\r
-taskqueue_task_t;\r
-\r
-// queue the tasks to be executed, but does not start them (until TaskQueue_WaitforTaskDone is called)\r
-void TaskQueue_Enqueue(int numtasks, taskqueue_task_t *tasks);\r
-\r
-// if the task can not be completed due yet to preconditions, just enqueue it again...\r
-void TaskQueue_Yield(taskqueue_task_t *t);\r
-\r
-// polls for status of task and returns the result, does not cause tasks to be executed (see TaskQueue_WaitForTaskDone for that)\r
-qboolean TaskQueue_IsDone(taskqueue_task_t *t);\r
-\r
-// triggers execution of queued tasks, and waits for the specified task to be done\r
-void TaskQueue_WaitForTaskDone(taskqueue_task_t *t);\r
-\r
-// convenience function for setting up a task structure.  Does not do the Enqueue, just fills in the struct.\r
-void TaskQueue_Setup(taskqueue_task_t *t, taskqueue_task_t *preceding, void(*func)(taskqueue_task_t *), size_t i0, size_t i1, void *p0, void *p1);\r
-\r
-// general purpose tasks\r
-// t->i[0] = number of tasks in array\r
-// t->p[0] = array of taskqueue_task_t to check\r
-void TaskQueue_Task_CheckTasksDone(taskqueue_task_t *t);\r
-\r
-void TaskQueue_Init(void);\r
-void TaskQueue_Shutdown(void);\r
-void TaskQueue_Frame(qboolean shutdown);\r
-\r
-#endif\r
+
+#ifndef TASKQUEUE_H
+#define TASKQUEUE_H
+
+#include "qtypes.h"
+#include "thread.h"
+
+typedef struct taskqueue_task_s
+{
+       // if not NULL, this task must be done before this one will dequeue (faster than simply calling TaskQueue_Yield immediately)
+       struct taskqueue_task_s *preceding;
+
+       // use TaskQueue_IsDone() to poll done status
+       volatile int done;
+
+       // function to call, and parameters for it to use
+       void(*func)(struct taskqueue_task_s *task);
+       // general purpose parameters
+       void *p[2];
+       size_t i[2];
+
+       unsigned int yieldcount; // number of times this task has been requeued - each task counts only once for purposes of tasksperthread averaging
+}
+taskqueue_task_t;
+
+// queue the tasks to be executed, but does not start them (until TaskQueue_WaitforTaskDone is called)
+void TaskQueue_Enqueue(int numtasks, taskqueue_task_t *tasks);
+
+// if the task can not be completed due yet to preconditions, just enqueue it again...
+void TaskQueue_Yield(taskqueue_task_t *t);
+
+// polls for status of task and returns the result, does not cause tasks to be executed (see TaskQueue_WaitForTaskDone for that)
+qboolean TaskQueue_IsDone(taskqueue_task_t *t);
+
+// triggers execution of queued tasks, and waits for the specified task to be done
+void TaskQueue_WaitForTaskDone(taskqueue_task_t *t);
+
+// convenience function for setting up a task structure.  Does not do the Enqueue, just fills in the struct.
+void TaskQueue_Setup(taskqueue_task_t *t, taskqueue_task_t *preceding, void(*func)(taskqueue_task_t *), size_t i0, size_t i1, void *p0, void *p1);
+
+// general purpose tasks
+// t->i[0] = number of tasks in array
+// t->p[0] = array of taskqueue_task_t to check
+void TaskQueue_Task_CheckTasksDone(taskqueue_task_t *t);
+
+void TaskQueue_Init(void);
+void TaskQueue_Shutdown(void);
+void TaskQueue_Frame(qboolean shutdown);
+
+#endif