From: havoc Date: Sat, 18 Jan 2020 05:14:06 +0000 (+0000) Subject: Added taskqueue.[ch]. X-Git-Url: https://git.rm.cloudns.org/?a=commitdiff_plain;h=020178ff4f855970d5233c3d3d4becc8c80875e5;p=xonotic%2Fdarkplaces.git Added taskqueue.[ch]. git-svn-id: svn://svn.icculus.org/twilight/trunk/darkplaces@12496 d7cf8633-e32d-0410-b094-e92efae38249 --- diff --git a/taskqueue.c b/taskqueue.c new file mode 100644 index 00000000..9ef40a65 --- /dev/null +++ b/taskqueue.c @@ -0,0 +1,246 @@ +#include "quakedef.h" +#include "taskqueue.h" + +cvar_t taskqueue_maxthreads = { CVAR_SAVE, "taskqueue_maxthreads", "32", "how many threads to use for executing tasks" }; +cvar_t taskqueue_linkedlist = { CVAR_SAVE, "taskqueue_linkedlist", "1", "whether to use a doubly linked list or an array for the FIFO queue" }; + +typedef struct taskqueue_state_thread_s +{ + void *handle; +} +taskqueue_state_thread_t; + +typedef struct taskqueue_state_s +{ + int numthreads; + taskqueue_state_thread_t threads[1024]; + + // we can enqueue this many tasks before execution of them must proceed + int queue_used; + int queue_max; // size of queue array + taskqueue_task_t **queue_tasks; + + // command + Thread_SpinLock command_lock; + + int threads_quit; + + // doubly linked list - enqueue pushes to list.prev, dequeue pops from list.next + taskqueue_task_t list; +} +taskqueue_state_t; + +static taskqueue_state_t taskqueue_state; + +int TaskQueue_Init(void) +{ + Cvar_RegisterVariable(&taskqueue_maxthreads); + Cvar_RegisterVariable(&taskqueue_linkedlist); + // initialize the doubly-linked list header + taskqueue_state.list.next = &taskqueue_state.list; + taskqueue_state.list.prev = &taskqueue_state.list; + return 0; +} + +void TaskQueue_Shutdown(void) +{ + if (taskqueue_state.numthreads) + TaskQueue_Frame(true); + if (taskqueue_state.queue_tasks) + Mem_Free(taskqueue_state.queue_tasks); + taskqueue_state.queue_tasks = NULL; +} + +static taskqueue_task_t *TaskQueue_GetPending(void) +{ + taskqueue_task_t *t = NULL; + if (taskqueue_state.list.next != &taskqueue_state.list) + { + // pop from list.next + t = taskqueue_state.list.next; + t->next->prev = t->prev; + t->prev->next = t->next; + t->prev = t->next = NULL; + } + if (t == NULL) + { + if (taskqueue_state.queue_used > 0) + { + t = taskqueue_state.queue_tasks[0]; + taskqueue_state.queue_used--; + memmove(taskqueue_state.queue_tasks, taskqueue_state.queue_tasks + 1, taskqueue_state.queue_used * sizeof(taskqueue_task_t *)); + taskqueue_state.queue_tasks[taskqueue_state.queue_used] = NULL; + } + } + return t; +} + +static void TaskQueue_ExecuteTask(taskqueue_task_t *t) +{ + // see if t is waiting on something + if (t->preceding && t->preceding->done == 0) + TaskQueue_Yield(t); + else + t->func(t); +} + +// FIXME: don't use mutex +// FIXME: this is basically fibers but less featureful - context switching for yield is not implemented +static int TaskQueue_ThreadFunc(void *d) +{ + for (;;) + { + qboolean quit; + taskqueue_task_t *t = NULL; + Thread_AtomicLock(&taskqueue_state.command_lock); + quit = taskqueue_state.threads_quit != 0; + t = TaskQueue_GetPending(); + Thread_AtomicUnlock(&taskqueue_state.command_lock); + if (t) + TaskQueue_ExecuteTask(t); + else if (quit) + break; + } + return 0; +} + +void TaskQueue_Execute(qboolean force) +{ + // if we have no threads to run the tasks, just start executing them now + if (taskqueue_state.numthreads == 0 || force) + { + for (;;) + { + taskqueue_task_t *t = NULL; + Thread_AtomicLock(&taskqueue_state.command_lock); + t = TaskQueue_GetPending(); + Thread_AtomicUnlock(&taskqueue_state.command_lock); + if (!t) + break; + TaskQueue_ExecuteTask(t); + } + } +} + +void TaskQueue_Enqueue(int numtasks, taskqueue_task_t *tasks) +{ + int i; + // try not to spinlock for a long time by breaking up large enqueues + while (numtasks > 64) + { + TaskQueue_Enqueue(64, tasks); + tasks += 64; + numtasks -= 64; + } + Thread_AtomicLock(&taskqueue_state.command_lock); + for (i = 0; i < numtasks; i++) + { + taskqueue_task_t *t = &tasks[i]; + if (taskqueue_linkedlist.integer) + { + // push to list.prev + t->next = &taskqueue_state.list; + t->prev = taskqueue_state.list.prev; + t->next->prev = t; + t->prev->next = t; + } + else + { + if (taskqueue_state.queue_used >= taskqueue_state.queue_max) + { + taskqueue_state.queue_max *= 2; + if (taskqueue_state.queue_max < 1024) + taskqueue_state.queue_max = 1024; + taskqueue_state.queue_tasks = (taskqueue_task_t **)Mem_Realloc(cls.permanentmempool, taskqueue_state.queue_tasks, taskqueue_state.queue_max * sizeof(taskqueue_task_t *)); + } + taskqueue_state.queue_tasks[taskqueue_state.queue_used++] = t; + } + } + Thread_AtomicUnlock(&taskqueue_state.command_lock); +} + +// if the task can not be completed due yet to preconditions, just enqueue it again... +void TaskQueue_Yield(taskqueue_task_t *t) +{ + t->yieldcount++; + TaskQueue_Enqueue(1, t); +} + +qboolean TaskQueue_IsDone(taskqueue_task_t *t) +{ + return !t->done != 0; +} + +void TaskQueue_WaitForTaskDone(taskqueue_task_t *t) +{ + qboolean done = false; + while (!done) + { + Thread_AtomicLock(&taskqueue_state.command_lock); + done = t->done != 0; + Thread_AtomicUnlock(&taskqueue_state.command_lock); + // if there are no threads, just execute the tasks immediately + if (!done && taskqueue_state.numthreads == 0) + TaskQueue_Execute(true); + } +} + +void TaskQueue_Frame(qboolean shutdown) +{ + int numthreads = shutdown ? 0 : bound(0, taskqueue_maxthreads.integer, sizeof(taskqueue_state.threads) / sizeof(taskqueue_state.threads[0])); +#ifdef THREADDISABLE + numthreads = 0; +#endif + if (taskqueue_state.numthreads != numthreads) + { + int i; + Thread_AtomicLock(&taskqueue_state.command_lock); + taskqueue_state.threads_quit = 1; + Thread_AtomicUnlock(&taskqueue_state.command_lock); + for (i = 0; i < taskqueue_state.numthreads; i++) + { + if (taskqueue_state.threads[i].handle) + Thread_WaitThread(taskqueue_state.threads[i].handle, 0); + taskqueue_state.threads[i].handle = NULL; + } + Thread_AtomicLock(&taskqueue_state.command_lock); + taskqueue_state.threads_quit = 0; + Thread_AtomicUnlock(&taskqueue_state.command_lock); + taskqueue_state.numthreads = numthreads; + for (i = 0; i < taskqueue_state.numthreads; i++) + taskqueue_state.threads[i].handle = Thread_CreateThread(TaskQueue_ThreadFunc, &taskqueue_state.threads[i]); + // if there are still pending tasks (e.g. no threads), execute them on main thread now + TaskQueue_Execute(true); + } +} + +void TaskQueue_Setup(taskqueue_task_t *t, taskqueue_task_t *preceding, void(*func)(taskqueue_task_t *), size_t i0, size_t i1, void *p0, void *p1) +{ + memset(t, 0, sizeof(*t)); + t->preceding = preceding; + t->func = func; + t->i[0] = i0; + t->i[1] = i1; + t->p[0] = p0; + t->p[1] = p1; +} + +void TaskQueue_Task_CheckTasksDone(taskqueue_task_t *t) +{ + size_t numtasks = t->i[0]; + taskqueue_task_t *tasks = t->p[0]; + while (numtasks > 0) + { + // check the last task first as it's usually going to be the last to finish, so we do the least work by checking it first + if (!tasks[numtasks - 1].done) + { + // update our partial progress, then yield to another pending task. + t->i[0] = numtasks; + TaskQueue_Yield(t); + return; + } + numtasks--; + } + t->started = 1; + t->done = 1; +} diff --git a/taskqueue.h b/taskqueue.h new file mode 100644 index 00000000..e233032d --- /dev/null +++ b/taskqueue.h @@ -0,0 +1,58 @@ + +#ifndef TASKQUEUE_H +#define TASKQUEUE_H + +#include "qtypes.h" +#include "thread.h" + +typedef struct taskqueue_task_s +{ + // doubly linked list + struct taskqueue_task_s * volatile prev; + struct taskqueue_task_s * volatile next; + + // if not NULL, this task must be done before this one will dequeue (faster than simply Yielding immediately) + struct taskqueue_task_s *preceding; + + // see TaskQueue_IsDone() to use proper atomics to poll done status + volatile int started; + volatile int done; + + // function to call, and parameters for it to use + void(*func)(struct taskqueue_task_s *task); + void *p[4]; + size_t i[4]; + + // stats: + unsigned int yieldcount; // number of times this task has been requeued +} +taskqueue_task_t; + +// immediately execute any pending tasks if threading is disabled (or if force is true) +// TRY NOT TO USE THIS IF POSSIBLE - poll task->done instead. +void TaskQueue_Execute(qboolean force); + +// queue the tasks to be executed, or executes them immediately if threading is disabled. +void TaskQueue_Enqueue(int numtasks, taskqueue_task_t *tasks); + +// if the task can not be completed due yet to preconditions, just enqueue it again... +void TaskQueue_Yield(taskqueue_task_t *t); + +// polls for status of task and returns the result immediately - use this instead of checking ->done directly, as this uses atomics +qboolean TaskQueue_IsDone(taskqueue_task_t *t); + +// polls for status of task and waits for it to be done +void TaskQueue_WaitForTaskDone(taskqueue_task_t *t); + +// updates thread count based on the cvar. +void TaskQueue_Frame(qboolean shutdown); + +// convenience function for setting up a task structure. Does not do the Enqueue, just fills in the struct. +void TaskQueue_Setup(taskqueue_task_t *t, taskqueue_task_t *preceding, void(*func)(taskqueue_task_t *), size_t i0, size_t i1, void *p0, void *p1); + +// general purpose tasks +// t->i[0] = number of tasks in array +// t->p[0] = array of taskqueue_task_t to check +void TaskQueue_Task_CheckTasksDone(taskqueue_task_t *t); + +#endif