From: havoc <havoc@d7cf8633-e32d-0410-b094-e92efae38249>
Date: Sat, 18 Jan 2020 05:14:06 +0000 (+0000)
Subject: Added taskqueue.[ch].
X-Git-Url: https://git.rm.cloudns.org/?a=commitdiff_plain;h=020178ff4f855970d5233c3d3d4becc8c80875e5;p=xonotic%2Fdarkplaces.git

Added taskqueue.[ch].

git-svn-id: svn://svn.icculus.org/twilight/trunk/darkplaces@12496 d7cf8633-e32d-0410-b094-e92efae38249
---

diff --git a/taskqueue.c b/taskqueue.c
new file mode 100644
index 00000000..9ef40a65
--- /dev/null
+++ b/taskqueue.c
@@ -0,0 +1,246 @@
+#include "quakedef.h"
+#include "taskqueue.h"
+
// upper bound on worker threads; TaskQueue_Frame additionally clamps this to the static thread array size
cvar_t taskqueue_maxthreads = { CVAR_SAVE, "taskqueue_maxthreads", "32", "how many threads to use for executing tasks" };
// nonzero selects the doubly linked list queue, zero selects the resizable array FIFO (see TaskQueue_Enqueue)
cvar_t taskqueue_linkedlist = { CVAR_SAVE, "taskqueue_linkedlist", "1", "whether to use a doubly linked list or an array for the FIFO queue" };
+
// per-worker bookkeeping (currently just the OS thread handle)
typedef struct taskqueue_state_thread_s
{
	void *handle;
}
taskqueue_state_thread_t;

// global state of the task queue module (single instance below)
typedef struct taskqueue_state_s
{
	// current worker thread pool, resized by TaskQueue_Frame
	int numthreads;
	taskqueue_state_thread_t threads[1024];

	// we can enqueue this many tasks before execution of them must proceed
	int queue_used;
	int queue_max; // size of queue array
	taskqueue_task_t **queue_tasks;

	// spinlock guarding all queue state in this struct
	Thread_SpinLock command_lock;

	// set nonzero (under command_lock) to ask worker threads to exit
	int threads_quit;

	// doubly linked list - enqueue pushes to list.prev, dequeue pops from list.next
	taskqueue_task_t list;
}
taskqueue_state_t;

