From adbedd10cfe5259018b1682d903ab40f6005b3f0 Mon Sep 17 00:00:00 2001 From: Sebastian Huber Date: Fri, 15 Apr 2016 21:18:26 +0200 Subject: score: Introduce _Thread_queue_Flush_critical() Replace _Thread_queue_Flush() with _Thread_queue_Flush_critical() and add a filter function for customization of the thread queue flush operation. Update #2555. --- cpukit/score/include/rtems/score/corebarrierimpl.h | 28 +++++-- cpukit/score/include/rtems/score/coremuteximpl.h | 36 +++++++-- cpukit/score/include/rtems/score/coresemimpl.h | 46 ++++++++--- cpukit/score/include/rtems/score/threadqimpl.h | 87 ++++++++++++++------ cpukit/score/src/corebarrier.c | 12 ++- cpukit/score/src/coremsgclose.c | 23 +++++- cpukit/score/src/coremsgflush.c | 2 +- cpukit/score/src/coremutex.c | 22 +++++ cpukit/score/src/coresem.c | 22 +++++ cpukit/score/src/threadqflush.c | 94 +++++++++++++++------- 10 files changed, 288 insertions(+), 84 deletions(-) (limited to 'cpukit/score') diff --git a/cpukit/score/include/rtems/score/corebarrierimpl.h b/cpukit/score/include/rtems/score/corebarrierimpl.h index 1d774052d7..03abecf4e3 100644 --- a/cpukit/score/include/rtems/score/corebarrierimpl.h +++ b/cpukit/score/include/rtems/score/corebarrierimpl.h @@ -193,19 +193,33 @@ uint32_t _CORE_barrier_Do_release( ) #endif +Thread_Control *_CORE_barrier_Was_deleted( + Thread_Control *the_thread, + Thread_queue_Queue *queue, + ISR_lock_Context *lock_context +); + /* Must be a macro due to the multiprocessing dependent parameters */ #define _CORE_barrier_Flush( \ the_barrier, \ mp_callout, \ mp_id \ ) \ - _Thread_queue_Flush( \ - &( the_barrier )->Wait_queue, \ - CORE_BARRIER_TQ_OPERATIONS, \ - CORE_BARRIER_WAS_DELETED, \ - mp_callout, \ - mp_id \ - ) + do { \ + ISR_lock_Context _core_barrier_flush_lock_context; \ + _Thread_queue_Acquire( \ + &( the_barrier )->Wait_queue, \ + &_core_barrier_flush_lock_context \ + ); \ + _Thread_queue_Flush_critical( \ + &( the_barrier )->Wait_queue.Queue, \ + CORE_BARRIER_TQ_OPERATIONS, \ + _CORE_barrier_Was_deleted, \ + mp_callout, \ + mp_id, \ + &_core_barrier_flush_lock_context \ + ); \ + } while ( 0 ) /** * This function returns true if the automatic release attribute is diff --git a/cpukit/score/include/rtems/score/coremuteximpl.h b/cpukit/score/include/rtems/score/coremuteximpl.h index eae6ef16f9..73331a5f32 100644 --- a/cpukit/score/include/rtems/score/coremuteximpl.h +++ b/cpukit/score/include/rtems/score/coremuteximpl.h @@ -338,20 +338,40 @@ CORE_mutex_Status _CORE_mutex_Do_surrender( ) #endif +Thread_Control *_CORE_mutex_Was_deleted( + Thread_Control *the_thread, + Thread_queue_Queue *queue, + ISR_lock_Context *lock_context +); + +Thread_Control *_CORE_mutex_Unsatisfied_nowait( + Thread_Control *the_thread, + Thread_queue_Queue *queue, + ISR_lock_Context *lock_context +); + /* Must be a macro due to the multiprocessing dependent parameters */ #define _CORE_mutex_Flush( \ the_mutex, \ - status, \ + filter, \ mp_callout, \ mp_id \ ) \ - _Thread_queue_Flush( \ - &( the_mutex )->Wait_queue, \ - ( the_mutex )->operations, \ - status, \ - mp_callout, \ - mp_id \ - ) + do { \ + ISR_lock_Context _core_mutex_flush_lock_context; \ + _Thread_queue_Acquire( \ + &( the_mutex )->Wait_queue, \ + &_core_mutex_flush_lock_context \ + ); \ + _Thread_queue_Flush_critical( \ + &( the_mutex )->Wait_queue.Queue, \ + ( the_mutex )->operations, \ + filter, \ + mp_callout, \ + mp_id, \ + &_core_mutex_flush_lock_context \ + ); \ + } while ( 0 ) /** * @brief Is mutex locked. diff --git a/cpukit/score/include/rtems/score/coresemimpl.h b/cpukit/score/include/rtems/score/coresemimpl.h index e0e278843e..fd01f93150 100644 --- a/cpukit/score/include/rtems/score/coresemimpl.h +++ b/cpukit/score/include/rtems/score/coresemimpl.h @@ -86,18 +86,36 @@ void _CORE_semaphore_Initialize( uint32_t initial_value ); +Thread_Control *_CORE_semaphore_Was_deleted( + Thread_Control *the_thread, + Thread_queue_Queue *queue, + ISR_lock_Context *lock_context +); + +Thread_Control *_CORE_semaphore_Unsatisfied_nowait( + Thread_Control *the_thread, + Thread_queue_Queue *queue, + ISR_lock_Context *lock_context +); + #define _CORE_semaphore_Destroy( \ the_semaphore, \ mp_callout, \ mp_id \ ) \ do { \ - _Thread_queue_Flush( \ + ISR_lock_Context _core_semaphore_destroy_lock_context; \ + _Thread_queue_Acquire( \ &( the_semaphore )->Wait_queue, \ + &_core_semaphore_destroy_lock_context \ + ); \ + _Thread_queue_Flush_critical( \ + &( the_semaphore )->Wait_queue.Queue, \ ( the_semaphore )->operations, \ - CORE_SEMAPHORE_WAS_DELETED, \ + _CORE_semaphore_Was_deleted, \ mp_callout, \ - mp_id \ + mp_id, \ + &_core_semaphore_destroy_lock_context \ ); \ _Thread_queue_Destroy( &( the_semaphore )->Wait_queue ); \ } while ( 0 ) @@ -192,13 +210,21 @@ RTEMS_INLINE_ROUTINE CORE_semaphore_Status _CORE_semaphore_Do_surrender( mp_callout, \ mp_id \ ) \ - _Thread_queue_Flush( \ - &( the_semaphore )->Wait_queue, \ - ( the_semaphore )->operations, \ - CORE_SEMAPHORE_STATUS_UNSATISFIED_NOWAIT, \ - mp_callout, \ - mp_id \ - ) + do { \ + ISR_lock_Context _core_semaphore_flush_lock_context; \ + _Thread_queue_Acquire( \ + &( the_semaphore )->Wait_queue, \ + &_core_semaphore_flush_lock_context \ + ); \ + _Thread_queue_Flush_critical( \ + &( the_semaphore )->Wait_queue.Queue, \ + ( the_semaphore )->operations, \ + _CORE_semaphore_Unsatisfied_nowait, \ + mp_callout, \ + mp_id, \ + &_core_semaphore_flush_lock_context \ + ); \ + } while ( 0 ) /** * This routine returns the current count associated with the semaphore. diff --git a/cpukit/score/include/rtems/score/threadqimpl.h b/cpukit/score/include/rtems/score/threadqimpl.h index d56be79a45..7b1c896f12 100644 --- a/cpukit/score/include/rtems/score/threadqimpl.h +++ b/cpukit/score/include/rtems/score/threadqimpl.h @@ -590,57 +590,96 @@ Thread_Control *_Thread_queue_First( const Thread_queue_Operations *operations ); -void _Thread_queue_Do_flush( - Thread_queue_Control *the_thread_queue, +/** + * @brief Thread queue flush filter function. + * + * Called under protection of the thread queue lock by + * _Thread_queue_Flush_critical() to optionally alter the thread wait + * information and control the iteration. + * + * @param the_thread The thread to extract. This is the first parameter to + * optimize for architectures that use the same register for the first + * parameter and the return value. + * @param queue The actual thread queue. + * @param lock_context The lock context of the lock acquire. May be used to + * pass additional data to the filter function via an overlay structure. The + * filter function should not release or acquire the thread queue lock. + * + * @retval the_thread Extract this thread. + * @retval NULL Do not extract this thread and stop the thread queue flush + * operation. Threads that are already extracted will complete the flush + * operation. + */ +typedef Thread_Control *( *Thread_queue_Flush_filter )( + Thread_Control *the_thread, + Thread_queue_Queue *queue, + ISR_lock_Context *lock_context +); + +size_t _Thread_queue_Do_flush_critical( + Thread_queue_Queue *queue, const Thread_queue_Operations *operations, - uint32_t status + Thread_queue_Flush_filter filter, #if defined(RTEMS_MULTIPROCESSING) - , Thread_queue_MP_callout mp_callout, - Objects_Id mp_id + Objects_Id mp_id, #endif + ISR_lock_Context *lock_context ); /** - * @brief Unblocks all threads blocked on the thread queue. + * @brief Unblocks all threads enqueued on the thread queue. * - * The thread timers of the threads are cancelled. + * This function iteratively extracts the first enqueued thread of the thread + * queue until the thread queue is empty or the filter function indicates a + * stop. The thread timers of the extracted threads are cancelled. The + * extracted threads are unblocked. * - * @param the_thread_queue The thread queue. + * @param queue The actual thread queue. * @param operations The thread queue operations. - * @param status The return status for the threads. + * @param filter The filter functions is called for each thread to extract from + * the thread queue. It may be used to alter the thread under protection of + * the thread queue lock, for example to set the thread wait return code. + * The return value of the filter function controls if the thread queue flush + * operation should stop or continue. * @param mp_callout Callout to extract the proxy of a remote thread. This * parameter is only used on multiprocessing configurations. * @param mp_id Object identifier of the object containing the thread queue. * This parameter is only used on multiprocessing configurations. + * + * @return The count of extracted threads. */ #if defined(RTEMS_MULTIPROCESSING) - #define _Thread_queue_Flush( \ - the_thread_queue, \ + #define _Thread_queue_Flush_critical( \ + queue, \ operations, \ - status, \ + filter, \ mp_callout, \ - mp_id \ + mp_id, \ + lock_context \ ) \ - _Thread_queue_Do_flush( \ - the_thread_queue, \ + _Thread_queue_Do_flush_critical( \ + queue, \ operations, \ - status, \ + filter, \ mp_callout, \ - mp_id \ + mp_id, \ + lock_context \ ) #else - #define _Thread_queue_Flush( \ - the_thread_queue, \ + #define _Thread_queue_Flush_critical( \ + queue, \ operations, \ - status, \ + filter, \ mp_callout, \ - mp_id \ + mp_id, \ + lock_context \ ) \ - _Thread_queue_Do_flush( \ - the_thread_queue, \ + _Thread_queue_Do_flush_critical( \ + queue, \ operations, \ - status \ + filter, \ + lock_context \ ) #endif diff --git a/cpukit/score/src/corebarrier.c b/cpukit/score/src/corebarrier.c index 5313a0f6c8..3cb7906289 100644 --- a/cpukit/score/src/corebarrier.c +++ b/cpukit/score/src/corebarrier.c @@ -19,7 +19,6 @@ #endif #include -#include void _CORE_barrier_Initialize( CORE_barrier_Control *the_barrier, @@ -32,3 +31,14 @@ void _CORE_barrier_Initialize( _Thread_queue_Initialize( &the_barrier->Wait_queue ); } + +Thread_Control *_CORE_barrier_Was_deleted( + Thread_Control *the_thread, + Thread_queue_Queue *queue, + ISR_lock_Context *lock_context +) +{ + the_thread->Wait.return_code = CORE_BARRIER_WAS_DELETED; + + return the_thread; +} diff --git a/cpukit/score/src/coremsgclose.c b/cpukit/score/src/coremsgclose.c index 1511f83b0a..7184b11bfa 100644 --- a/cpukit/score/src/coremsgclose.c +++ b/cpukit/score/src/coremsgclose.c @@ -21,6 +21,17 @@ #include #include +static Thread_Control *_CORE_message_queue_Was_deleted( + Thread_Control *the_thread, + Thread_queue_Queue *queue, + ISR_lock_Context *lock_context +) +{ + the_thread->Wait.return_code = CORE_MESSAGE_QUEUE_STATUS_WAS_DELETED; + + return the_thread; +} + void _CORE_message_queue_Do_close( CORE_message_queue_Control *the_message_queue #if defined(RTEMS_MULTIPROCESSING) @@ -30,17 +41,21 @@ void _CORE_message_queue_Do_close( #endif ) { + ISR_lock_Context lock_context; + /* * This will flush blocked threads whether they were blocked on * a send or receive. */ - _Thread_queue_Flush( - &the_message_queue->Wait_queue, + _CORE_message_queue_Acquire( the_message_queue, &lock_context ); + _Thread_queue_Flush_critical( + &the_message_queue->Wait_queue.Queue, the_message_queue->operations, - CORE_MESSAGE_QUEUE_STATUS_WAS_DELETED, + _CORE_message_queue_Was_deleted, mp_callout, - mp_id + mp_id, + &lock_context ); (void) _Workspace_Free( the_message_queue->message_buffers ); diff --git a/cpukit/score/src/coremsgflush.c b/cpukit/score/src/coremsgflush.c index f67dcf28eb..38f26b7b87 100644 --- a/cpukit/score/src/coremsgflush.c +++ b/cpukit/score/src/coremsgflush.c @@ -41,7 +41,7 @@ uint32_t _CORE_message_queue_Flush( * * (1) The thread queue of pending senders is a logical extension * of the pending message queue. In this case, it should be - * flushed using the _Thread_queue_Flush() service with a status + * flushed using the _Thread_queue_Flush_critical() service with a status * such as CORE_MESSAGE_QUEUE_SENDER_FLUSHED (which currently does * not exist). This can be implemented without changing the "big-O" * of the message flushing part of the routine. diff --git a/cpukit/score/src/coremutex.c b/cpukit/score/src/coremutex.c index 88d487ca4b..75e0c49592 100644 --- a/cpukit/score/src/coremutex.c +++ b/cpukit/score/src/coremutex.c @@ -96,3 +96,25 @@ CORE_mutex_Status _CORE_mutex_Initialize( return CORE_MUTEX_STATUS_SUCCESSFUL; } + +Thread_Control *_CORE_mutex_Was_deleted( + Thread_Control *the_thread, + Thread_queue_Queue *queue, + ISR_lock_Context *lock_context +) +{ + the_thread->Wait.return_code = CORE_MUTEX_WAS_DELETED; + + return the_thread; +} + +Thread_Control *_CORE_mutex_Unsatisfied_nowait( + Thread_Control *the_thread, + Thread_queue_Queue *queue, + ISR_lock_Context *lock_context +) +{ + the_thread->Wait.return_code = CORE_MUTEX_STATUS_UNSATISFIED_NOWAIT; + + return the_thread; +} diff --git a/cpukit/score/src/coresem.c b/cpukit/score/src/coresem.c index 2bdd81c76a..02a3837410 100644 --- a/cpukit/score/src/coresem.c +++ b/cpukit/score/src/coresem.c @@ -36,3 +36,25 @@ void _CORE_semaphore_Initialize( the_semaphore->operations = &_Thread_queue_Operations_FIFO; } } + +Thread_Control *_CORE_semaphore_Was_deleted( + Thread_Control *the_thread, + Thread_queue_Queue *queue, + ISR_lock_Context *lock_context +) +{ + the_thread->Wait.return_code = CORE_SEMAPHORE_WAS_DELETED; + + return the_thread; +} + +Thread_Control *_CORE_semaphore_Unsatisfied_nowait( + Thread_Control *the_thread, + Thread_queue_Queue *queue, + ISR_lock_Context *lock_context +) +{ + the_thread->Wait.return_code = CORE_SEMAPHORE_STATUS_UNSATISFIED_NOWAIT; + + return the_thread; +} diff --git a/cpukit/score/src/threadqflush.c b/cpukit/score/src/threadqflush.c index c50831466d..0ce639eb68 100644 --- a/cpukit/score/src/threadqflush.c +++ b/cpukit/score/src/threadqflush.c @@ -18,46 +18,82 @@ #include "config.h" #endif -#include -#include +#include -void _Thread_queue_Do_flush( - Thread_queue_Control *the_thread_queue, +size_t _Thread_queue_Do_flush_critical( + Thread_queue_Queue *queue, const Thread_queue_Operations *operations, - uint32_t status + Thread_queue_Flush_filter filter, #if defined(RTEMS_MULTIPROCESSING) - , Thread_queue_MP_callout mp_callout, - Objects_Id mp_id + Objects_Id mp_id, #endif + ISR_lock_Context *lock_context ) { - ISR_lock_Context lock_context; - Thread_Control *the_thread; - - _Thread_queue_Acquire( the_thread_queue, &lock_context ); - - while ( - ( - the_thread = _Thread_queue_First_locked( - the_thread_queue, - operations - ) - ) - ) { - the_thread->Wait.return_code = status; - - _Thread_queue_Extract_critical( - &the_thread_queue->Queue, + size_t flushed; + Chain_Control unblock; + Chain_Node *node; + Chain_Node *tail; + + flushed = 0; + _Chain_Initialize_empty( &unblock ); + + while ( true ) { + Thread_queue_Heads *heads; + Thread_Control *first; + bool do_unblock; + + heads = queue->heads; + if ( heads == NULL ) { + break; + } + + first = ( *operations->first )( heads ); + first = ( *filter )( first, queue, lock_context ); + if ( first == NULL ) { + break; + } + + do_unblock = _Thread_queue_Extract_locked( + queue, operations, - the_thread, + first, mp_callout, - mp_id, - &lock_context + mp_id ); + if ( do_unblock ) { + _Chain_Append_unprotected( &unblock, &first->Wait.Node.Chain ); + } + + ++flushed; + } + + node = _Chain_First( &unblock ); + tail = _Chain_Tail( &unblock ); + + if ( node != tail ) { + Per_CPU_Control *cpu_self; + + cpu_self = _Thread_Dispatch_disable_critical( lock_context ); + _Thread_queue_Queue_release( queue, lock_context ); + + do { + Thread_Control *the_thread; + Chain_Node *next; + + next = _Chain_Next( node ); + the_thread = THREAD_CHAIN_NODE_TO_THREAD( node ); + _Thread_Timer_remove( the_thread ); + _Thread_Unblock( the_thread ); + + node = next; + } while ( node != tail ); - _Thread_queue_Acquire( the_thread_queue, &lock_context ); + _Thread_Dispatch_enable( cpu_self ); + } else { + _Thread_queue_Queue_release( queue, lock_context ); } - _Thread_queue_Release( the_thread_queue, &lock_context ); + return flushed; } -- cgit v1.2.3