diff options
author | Sebastian Huber <sebastian.huber@embedded-brains.de> | 2016-10-07 15:10:20 +0200 |
---|---|---|
committer | Sebastian Huber <sebastian.huber@embedded-brains.de> | 2017-01-10 09:53:31 +0100 |
commit | c40e45b75eb76d79a05c7fa85c1fa9b5c728a12f (patch) | |
tree | ad4f2519067709f00ab98b3c591186c26dc3a21f /freebsd/sys/kern/subr_taskqueue.c | |
parent | userspace-header-gen.py: Simplify program ports (diff) | |
download | rtems-libbsd-c40e45b75eb76d79a05c7fa85c1fa9b5c728a12f.tar.bz2 |
Update to FreeBSD head 2016-08-23
Git mirror commit 9fe7c416e6abb28b1398fd3e5687099846800cfd.
Diffstat (limited to 'freebsd/sys/kern/subr_taskqueue.c')
-rw-r--r-- | freebsd/sys/kern/subr_taskqueue.c | 299 |
1 files changed, 239 insertions, 60 deletions
diff --git a/freebsd/sys/kern/subr_taskqueue.c b/freebsd/sys/kern/subr_taskqueue.c index 99640026..8580e8fc 100644 --- a/freebsd/sys/kern/subr_taskqueue.c +++ b/freebsd/sys/kern/subr_taskqueue.c @@ -32,15 +32,18 @@ __FBSDID("$FreeBSD$"); #include <rtems/bsd/sys/param.h> #include <sys/systm.h> #include <sys/bus.h> +#include <rtems/bsd/sys/cpuset.h> #include <sys/interrupt.h> #include <sys/kernel.h> #include <sys/kthread.h> +#include <sys/libkern.h> #include <sys/limits.h> #include <rtems/bsd/sys/lock.h> #include <sys/malloc.h> #include <sys/mutex.h> #include <sys/proc.h> #include <sys/sched.h> +#include <sys/smp.h> #include <sys/taskqueue.h> #include <rtems/bsd/sys/unistd.h> #include <machine/stdarg.h> @@ -50,16 +53,24 @@ static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues"); static void *taskqueue_giant_ih; #endif /* __rtems__ */ static void *taskqueue_ih; +static void taskqueue_fast_enqueue(void *); +static void taskqueue_swi_enqueue(void *); +#ifndef __rtems__ +static void taskqueue_swi_giant_enqueue(void *); +#endif /* __rtems__ */ struct taskqueue_busy { struct task *tb_running; TAILQ_ENTRY(taskqueue_busy) tb_link; }; +struct task * const TB_DRAIN_WAITER = (struct task *)0x1; + struct taskqueue { STAILQ_HEAD(, task) tq_queue; taskqueue_enqueue_fn tq_enqueue; void *tq_context; + char *tq_name; TAILQ_HEAD(, taskqueue_busy) tq_active; struct mtx tq_mutex; struct thread **tq_threads; @@ -69,11 +80,13 @@ struct taskqueue { #endif /* __rtems__ */ int tq_flags; int tq_callouts; + taskqueue_callback_fn tq_callbacks[TASKQUEUE_NUM_CALLBACKS]; + void *tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS]; }; #define TQ_FLAGS_ACTIVE (1 << 0) #define TQ_FLAGS_BLOCKED (1 << 1) -#define TQ_FLAGS_PENDING (1 << 2) +#define TQ_FLAGS_UNLOCKED_ENQUEUE (1 << 2) #define DT_CALLOUT_ARMED (1 << 0) @@ -85,7 +98,15 @@ struct taskqueue { else \ mtx_lock(&(tq)->tq_mutex); \ } while (0) +#else /* __rtems__ */ +#define TQ_LOCK(tq) \ + do { \ + mtx_lock(&(tq)->tq_mutex); \ + } while (0) +#endif /* __rtems__ */ +#define TQ_ASSERT_LOCKED(tq) mtx_assert(&(tq)->tq_mutex, MA_OWNED) +#ifndef __rtems__ #define TQ_UNLOCK(tq) \ do { \ if ((tq)->tq_spin) \ @@ -94,16 +115,12 @@ struct taskqueue { mtx_unlock(&(tq)->tq_mutex); \ } while (0) #else /* __rtems__ */ -#define TQ_LOCK(tq) \ - do { \ - mtx_lock(&(tq)->tq_mutex); \ - } while (0) - #define TQ_UNLOCK(tq) \ do { \ mtx_unlock(&(tq)->tq_mutex); \ } while (0) #endif /* __rtems__ */ +#define TQ_ASSERT_UNLOCKED(tq) mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED) void _timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task, @@ -111,7 +128,8 @@ _timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task, { TASK_INIT(&timeout_task->t, priority, func, context); - callout_init_mtx(&timeout_task->c, &queue->tq_mutex, 0); + callout_init_mtx(&timeout_task->c, &queue->tq_mutex, + CALLOUT_RETURNUNLOCKED); timeout_task->q = queue; timeout_task->f = 0; } @@ -128,20 +146,30 @@ TQ_SLEEP(struct taskqueue *tq, void *p, struct mtx *m, int pri, const char *wm, } static struct taskqueue * -_taskqueue_create(const char *name __unused, int mflags, +_taskqueue_create(const char *name, int mflags, taskqueue_enqueue_fn enqueue, void *context, - int mtxflags, const char *mtxname) + int mtxflags, const char *mtxname __unused) { struct taskqueue *queue; + char *tq_name; + + tq_name = malloc(TASKQUEUE_NAMELEN, M_TASKQUEUE, mflags | M_ZERO); + if (tq_name == NULL) + return (NULL); queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO); - if (!queue) - return NULL; + if (queue == NULL) { + free(tq_name, M_TASKQUEUE); + return (NULL); + } + + snprintf(tq_name, TASKQUEUE_NAMELEN, "%s", (name) ? name : "taskqueue"); STAILQ_INIT(&queue->tq_queue); TAILQ_INIT(&queue->tq_active); queue->tq_enqueue = enqueue; queue->tq_context = context; + queue->tq_name = tq_name; #ifndef __rtems__ queue->tq_spin = (mtxflags & MTX_SPIN) != 0; #else /* __rtems__ */ @@ -151,17 +179,42 @@ _taskqueue_create(const char *name __unused, int mflags, */ #endif /* __rtems__ */ queue->tq_flags |= TQ_FLAGS_ACTIVE; - mtx_init(&queue->tq_mutex, mtxname, NULL, mtxflags); + if (enqueue == taskqueue_fast_enqueue || + enqueue == taskqueue_swi_enqueue || +#ifndef __rtems__ + enqueue == taskqueue_swi_giant_enqueue || +#endif /* __rtems__ */ + enqueue == taskqueue_thread_enqueue) + queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE; + mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags); - return queue; + return (queue); } struct taskqueue * taskqueue_create(const char *name, int mflags, taskqueue_enqueue_fn enqueue, void *context) { + return _taskqueue_create(name, mflags, enqueue, context, - MTX_DEF, "taskqueue"); + MTX_DEF, name); +} + +void +taskqueue_set_callback(struct taskqueue *queue, + enum taskqueue_callback_type cb_type, taskqueue_callback_fn callback, + void *context) +{ + + KASSERT(((cb_type >= TASKQUEUE_CALLBACK_TYPE_MIN) && + (cb_type <= TASKQUEUE_CALLBACK_TYPE_MAX)), + ("Callback type %d not valid, must be %d-%d", cb_type, + TASKQUEUE_CALLBACK_TYPE_MIN, TASKQUEUE_CALLBACK_TYPE_MAX)); + KASSERT((queue->tq_callbacks[cb_type] == NULL), + ("Re-initialization of taskqueue callback?")); + + queue->tq_callbacks[cb_type] = callback; + queue->tq_cb_contexts[cb_type] = context; } /* @@ -188,6 +241,7 @@ taskqueue_free(struct taskqueue *queue) KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks")); mtx_destroy(&queue->tq_mutex); free(queue->tq_threads, M_TASKQUEUE); + free(queue->tq_name, M_TASKQUEUE); free(queue, M_TASKQUEUE); } @@ -197,12 +251,14 @@ taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task) struct task *ins; struct task *prev; + KASSERT(task->ta_func != NULL, ("enqueueing task with NULL func")); /* * Count multiple enqueues. */ if (task->ta_pending) { if (task->ta_pending < USHRT_MAX) task->ta_pending++; + TQ_UNLOCK(queue); return (0); } @@ -226,13 +282,17 @@ taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task) } task->ta_pending = 1; + if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) != 0) + TQ_UNLOCK(queue); if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0) queue->tq_enqueue(queue->tq_context); - else - queue->tq_flags |= TQ_FLAGS_PENDING; + if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) == 0) + TQ_UNLOCK(queue); + /* Return with lock released. */ return (0); } + int taskqueue_enqueue(struct taskqueue *queue, struct task *task) { @@ -240,7 +300,7 @@ taskqueue_enqueue(struct taskqueue *queue, struct task *task) TQ_LOCK(queue); res = taskqueue_enqueue_locked(queue, task); - TQ_UNLOCK(queue); + /* The lock is released inside. */ return (res); } @@ -257,6 +317,7 @@ taskqueue_timeout_func(void *arg) timeout_task->f &= ~DT_CALLOUT_ARMED; queue->tq_callouts--; taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t); + /* The lock is released inside. */ } int @@ -275,6 +336,7 @@ taskqueue_enqueue_timeout(struct taskqueue *queue, res = timeout_task->t.ta_pending; if (ticks == 0) { taskqueue_enqueue_locked(queue, &timeout_task->t); + /* The lock is released inside. */ } else { if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) { res++; @@ -288,18 +350,87 @@ taskqueue_enqueue_timeout(struct taskqueue *queue, callout_reset(&timeout_task->c, ticks, taskqueue_timeout_func, timeout_task); } + TQ_UNLOCK(queue); } - TQ_UNLOCK(queue); return (res); } static void -taskqueue_drain_running(struct taskqueue *queue) +taskqueue_task_nop_fn(void *context, int pending) +{ +} + +/* + * Block until all currently queued tasks in this taskqueue + * have begun execution. Tasks queued during execution of + * this function are ignored. + */ +static void +taskqueue_drain_tq_queue(struct taskqueue *queue) +{ + struct task t_barrier; + + if (STAILQ_EMPTY(&queue->tq_queue)) + return; + + /* + * Enqueue our barrier after all current tasks, but with + * the highest priority so that newly queued tasks cannot + * pass it. Because of the high priority, we can not use + * taskqueue_enqueue_locked directly (which drops the lock + * anyway) so just insert it at tail while we have the + * queue lock. + */ + TASK_INIT(&t_barrier, USHRT_MAX, taskqueue_task_nop_fn, &t_barrier); + STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link); + t_barrier.ta_pending = 1; + + /* + * Once the barrier has executed, all previously queued tasks + * have completed or are currently executing. + */ + while (t_barrier.ta_pending != 0) + TQ_SLEEP(queue, &t_barrier, &queue->tq_mutex, PWAIT, "-", 0); +} + +/* + * Block until all currently executing tasks for this taskqueue + * complete. Tasks that begin execution during the execution + * of this function are ignored. + */ +static void +taskqueue_drain_tq_active(struct taskqueue *queue) { + struct taskqueue_busy tb_marker, *tb_first; + + if (TAILQ_EMPTY(&queue->tq_active)) + return; + + /* Block taskq_terminate().*/ + queue->tq_callouts++; + + /* + * Wait for all currently executing taskqueue threads + * to go idle. + */ + tb_marker.tb_running = TB_DRAIN_WAITER; + TAILQ_INSERT_TAIL(&queue->tq_active, &tb_marker, tb_link); + while (TAILQ_FIRST(&queue->tq_active) != &tb_marker) + TQ_SLEEP(queue, &tb_marker, &queue->tq_mutex, PWAIT, "-", 0); + TAILQ_REMOVE(&queue->tq_active, &tb_marker, tb_link); + + /* + * Wakeup any other drain waiter that happened to queue up + * without any intervening active thread. + */ + tb_first = TAILQ_FIRST(&queue->tq_active); + if (tb_first != NULL && tb_first->tb_running == TB_DRAIN_WAITER) + wakeup(tb_first); - while (!TAILQ_EMPTY(&queue->tq_active)) - TQ_SLEEP(queue, &queue->tq_active, &queue->tq_mutex, - PWAIT, "-", 0); + /* Release taskqueue_terminate(). */ + queue->tq_callouts--; + if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0) + wakeup_one(queue->tq_threads); } void @@ -317,10 +448,8 @@ taskqueue_unblock(struct taskqueue *queue) TQ_LOCK(queue); queue->tq_flags &= ~TQ_FLAGS_BLOCKED; - if (queue->tq_flags & TQ_FLAGS_PENDING) { - queue->tq_flags &= ~TQ_FLAGS_PENDING; + if (!STAILQ_EMPTY(&queue->tq_queue)) queue->tq_enqueue(queue->tq_context); - } TQ_UNLOCK(queue); } @@ -328,34 +457,42 @@ static void taskqueue_run_locked(struct taskqueue *queue) { struct taskqueue_busy tb; + struct taskqueue_busy *tb_first; struct task *task; int pending; - mtx_assert(&queue->tq_mutex, MA_OWNED); + KASSERT(queue != NULL, ("tq is NULL")); + TQ_ASSERT_LOCKED(queue); tb.tb_running = NULL; - TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link); while (STAILQ_FIRST(&queue->tq_queue)) { + TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link); + /* * Carefully remove the first task from the queue and * zero its pending count. */ task = STAILQ_FIRST(&queue->tq_queue); + KASSERT(task != NULL, ("task is NULL")); STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link); pending = task->ta_pending; task->ta_pending = 0; tb.tb_running = task; TQ_UNLOCK(queue); + KASSERT(task->ta_func != NULL, ("task->ta_func is NULL")); task->ta_func(task->ta_context, pending); TQ_LOCK(queue); tb.tb_running = NULL; wakeup(task); + + TAILQ_REMOVE(&queue->tq_active, &tb, tb_link); + tb_first = TAILQ_FIRST(&queue->tq_active); + if (tb_first != NULL && + tb_first->tb_running == TB_DRAIN_WAITER) + wakeup(tb_first); } - TAILQ_REMOVE(&queue->tq_active, &tb, tb_link); - if (TAILQ_EMPTY(&queue->tq_active)) - wakeup(&queue->tq_active); } void @@ -372,7 +509,7 @@ task_is_running(struct taskqueue *queue, struct task *task) { struct taskqueue_busy *tb; - mtx_assert(&queue->tq_mutex, MA_OWNED); + TQ_ASSERT_LOCKED(queue); TAILQ_FOREACH(tb, &queue->tq_active, tb_link) { if (tb->tb_running == task) return (1); @@ -413,7 +550,7 @@ taskqueue_cancel_timeout(struct taskqueue *queue, int error; TQ_LOCK(queue); - pending = !!callout_stop(&timeout_task->c); + pending = !!(callout_stop(&timeout_task->c) > 0); error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1); if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) { timeout_task->f &= ~DT_CALLOUT_ARMED; @@ -444,7 +581,6 @@ taskqueue_drain(struct taskqueue *queue, struct task *task) void taskqueue_drain_all(struct taskqueue *queue) { - struct task *task; #ifndef __rtems__ if (!queue->tq_spin) @@ -452,13 +588,8 @@ taskqueue_drain_all(struct taskqueue *queue) #endif /* __rtems__ */ TQ_LOCK(queue); - task = STAILQ_LAST(&queue->tq_queue, task, ta_link); - if (task != NULL) - while (task->ta_pending != 0) - TQ_SLEEP(queue, task, &queue->tq_mutex, PWAIT, "-", 0); - taskqueue_drain_running(queue); - KASSERT(STAILQ_EMPTY(&queue->tq_queue), - ("taskqueue queue is not empty after draining")); + taskqueue_drain_tq_queue(queue); + taskqueue_drain_tq_active(queue); TQ_UNLOCK(queue); } @@ -497,24 +628,20 @@ taskqueue_swi_giant_run(void *dummy) } #endif /* __rtems__ */ -int -taskqueue_start_threads(struct taskqueue **tqp, int count, int pri, - const char *name, ...) +static int +_taskqueue_start_threads(struct taskqueue **tqp, int count, int pri, + cpuset_t *mask, const char *name, va_list ap) { - va_list ap; + char ktname[MAXCOMLEN + 1]; struct thread *td; struct taskqueue *tq; int i, error; - char ktname[MAXCOMLEN + 1]; if (count <= 0) return (EINVAL); - tq = *tqp; - - va_start(ap, name); vsnprintf(ktname, sizeof(ktname), name, ap); - va_end(ap); + tq = *tqp; tq->tq_threads = malloc(sizeof(struct thread *) * count, M_TASKQUEUE, M_NOWAIT | M_ZERO); @@ -544,6 +671,19 @@ taskqueue_start_threads(struct taskqueue **tqp, int count, int pri, if (tq->tq_threads[i] == NULL) continue; td = tq->tq_threads[i]; + if (mask) { + error = cpuset_setthread(td->td_tid, mask); + /* + * Failing to pin is rarely an actual fatal error; + * it'll just affect performance. + */ + if (error) + printf("%s: curthread=%llu: can't pin; " + "error=%d\n", + __func__, + (unsigned long long) td->td_tid, + error); + } thread_lock(td); sched_prio(td, pri); sched_add(td, SRQ_BORING); @@ -556,6 +696,44 @@ taskqueue_start_threads(struct taskqueue **tqp, int count, int pri, return (0); } +int +taskqueue_start_threads(struct taskqueue **tqp, int count, int pri, + const char *name, ...) +{ + va_list ap; + int error; + + va_start(ap, name); + error = _taskqueue_start_threads(tqp, count, pri, NULL, name, ap); + va_end(ap); + return (error); +} + +int +taskqueue_start_threads_cpuset(struct taskqueue **tqp, int count, int pri, + cpuset_t *mask, const char *name, ...) +{ + va_list ap; + int error; + + va_start(ap, name); + error = _taskqueue_start_threads(tqp, count, pri, mask, name, ap); + va_end(ap); + return (error); +} + +static inline void +taskqueue_run_callback(struct taskqueue *tq, + enum taskqueue_callback_type cb_type) +{ + taskqueue_callback_fn tq_callback; + + TQ_ASSERT_UNLOCKED(tq); + tq_callback = tq->tq_callbacks[cb_type]; + if (tq_callback != NULL) + tq_callback(tq->tq_cb_contexts[cb_type]); +} + void taskqueue_thread_loop(void *arg) { @@ -563,8 +741,10 @@ taskqueue_thread_loop(void *arg) tqp = arg; tq = *tqp; + taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT); TQ_LOCK(tq); while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) { + /* XXX ? */ taskqueue_run_locked(tq); /* * Because taskqueue_run() can drop tq_mutex, we need to @@ -576,6 +756,14 @@ taskqueue_thread_loop(void *arg) TQ_SLEEP(tq, tq, &tq->tq_mutex, 0, "-", 0); } taskqueue_run_locked(tq); + /* + * This thread is on its way out, so just drop the lock temporarily + * in order to call the shutdown callback. This allows the callback + * to look at the taskqueue, even just before it dies. + */ + TQ_UNLOCK(tq); + taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN); + TQ_LOCK(tq); /* rendezvous with thread that asked us to terminate */ tq->tq_tcount--; @@ -591,19 +779,17 @@ taskqueue_thread_enqueue(void *context) tqp = context; tq = *tqp; - - mtx_assert(&tq->tq_mutex, MA_OWNED); wakeup_one(tq); } TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, NULL, swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ, - INTR_MPSAFE, &taskqueue_ih)); + INTR_MPSAFE, &taskqueue_ih)); #ifndef __rtems__ TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, NULL, swi_add(NULL, "Giant taskq", taskqueue_swi_giant_run, - NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih)); + NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih)); #endif /* __rtems__ */ TASKQUEUE_DEFINE_THREAD(thread); @@ -616,13 +802,6 @@ taskqueue_create_fast(const char *name, int mflags, MTX_SPIN, "fast_taskqueue"); } -/* NB: for backwards compatibility */ -int -taskqueue_enqueue_fast(struct taskqueue *queue, struct task *task) -{ - return taskqueue_enqueue(queue, task); -} - static void *taskqueue_fast_ih; static void |