static taskqueue_state_t taskqueue_state;
+
+int TaskQueue_Init(void)
+{
+	Cvar_RegisterVariable(&taskqueue_maxthreads);
+	Cvar_RegisterVariable(&taskqueue_linkedlist);
+	// initialize the doubly-linked list header
+	taskqueue_state.list.next = &taskqueue_state.list;
+	taskqueue_state.list.prev = &taskqueue_state.list;
+	return 0;
+}
+
+void TaskQueue_Shutdown(void)
+{
+	if (taskqueue_state.numthreads)
+		TaskQueue_Frame(true);
+	if (taskqueue_state.queue_tasks)
+		Mem_Free(taskqueue_state.queue_tasks);
+	taskqueue_state.queue_tasks = NULL;
+}
+
+static taskqueue_task_t *TaskQueue_GetPending(void)
+{
+	taskqueue_task_t *t = NULL;
+	if (taskqueue_state.list.next != &taskqueue_state.list)
+	{
+		// pop from list.next
+		t = taskqueue_state.list.next;
+		t->next->prev = t->prev;
+		t->prev->next = t->next;
+		t->prev = t->next = NULL;
+	}
+	if (t == NULL)
+	{
+		if (taskqueue_state.queue_used > 0)
+		{
+			t = taskqueue_state.queue_tasks[0];
+			taskqueue_state.queue_used--;
+			memmove(taskqueue_state.queue_tasks, taskqueue_state.queue_tasks + 1, taskqueue_state.queue_used * sizeof(taskqueue_task_t *));
+			taskqueue_state.queue_tasks[taskqueue_state.queue_used] = NULL;
+		}
+	}
+	return t;
+}
+
+static void TaskQueue_ExecuteTask(taskqueue_task_t *t)
+{
+	// see if t is waiting on something
+	if (t->preceding && t->preceding->done == 0)
+		TaskQueue_Yield(t);
+	else
+		t->func(t);
+}
+
+// FIXME: don't use mutex
+// FIXME: this is basically fibers but less featureful - context switching for yield is not implemented
+static int TaskQueue_ThreadFunc(void *d)
+{
+	for (;;)
+	{
+		qboolean quit;
+		taskqueue_task_t *t = NULL;
+		Thread_AtomicLock(&taskqueue_state.command_lock);
+		quit = taskqueue_state.threads_quit != 0;
+		t = TaskQueue_GetPending();
+		Thread_AtomicUnlock(&taskqueue_state.command_lock);
+		if (t)
+			TaskQueue_ExecuteTask(t);
+		else if (quit)
+			break;
+	}
+	return 0;
+}
+
+void TaskQueue_Execute(qboolean force)
+{
+	// if we have no threads to run the tasks, just start executing them now
+	if (taskqueue_state.numthreads == 0 || force)
+	{
+		for (;;)
+		{
+			taskqueue_task_t *t = NULL;
+			Thread_AtomicLock(&taskqueue_state.command_lock);
+			t = TaskQueue_GetPending();
+			Thread_AtomicUnlock(&taskqueue_state.command_lock);
+			if (!t)
+				break;
+			TaskQueue_ExecuteTask(t);
+		}
+	}
+}
+
+void TaskQueue_Enqueue(int numtasks, taskqueue_task_t *tasks)
+{
+	int i;
+	// try not to spinlock for a long time by breaking up large enqueues
+	while (numtasks > 64)
+	{
+		TaskQueue_Enqueue(64, tasks);
+		tasks += 64;
+		numtasks -= 64;
+	}
+	Thread_AtomicLock(&taskqueue_state.command_lock);
+	for (i = 0; i < numtasks; i++)
+	{
+		taskqueue_task_t *t = &tasks[i];
+		if (taskqueue_linkedlist.integer)
+		{
+			// push to list.prev
+			t->next = &taskqueue_state.list;
+			t->prev = taskqueue_state.list.prev;
+			t->next->prev = t;
+			t->prev->next = t;
+		}
+		else
+		{
+			if (taskqueue_state.queue_used >= taskqueue_state.queue_max)
+			{
+				taskqueue_state.queue_max *= 2;
+				if (taskqueue_state.queue_max < 1024)
+					taskqueue_state.queue_max = 1024;
+				taskqueue_state.queue_tasks = (taskqueue_task_t **)Mem_Realloc(cls.permanentmempool, taskqueue_state.queue_tasks, taskqueue_state.queue_max * sizeof(taskqueue_task_t *));
+			}
+			taskqueue_state.queue_tasks[taskqueue_state.queue_used++] = t;
+		}
+	}
+	Thread_AtomicUnlock(&taskqueue_state.command_lock);
+}
+
+// if the task can not be completed due yet to preconditions, just enqueue it again...
+void TaskQueue_Yield(taskqueue_task_t *t)
+{
+	t->yieldcount++;
+	TaskQueue_Enqueue(1, t);
+}
+
+qboolean TaskQueue_IsDone(taskqueue_task_t *t)
+{
+	return !t->done != 0;
+}
+
+void TaskQueue_WaitForTaskDone(taskqueue_task_t *t)
+{
+	qboolean done = false;
+	while (!done)
+	{
+		Thread_AtomicLock(&taskqueue_state.command_lock);
+		done = t->done != 0;
+		Thread_AtomicUnlock(&taskqueue_state.command_lock);
+		// if there are no threads, just execute the tasks immediately
+		if (!done && taskqueue_state.numthreads == 0)
+			TaskQueue_Execute(true);
+	}
+}
+
// Adjusts the worker thread pool to match the taskqueue_maxthreads cvar (or
// zero on shutdown).  Any change of thread count tears down the entire pool
// and spawns a fresh one.
void TaskQueue_Frame(qboolean shutdown)
{
	// clamp the cvar to the capacity of the static thread array
	int numthreads = shutdown ? 0 : bound(0, taskqueue_maxthreads.integer, sizeof(taskqueue_state.threads) / sizeof(taskqueue_state.threads[0]));
#ifdef THREADDISABLE
	numthreads = 0;
#endif
	if (taskqueue_state.numthreads != numthreads)
	{
		int i;
		// ask all existing workers to exit once the queue is drained
		Thread_AtomicLock(&taskqueue_state.command_lock);
		taskqueue_state.threads_quit = 1;
		Thread_AtomicUnlock(&taskqueue_state.command_lock);
		for (i = 0; i < taskqueue_state.numthreads; i++)
		{
			if (taskqueue_state.threads[i].handle)
				Thread_WaitThread(taskqueue_state.threads[i].handle, 0);
			taskqueue_state.threads[i].handle = NULL;
		}
		// clear the quit flag before spawning the new pool, so the new
		// workers don't immediately exit
		Thread_AtomicLock(&taskqueue_state.command_lock);
		taskqueue_state.threads_quit = 0;
		Thread_AtomicUnlock(&taskqueue_state.command_lock);
		taskqueue_state.numthreads = numthreads;
		for (i = 0; i < taskqueue_state.numthreads; i++)
			taskqueue_state.threads[i].handle = Thread_CreateThread(TaskQueue_ThreadFunc, &taskqueue_state.threads[i]);
		// if there are still pending tasks (e.g. no threads), execute them on main thread now
		TaskQueue_Execute(true);
	}
}
+
+void TaskQueue_Setup(taskqueue_task_t *t, taskqueue_task_t *preceding, void(*func)(taskqueue_task_t *), size_t i0, size_t i1, void *p0, void *p1)
+{
+	memset(t, 0, sizeof(*t));
+	t->preceding = preceding;
+	t->func = func;
+	t->i[0] = i0;
+	t->i[1] = i1;
+	t->p[0] = p0;
+	t->p[1] = p1;
+}
+
+void TaskQueue_Task_CheckTasksDone(taskqueue_task_t *t)
+{
+	size_t numtasks = t->i[0];
+	taskqueue_task_t *tasks = t->p[0];
+	while (numtasks > 0)
+	{
+		// check the last task first as it's usually going to be the last to finish, so we do the least work by checking it first
+		if (!tasks[numtasks - 1].done)
+		{
+			// update our partial progress, then yield to another pending task.
+			t->i[0] = numtasks;
+			TaskQueue_Yield(t);
+			return;
+		}
+		numtasks--;
+	}
+	t->started = 1;
+	t->done = 1;
+}
diff --git a/taskqueue.h b/taskqueue.h
new file mode 100644
index 00000000..e233032d
--- /dev/null
+++ b/taskqueue.h
@@ -0,0 +1,58 @@
+
+#ifndef TASKQUEUE_H
+#define TASKQUEUE_H
+
+#include "qtypes.h"
+#include "thread.h"
+
// a task to be executed by the task queue; the caller owns the memory - the
// queue never allocates or frees tasks
typedef struct taskqueue_task_s
{
	// doubly linked list links used while the task sits in the queue
	struct taskqueue_task_s * volatile prev;
	struct taskqueue_task_s * volatile next;

	// if not NULL, this task must be done before this one will dequeue (faster than simply Yielding immediately)
	struct taskqueue_task_s *preceding;

	// completion flags - poll via TaskQueue_IsDone() rather than reading directly
	volatile int started;
	volatile int done;

	// function to call, and parameters for it to use
	void(*func)(struct taskqueue_task_s *task);
	void *p[4];
	size_t i[4];

	// stats:
	unsigned int yieldcount; // number of times this task has been requeued
}
taskqueue_task_t;
+
// immediately execute any pending tasks if threading is disabled (or if force is true)
// TRY NOT TO USE THIS IF POSSIBLE - poll task->done instead.
void TaskQueue_Execute(qboolean force);

// queue the tasks to be executed, or executes them immediately if threading is disabled.
void TaskQueue_Enqueue(int numtasks, taskqueue_task_t *tasks);

// if the task can not be completed yet due to preconditions, just enqueue it again...
void TaskQueue_Yield(taskqueue_task_t *t);

// polls the task's completion status and returns it immediately - prefer this over reading ->done directly
// NOTE(review): the implementation reads ->done without a lock or explicit atomics - verify the intended guarantee
qboolean TaskQueue_IsDone(taskqueue_task_t *t);

// polls for status of task and waits for it to be done
void TaskQueue_WaitForTaskDone(taskqueue_task_t *t);

// updates thread count based on the cvar.
void TaskQueue_Frame(qboolean shutdown);

// convenience function for setting up a task structure.  Does not do the Enqueue, just fills in the struct.
void TaskQueue_Setup(taskqueue_task_t *t, taskqueue_task_t *preceding, void(*func)(taskqueue_task_t *), size_t i0, size_t i1, void *p0, void *p1);

// general purpose task that completes once all tasks in an array are done
// t->i[0] = number of tasks in array
// t->p[0] = array of taskqueue_task_t to check
void TaskQueue_Task_CheckTasksDone(taskqueue_task_t *t);
+
+#endif