summary | refs | log | tree | commit | diff | stats
path: root/freebsd/sys/kern/subr_taskqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'freebsd/sys/kern/subr_taskqueue.c')
-rw-r--r-- freebsd/sys/kern/subr_taskqueue.c | 299
1 files changed, 239 insertions, 60 deletions
diff --git a/freebsd/sys/kern/subr_taskqueue.c b/freebsd/sys/kern/subr_taskqueue.c
index 99640026..8580e8fc 100644
--- a/freebsd/sys/kern/subr_taskqueue.c
+++ b/freebsd/sys/kern/subr_taskqueue.c
@@ -32,15 +32,18 @@ __FBSDID("$FreeBSD$");
#include <rtems/bsd/sys/param.h>
#include <sys/systm.h>
#include <sys/bus.h>
+#include <rtems/bsd/sys/cpuset.h>
#include <sys/interrupt.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
+#include <sys/libkern.h>
#include <sys/limits.h>
#include <rtems/bsd/sys/lock.h>
#include <sys/malloc.h>
#include <sys/mutex.h>
#include <sys/proc.h>
#include <sys/sched.h>
+#include <sys/smp.h>
#include <sys/taskqueue.h>
#include <rtems/bsd/sys/unistd.h>
#include <machine/stdarg.h>
@@ -50,16 +53,24 @@ static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");
static void *taskqueue_giant_ih;
#endif /* __rtems__ */
static void *taskqueue_ih;
+static void taskqueue_fast_enqueue(void *);
+static void taskqueue_swi_enqueue(void *);
+#ifndef __rtems__
+static void taskqueue_swi_giant_enqueue(void *);
+#endif /* __rtems__ */
struct taskqueue_busy {
struct task *tb_running;
TAILQ_ENTRY(taskqueue_busy) tb_link;
};
+struct task * const TB_DRAIN_WAITER = (struct task *)0x1;
+
struct taskqueue {
STAILQ_HEAD(, task) tq_queue;
taskqueue_enqueue_fn tq_enqueue;
void *tq_context;
+ char *tq_name;
TAILQ_HEAD(, taskqueue_busy) tq_active;
struct mtx tq_mutex;
struct thread **tq_threads;
@@ -69,11 +80,13 @@ struct taskqueue {
#endif /* __rtems__ */
int tq_flags;
int tq_callouts;
+ taskqueue_callback_fn tq_callbacks[TASKQUEUE_NUM_CALLBACKS];
+ void *tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS];
};
#define TQ_FLAGS_ACTIVE (1 << 0)
#define TQ_FLAGS_BLOCKED (1 << 1)
-#define TQ_FLAGS_PENDING (1 << 2)
+#define TQ_FLAGS_UNLOCKED_ENQUEUE (1 << 2)
#define DT_CALLOUT_ARMED (1 << 0)
@@ -85,7 +98,15 @@ struct taskqueue {
else \
mtx_lock(&(tq)->tq_mutex); \
} while (0)
+#else /* __rtems__ */
+#define TQ_LOCK(tq) \
+ do { \
+ mtx_lock(&(tq)->tq_mutex); \
+ } while (0)
+#endif /* __rtems__ */
+#define TQ_ASSERT_LOCKED(tq) mtx_assert(&(tq)->tq_mutex, MA_OWNED)
+#ifndef __rtems__
#define TQ_UNLOCK(tq) \
do { \
if ((tq)->tq_spin) \
@@ -94,16 +115,12 @@ struct taskqueue {
mtx_unlock(&(tq)->tq_mutex); \
} while (0)
#else /* __rtems__ */
-#define TQ_LOCK(tq) \
- do { \
- mtx_lock(&(tq)->tq_mutex); \
- } while (0)
-
#define TQ_UNLOCK(tq) \
do { \
mtx_unlock(&(tq)->tq_mutex); \
} while (0)
#endif /* __rtems__ */
+#define TQ_ASSERT_UNLOCKED(tq) mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED)
void
_timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task,
@@ -111,7 +128,8 @@ _timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task,
{
TASK_INIT(&timeout_task->t, priority, func, context);
- callout_init_mtx(&timeout_task->c, &queue->tq_mutex, 0);
+ callout_init_mtx(&timeout_task->c, &queue->tq_mutex,
+ CALLOUT_RETURNUNLOCKED);
timeout_task->q = queue;
timeout_task->f = 0;
}
@@ -128,20 +146,30 @@ TQ_SLEEP(struct taskqueue *tq, void *p, struct mtx *m, int pri, const char *wm,
}
static struct taskqueue *
-_taskqueue_create(const char *name __unused, int mflags,
+_taskqueue_create(const char *name, int mflags,
taskqueue_enqueue_fn enqueue, void *context,
- int mtxflags, const char *mtxname)
+ int mtxflags, const char *mtxname __unused)
{
struct taskqueue *queue;
+ char *tq_name;
+
+ tq_name = malloc(TASKQUEUE_NAMELEN, M_TASKQUEUE, mflags | M_ZERO);
+ if (tq_name == NULL)
+ return (NULL);
queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO);
- if (!queue)
- return NULL;
+ if (queue == NULL) {
+ free(tq_name, M_TASKQUEUE);
+ return (NULL);
+ }
+
+ snprintf(tq_name, TASKQUEUE_NAMELEN, "%s", (name) ? name : "taskqueue");
STAILQ_INIT(&queue->tq_queue);
TAILQ_INIT(&queue->tq_active);
queue->tq_enqueue = enqueue;
queue->tq_context = context;
+ queue->tq_name = tq_name;
#ifndef __rtems__
queue->tq_spin = (mtxflags & MTX_SPIN) != 0;
#else /* __rtems__ */
@@ -151,17 +179,42 @@ _taskqueue_create(const char *name __unused, int mflags,
*/
#endif /* __rtems__ */
queue->tq_flags |= TQ_FLAGS_ACTIVE;
- mtx_init(&queue->tq_mutex, mtxname, NULL, mtxflags);
+ if (enqueue == taskqueue_fast_enqueue ||
+ enqueue == taskqueue_swi_enqueue ||
+#ifndef __rtems__
+ enqueue == taskqueue_swi_giant_enqueue ||
+#endif /* __rtems__ */
+ enqueue == taskqueue_thread_enqueue)
+ queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE;
+ mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags);
- return queue;
+ return (queue);
}
struct taskqueue *
taskqueue_create(const char *name, int mflags,
taskqueue_enqueue_fn enqueue, void *context)
{
+
return _taskqueue_create(name, mflags, enqueue, context,
- MTX_DEF, "taskqueue");
+ MTX_DEF, name);
+}
+
+void
+taskqueue_set_callback(struct taskqueue *queue,
+ enum taskqueue_callback_type cb_type, taskqueue_callback_fn callback,
+ void *context)
+{
+
+ KASSERT(((cb_type >= TASKQUEUE_CALLBACK_TYPE_MIN) &&
+ (cb_type <= TASKQUEUE_CALLBACK_TYPE_MAX)),
+ ("Callback type %d not valid, must be %d-%d", cb_type,
+ TASKQUEUE_CALLBACK_TYPE_MIN, TASKQUEUE_CALLBACK_TYPE_MAX));
+ KASSERT((queue->tq_callbacks[cb_type] == NULL),
+ ("Re-initialization of taskqueue callback?"));
+
+ queue->tq_callbacks[cb_type] = callback;
+ queue->tq_cb_contexts[cb_type] = context;
}
/*
@@ -188,6 +241,7 @@ taskqueue_free(struct taskqueue *queue)
KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks"));
mtx_destroy(&queue->tq_mutex);
free(queue->tq_threads, M_TASKQUEUE);
+ free(queue->tq_name, M_TASKQUEUE);
free(queue, M_TASKQUEUE);
}
@@ -197,12 +251,14 @@ taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task)
struct task *ins;
struct task *prev;
+ KASSERT(task->ta_func != NULL, ("enqueueing task with NULL func"));
/*
* Count multiple enqueues.
*/
if (task->ta_pending) {
if (task->ta_pending < USHRT_MAX)
task->ta_pending++;
+ TQ_UNLOCK(queue);
return (0);
}
@@ -226,13 +282,17 @@ taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task)
}
task->ta_pending = 1;
+ if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) != 0)
+ TQ_UNLOCK(queue);
if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
queue->tq_enqueue(queue->tq_context);
- else
- queue->tq_flags |= TQ_FLAGS_PENDING;
+ if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) == 0)
+ TQ_UNLOCK(queue);
+ /* Return with lock released. */
return (0);
}
+
int
taskqueue_enqueue(struct taskqueue *queue, struct task *task)
{
@@ -240,7 +300,7 @@ taskqueue_enqueue(struct taskqueue *queue, struct task *task)
TQ_LOCK(queue);
res = taskqueue_enqueue_locked(queue, task);
- TQ_UNLOCK(queue);
+ /* The lock is released inside. */
return (res);
}
@@ -257,6 +317,7 @@ taskqueue_timeout_func(void *arg)
timeout_task->f &= ~DT_CALLOUT_ARMED;
queue->tq_callouts--;
taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t);
+ /* The lock is released inside. */
}
int
@@ -275,6 +336,7 @@ taskqueue_enqueue_timeout(struct taskqueue *queue,
res = timeout_task->t.ta_pending;
if (ticks == 0) {
taskqueue_enqueue_locked(queue, &timeout_task->t);
+ /* The lock is released inside. */
} else {
if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
res++;
@@ -288,18 +350,87 @@ taskqueue_enqueue_timeout(struct taskqueue *queue,
callout_reset(&timeout_task->c, ticks,
taskqueue_timeout_func, timeout_task);
}
+ TQ_UNLOCK(queue);
}
- TQ_UNLOCK(queue);
return (res);
}
static void
-taskqueue_drain_running(struct taskqueue *queue)
+taskqueue_task_nop_fn(void *context, int pending)
+{
+}
+
+/*
+ * Block until all currently queued tasks in this taskqueue
+ * have begun execution. Tasks queued during execution of
+ * this function are ignored.
+ */
+static void
+taskqueue_drain_tq_queue(struct taskqueue *queue)
+{
+ struct task t_barrier;
+
+ if (STAILQ_EMPTY(&queue->tq_queue))
+ return;
+
+ /*
+ * Enqueue our barrier after all current tasks, but with
+ * the highest priority so that newly queued tasks cannot
+ * pass it. Because of the high priority, we can not use
+ * taskqueue_enqueue_locked directly (which drops the lock
+ * anyway) so just insert it at tail while we have the
+ * queue lock.
+ */
+ TASK_INIT(&t_barrier, USHRT_MAX, taskqueue_task_nop_fn, &t_barrier);
+ STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link);
+ t_barrier.ta_pending = 1;
+
+ /*
+ * Once the barrier has executed, all previously queued tasks
+ * have completed or are currently executing.
+ */
+ while (t_barrier.ta_pending != 0)
+ TQ_SLEEP(queue, &t_barrier, &queue->tq_mutex, PWAIT, "-", 0);
+}
+
+/*
+ * Block until all currently executing tasks for this taskqueue
+ * complete. Tasks that begin execution during the execution
+ * of this function are ignored.
+ */
+static void
+taskqueue_drain_tq_active(struct taskqueue *queue)
{
+ struct taskqueue_busy tb_marker, *tb_first;
+
+ if (TAILQ_EMPTY(&queue->tq_active))
+ return;
+
+ /* Block taskqueue_terminate(). */
+ queue->tq_callouts++;
+
+ /*
+ * Wait for all currently executing taskqueue threads
+ * to go idle.
+ */
+ tb_marker.tb_running = TB_DRAIN_WAITER;
+ TAILQ_INSERT_TAIL(&queue->tq_active, &tb_marker, tb_link);
+ while (TAILQ_FIRST(&queue->tq_active) != &tb_marker)
+ TQ_SLEEP(queue, &tb_marker, &queue->tq_mutex, PWAIT, "-", 0);
+ TAILQ_REMOVE(&queue->tq_active, &tb_marker, tb_link);
+
+ /*
+ * Wakeup any other drain waiter that happened to queue up
+ * without any intervening active thread.
+ */
+ tb_first = TAILQ_FIRST(&queue->tq_active);
+ if (tb_first != NULL && tb_first->tb_running == TB_DRAIN_WAITER)
+ wakeup(tb_first);
- while (!TAILQ_EMPTY(&queue->tq_active))
- TQ_SLEEP(queue, &queue->tq_active, &queue->tq_mutex,
- PWAIT, "-", 0);
+ /* Release taskqueue_terminate(). */
+ queue->tq_callouts--;
+ if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0)
+ wakeup_one(queue->tq_threads);
}
void
@@ -317,10 +448,8 @@ taskqueue_unblock(struct taskqueue *queue)
TQ_LOCK(queue);
queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
- if (queue->tq_flags & TQ_FLAGS_PENDING) {
- queue->tq_flags &= ~TQ_FLAGS_PENDING;
+ if (!STAILQ_EMPTY(&queue->tq_queue))
queue->tq_enqueue(queue->tq_context);
- }
TQ_UNLOCK(queue);
}
@@ -328,34 +457,42 @@ static void
taskqueue_run_locked(struct taskqueue *queue)
{
struct taskqueue_busy tb;
+ struct taskqueue_busy *tb_first;
struct task *task;
int pending;
- mtx_assert(&queue->tq_mutex, MA_OWNED);
+ KASSERT(queue != NULL, ("tq is NULL"));
+ TQ_ASSERT_LOCKED(queue);
tb.tb_running = NULL;
- TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link);
while (STAILQ_FIRST(&queue->tq_queue)) {
+ TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link);
+
/*
* Carefully remove the first task from the queue and
* zero its pending count.
*/
task = STAILQ_FIRST(&queue->tq_queue);
+ KASSERT(task != NULL, ("task is NULL"));
STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
pending = task->ta_pending;
task->ta_pending = 0;
tb.tb_running = task;
TQ_UNLOCK(queue);
+ KASSERT(task->ta_func != NULL, ("task->ta_func is NULL"));
task->ta_func(task->ta_context, pending);
TQ_LOCK(queue);
tb.tb_running = NULL;
wakeup(task);
+
+ TAILQ_REMOVE(&queue->tq_active, &tb, tb_link);
+ tb_first = TAILQ_FIRST(&queue->tq_active);
+ if (tb_first != NULL &&
+ tb_first->tb_running == TB_DRAIN_WAITER)
+ wakeup(tb_first);
}
- TAILQ_REMOVE(&queue->tq_active, &tb, tb_link);
- if (TAILQ_EMPTY(&queue->tq_active))
- wakeup(&queue->tq_active);
}
void
@@ -372,7 +509,7 @@ task_is_running(struct taskqueue *queue, struct task *task)
{
struct taskqueue_busy *tb;
- mtx_assert(&queue->tq_mutex, MA_OWNED);
+ TQ_ASSERT_LOCKED(queue);
TAILQ_FOREACH(tb, &queue->tq_active, tb_link) {
if (tb->tb_running == task)
return (1);
@@ -413,7 +550,7 @@ taskqueue_cancel_timeout(struct taskqueue *queue,
int error;
TQ_LOCK(queue);
- pending = !!callout_stop(&timeout_task->c);
+ pending = !!(callout_stop(&timeout_task->c) > 0);
error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1);
if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
timeout_task->f &= ~DT_CALLOUT_ARMED;
@@ -444,7 +581,6 @@ taskqueue_drain(struct taskqueue *queue, struct task *task)
void
taskqueue_drain_all(struct taskqueue *queue)
{
- struct task *task;
#ifndef __rtems__
if (!queue->tq_spin)
@@ -452,13 +588,8 @@ taskqueue_drain_all(struct taskqueue *queue)
#endif /* __rtems__ */
TQ_LOCK(queue);
- task = STAILQ_LAST(&queue->tq_queue, task, ta_link);
- if (task != NULL)
- while (task->ta_pending != 0)
- TQ_SLEEP(queue, task, &queue->tq_mutex, PWAIT, "-", 0);
- taskqueue_drain_running(queue);
- KASSERT(STAILQ_EMPTY(&queue->tq_queue),
- ("taskqueue queue is not empty after draining"));
+ taskqueue_drain_tq_queue(queue);
+ taskqueue_drain_tq_active(queue);
TQ_UNLOCK(queue);
}
@@ -497,24 +628,20 @@ taskqueue_swi_giant_run(void *dummy)
}
#endif /* __rtems__ */
-int
-taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
- const char *name, ...)
+static int
+_taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
+ cpuset_t *mask, const char *name, va_list ap)
{
- va_list ap;
+ char ktname[MAXCOMLEN + 1];
struct thread *td;
struct taskqueue *tq;
int i, error;
- char ktname[MAXCOMLEN + 1];
if (count <= 0)
return (EINVAL);
- tq = *tqp;
-
- va_start(ap, name);
vsnprintf(ktname, sizeof(ktname), name, ap);
- va_end(ap);
+ tq = *tqp;
tq->tq_threads = malloc(sizeof(struct thread *) * count, M_TASKQUEUE,
M_NOWAIT | M_ZERO);
@@ -544,6 +671,19 @@ taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
if (tq->tq_threads[i] == NULL)
continue;
td = tq->tq_threads[i];
+ if (mask) {
+ error = cpuset_setthread(td->td_tid, mask);
+ /*
+ * Failing to pin is rarely an actual fatal error;
+ * it'll just affect performance.
+ */
+ if (error)
+ printf("%s: curthread=%llu: can't pin; "
+ "error=%d\n",
+ __func__,
+ (unsigned long long) td->td_tid,
+ error);
+ }
thread_lock(td);
sched_prio(td, pri);
sched_add(td, SRQ_BORING);
@@ -556,6 +696,44 @@ taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
return (0);
}
+int
+taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
+ const char *name, ...)
+{
+ va_list ap;
+ int error;
+
+ va_start(ap, name);
+ error = _taskqueue_start_threads(tqp, count, pri, NULL, name, ap);
+ va_end(ap);
+ return (error);
+}
+
+int
+taskqueue_start_threads_cpuset(struct taskqueue **tqp, int count, int pri,
+ cpuset_t *mask, const char *name, ...)
+{
+ va_list ap;
+ int error;
+
+ va_start(ap, name);
+ error = _taskqueue_start_threads(tqp, count, pri, mask, name, ap);
+ va_end(ap);
+ return (error);
+}
+
+static inline void
+taskqueue_run_callback(struct taskqueue *tq,
+ enum taskqueue_callback_type cb_type)
+{
+ taskqueue_callback_fn tq_callback;
+
+ TQ_ASSERT_UNLOCKED(tq);
+ tq_callback = tq->tq_callbacks[cb_type];
+ if (tq_callback != NULL)
+ tq_callback(tq->tq_cb_contexts[cb_type]);
+}
+
void
taskqueue_thread_loop(void *arg)
{
@@ -563,8 +741,10 @@ taskqueue_thread_loop(void *arg)
tqp = arg;
tq = *tqp;
+ taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT);
TQ_LOCK(tq);
while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
+ /* XXX ? */
taskqueue_run_locked(tq);
/*
* Because taskqueue_run() can drop tq_mutex, we need to
@@ -576,6 +756,14 @@ taskqueue_thread_loop(void *arg)
TQ_SLEEP(tq, tq, &tq->tq_mutex, 0, "-", 0);
}
taskqueue_run_locked(tq);
+ /*
+ * This thread is on its way out, so just drop the lock temporarily
+ * in order to call the shutdown callback. This allows the callback
+ * to look at the taskqueue, even just before it dies.
+ */
+ TQ_UNLOCK(tq);
+ taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN);
+ TQ_LOCK(tq);
/* rendezvous with thread that asked us to terminate */
tq->tq_tcount--;
@@ -591,19 +779,17 @@ taskqueue_thread_enqueue(void *context)
tqp = context;
tq = *tqp;
-
- mtx_assert(&tq->tq_mutex, MA_OWNED);
wakeup_one(tq);
}
TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, NULL,
swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ,
- INTR_MPSAFE, &taskqueue_ih));
+ INTR_MPSAFE, &taskqueue_ih));
#ifndef __rtems__
TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, NULL,
swi_add(NULL, "Giant taskq", taskqueue_swi_giant_run,
- NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih));
+ NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih));
#endif /* __rtems__ */
TASKQUEUE_DEFINE_THREAD(thread);
@@ -616,13 +802,6 @@ taskqueue_create_fast(const char *name, int mflags,
MTX_SPIN, "fast_taskqueue");
}
-/* NB: for backwards compatibility */
-int
-taskqueue_enqueue_fast(struct taskqueue *queue, struct task *task)
-{
- return taskqueue_enqueue(queue, task);
-}
-
static void *taskqueue_fast_ih;
static void