diff options
author | Sebastian Huber <sebastian.huber@embedded-brains.de> | 2015-04-30 13:12:54 +0200 |
---|---|---|
committer | Sebastian Huber <sebastian.huber@embedded-brains.de> | 2015-05-19 12:00:46 +0200 |
commit | cc18d7bec7b3c5515cb9e6cd9771d4b94309b3bd (patch) | |
tree | 59fd8c8ca70830762e632e255a2078f22ac6a821 /cpukit/score | |
parent | score: Delete _CORE_message_queue_Flush_support() (diff) | |
download | rtems-cc18d7bec7b3c5515cb9e6cd9771d4b94309b3bd.tar.bz2 |
score: Fine grained locking for message queues
Aggregate several critical sections into a bigger one. Sending and
receiving messages is now protected by an ISR lock. Thread dispatching
is only disabled in case a blocking operation is necessary. The message
copy procedure is done inside the critical section (interrupts
disabled). Thus this change may have a negative impact on the interrupt
latency in case very large messages are transferred.
Update #2273.
Diffstat (limited to 'cpukit/score')
-rw-r--r-- | cpukit/score/include/rtems/score/coremsgimpl.h | 121 | ||||
-rw-r--r-- | cpukit/score/src/coremsgbroadcast.c | 61 | ||||
-rw-r--r-- | cpukit/score/src/coremsgclose.c | 8 | ||||
-rw-r--r-- | cpukit/score/src/coremsgflush.c | 9 | ||||
-rw-r--r-- | cpukit/score/src/coremsginsert.c | 6 | ||||
-rw-r--r-- | cpukit/score/src/coremsgseize.c | 30 | ||||
-rw-r--r-- | cpukit/score/src/coremsgsubmit.c | 81 |
7 files changed, 208 insertions, 108 deletions
diff --git a/cpukit/score/include/rtems/score/coremsgimpl.h b/cpukit/score/include/rtems/score/coremsgimpl.h index 1f6796905b..51b5f3780f 100644 --- a/cpukit/score/include/rtems/score/coremsgimpl.h +++ b/cpukit/score/include/rtems/score/coremsgimpl.h @@ -21,6 +21,7 @@ #include <rtems/score/coremsg.h> #include <rtems/score/chainimpl.h> +#include <rtems/score/threaddispatch.h> #include <rtems/score/threadqimpl.h> #include <limits.h> @@ -172,11 +173,13 @@ void _CORE_message_queue_Close( * number of messages flushed from the queue is returned. * * @param[in] the_message_queue points to the message queue to flush + * @param[in] lock_context The lock context of the interrupt disable. * * @retval This method returns the number of message pending messages flushed. */ uint32_t _CORE_message_queue_Flush( - CORE_message_queue_Control *the_message_queue + CORE_message_queue_Control *the_message_queue, + ISR_lock_Context *lock_context ); #if defined(FUNCTIONALITY_NOT_CURRENTLY_USED_BY_ANY_API) @@ -215,6 +218,7 @@ uint32_t _CORE_message_queue_Flush( * a thread that is unblocked is actually a remote thread. * @param[out] count points to the variable that will contain the * number of tasks that are sent this message + * @param[in] lock_context The lock context of the interrupt disable. * @retval @a *count will contain the number of messages sent * @retval indication of the successful completion or reason for failure */ @@ -224,7 +228,8 @@ CORE_message_queue_Status _CORE_message_queue_Broadcast( size_t size, Objects_Id id, CORE_message_queue_API_mp_support_callout api_message_queue_mp_support, - uint32_t *count + uint32_t *count, + ISR_lock_Context *lock_context ); /** @@ -250,6 +255,7 @@ CORE_message_queue_Status _CORE_message_queue_Broadcast( * if the message queue is full. * @param[in] timeout is the maximum number of clock ticks that the calling * thread is willing to block if the message queue is full. + * @param[in] lock_context The lock context of the interrupt disable. * @retval indication of the successful completion or reason for failure */ CORE_message_queue_Status _CORE_message_queue_Submit( @@ -261,7 +267,8 @@ CORE_message_queue_Status _CORE_message_queue_Submit( CORE_message_queue_API_mp_support_callout api_message_queue_mp_support, CORE_message_queue_Submit_types submit_type, bool wait, - Watchdog_Interval timeout + Watchdog_Interval timeout, + ISR_lock_Context *lock_context ); /** @@ -287,6 +294,7 @@ CORE_message_queue_Status _CORE_message_queue_Submit( * if the message queue is empty. * @param[in] timeout is the maximum number of clock ticks that the calling * thread is willing to block if the message queue is empty. + * @param[in] lock_context The lock context of the interrupt disable. * * @retval indication of the successful completion or reason for failure. * On success, the location pointed to @a size_p will contain the @@ -305,7 +313,8 @@ void _CORE_message_queue_Seize( void *buffer, size_t *size_p, bool wait, - Watchdog_Interval timeout + Watchdog_Interval timeout, + ISR_lock_Context *lock_context ); /** @@ -338,8 +347,9 @@ RTEMS_INLINE_ROUTINE CORE_message_queue_Status _CORE_message_queue_Send( size_t size, Objects_Id id, CORE_message_queue_API_mp_support_callout api_message_queue_mp_support, - bool wait, - Watchdog_Interval timeout + bool wait, + Watchdog_Interval timeout, + ISR_lock_Context *lock_context ) { return _CORE_message_queue_Submit( @@ -351,7 +361,8 @@ RTEMS_INLINE_ROUTINE CORE_message_queue_Status _CORE_message_queue_Send( api_message_queue_mp_support, CORE_MESSAGE_QUEUE_SEND_REQUEST, wait, /* sender may block */ - timeout /* timeout interval */ + timeout, /* timeout interval */ + lock_context ); } @@ -364,8 +375,9 @@ RTEMS_INLINE_ROUTINE CORE_message_queue_Status _CORE_message_queue_Urgent( size_t size, Objects_Id id, CORE_message_queue_API_mp_support_callout api_message_queue_mp_support, - bool wait, - Watchdog_Interval timeout + bool wait, + Watchdog_Interval timeout, + ISR_lock_Context *lock_context ) { return _CORE_message_queue_Submit( @@ -377,10 +389,46 @@ RTEMS_INLINE_ROUTINE CORE_message_queue_Status _CORE_message_queue_Urgent( api_message_queue_mp_support, CORE_MESSAGE_QUEUE_URGENT_REQUEST, wait, /* sender may block */ - timeout /* timeout interval */ + timeout, /* timeout interval */ + lock_context ); } +RTEMS_INLINE_ROUTINE void _CORE_message_queue_Acquire( + CORE_message_queue_Control *the_message_queue, + ISR_lock_Context *lock_context +) +{ + _Thread_queue_Acquire( &the_message_queue->Wait_queue, lock_context ); +} + +RTEMS_INLINE_ROUTINE void _CORE_message_queue_Acquire_critical( + CORE_message_queue_Control *the_message_queue, + ISR_lock_Context *lock_context +) +{ + _Thread_queue_Acquire_critical( &the_message_queue->Wait_queue, lock_context ); + + #if defined(RTEMS_MULTIPROCESSING) + /* + * In case RTEMS_MULTIPROCESSING is enabled, then we have to prevent + * deletion of the executing thread after the thread queue operations. + */ + _Thread_Dispatch_disable(); + #endif +} + +RTEMS_INLINE_ROUTINE void _CORE_message_queue_Release( + CORE_message_queue_Control *the_message_queue, + ISR_lock_Context *lock_context +) +{ + _Thread_queue_Release( &the_message_queue->Wait_queue, lock_context ); + #if defined(RTEMS_MULTIPROCESSING) + _Thread_Dispatch_enable( _Per_CPU_Get() ); + #endif +} + /** * This routine copies the contents of the source message buffer * to the destination message buffer. @@ -404,7 +452,7 @@ _CORE_message_queue_Allocate_message_buffer ( ) { return (CORE_message_queue_Buffer_control *) - _Chain_Get( &the_message_queue->Inactive_messages ); + _Chain_Get_unprotected( &the_message_queue->Inactive_messages ); } /** @@ -416,7 +464,7 @@ RTEMS_INLINE_ROUTINE void _CORE_message_queue_Free_message_buffer ( CORE_message_queue_Buffer_control *the_message ) { - _Chain_Append( &the_message_queue->Inactive_messages, &the_message->Node ); + _Chain_Append_unprotected( &the_message_queue->Inactive_messages, &the_message->Node ); } /** @@ -510,6 +558,55 @@ RTEMS_INLINE_ROUTINE bool _CORE_message_queue_Is_priority( the_message_queue, the_handler, the_argument ) #endif +RTEMS_INLINE_ROUTINE Thread_Control *_CORE_message_queue_Dequeue_receiver( + CORE_message_queue_Control *the_message_queue, + const void *buffer, + size_t size, + CORE_message_queue_Submit_types submit_type, + ISR_lock_Context *lock_context +) +{ + Thread_Control *the_thread; + + /* + * If there are pending messages, then there can't be threads + * waiting for us to send them a message. + * + * NOTE: This check is critical because threads can block on + * send and receive and this ensures that we are broadcasting + * the message to threads waiting to receive -- not to send. + */ + if ( the_message_queue->number_of_pending_messages != 0 ) { + return NULL; + } + + /* + * There must be no pending messages if there is a thread waiting to + * receive a message. + */ + the_thread = _Thread_queue_First_locked( &the_message_queue->Wait_queue ); + if ( the_thread == NULL ) { + return NULL; + } + + *(size_t *) the_thread->Wait.return_argument = size; + the_thread->Wait.count = (uint32_t) submit_type; + + _CORE_message_queue_Copy_buffer( + buffer, + the_thread->Wait.return_argument_second.mutable_object, + size + ); + + _Thread_queue_Extract_critical( + &the_message_queue->Wait_queue, + the_thread, + lock_context + ); + + return the_thread; +} + /** @} */ #ifdef __cplusplus diff --git a/cpukit/score/src/coremsgbroadcast.c b/cpukit/score/src/coremsgbroadcast.c index ff9f3ec220..3ee587ccfb 100644 --- a/cpukit/score/src/coremsgbroadcast.c +++ b/cpukit/score/src/coremsgbroadcast.c @@ -20,7 +20,6 @@ #include <rtems/score/coremsgimpl.h> #include <rtems/score/objectimpl.h> -#include <rtems/score/thread.h> CORE_message_queue_Status _CORE_message_queue_Broadcast( CORE_message_queue_Control *the_message_queue, @@ -33,55 +32,45 @@ CORE_message_queue_Status _CORE_message_queue_Broadcast( Objects_Id id __attribute__((unused)), CORE_message_queue_API_mp_support_callout api_message_queue_mp_support __attribute__((unused)), #endif - uint32_t *count + uint32_t *count, + ISR_lock_Context *lock_context ) { - Thread_Control *the_thread; - uint32_t number_broadcasted; - Thread_Wait_information *waitp; + Thread_Control *the_thread; + uint32_t number_broadcasted; if ( size > the_message_queue->maximum_message_size ) { + _ISR_lock_ISR_enable( lock_context ); return CORE_MESSAGE_QUEUE_STATUS_INVALID_SIZE; } - /* - * If there are pending messages, then there can't be threads - * waiting for us to send them a message. - * - * NOTE: This check is critical because threads can block on - * send and receive and this ensures that we are broadcasting - * the message to threads waiting to receive -- not to send. - */ + number_broadcasted = 0; - if ( the_message_queue->number_of_pending_messages != 0 ) { - *count = 0; - return CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL; - } + _CORE_message_queue_Acquire_critical( the_message_queue, lock_context ); - /* - * There must be no pending messages if there is a thread waiting to - * receive a message. - */ - number_broadcasted = 0; - while ((the_thread = - _Thread_queue_Dequeue(&the_message_queue->Wait_queue))) { - waitp = &the_thread->Wait; + while ( + ( the_thread = + _CORE_message_queue_Dequeue_receiver( + the_message_queue, + buffer, + size, + 0, + lock_context + ) + ) + ) { number_broadcasted += 1; - _CORE_message_queue_Copy_buffer( - buffer, - waitp->return_argument_second.mutable_object, - size - ); +#if defined(RTEMS_MULTIPROCESSING) + if ( !_Objects_Is_local_id( the_thread->Object.id ) ) + (*api_message_queue_mp_support) ( the_thread, id ); +#endif - *(size_t *) the_thread->Wait.return_argument = size; + _CORE_message_queue_Acquire( the_message_queue, lock_context ); + } - #if defined(RTEMS_MULTIPROCESSING) - if ( !_Objects_Is_local_id( the_thread->Object.id ) ) - (*api_message_queue_mp_support) ( the_thread, id ); - #endif + _CORE_message_queue_Release( the_message_queue, lock_context ); - } *count = number_broadcasted; return CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL; } diff --git a/cpukit/score/src/coremsgclose.c b/cpukit/score/src/coremsgclose.c index d808a4efbc..e0d370d512 100644 --- a/cpukit/score/src/coremsgclose.c +++ b/cpukit/score/src/coremsgclose.c @@ -18,11 +18,7 @@ #include "config.h" #endif -#include <rtems/system.h> -#include <rtems/score/chain.h> -#include <rtems/score/isr.h> #include <rtems/score/coremsgimpl.h> -#include <rtems/score/thread.h> #include <rtems/score/wkspace.h> void _CORE_message_queue_Close( @@ -31,6 +27,7 @@ void _CORE_message_queue_Close( uint32_t status ) { + ISR_lock_Context lock_context; /* * This will flush blocked threads whether they were blocked on @@ -49,7 +46,8 @@ void _CORE_message_queue_Close( * the flush satisfying any blocked senders as a side-effect. */ - (void) _CORE_message_queue_Flush( the_message_queue ); + _ISR_lock_ISR_disable( &lock_context ); + (void) _CORE_message_queue_Flush( the_message_queue, &lock_context ); (void) _Workspace_Free( the_message_queue->message_buffers ); diff --git a/cpukit/score/src/coremsgflush.c b/cpukit/score/src/coremsgflush.c index 05683f0327..f67dcf28eb 100644 --- a/cpukit/score/src/coremsgflush.c +++ b/cpukit/score/src/coremsgflush.c @@ -20,13 +20,12 @@ #endif #include <rtems/score/coremsgimpl.h> -#include <rtems/score/isr.h> uint32_t _CORE_message_queue_Flush( - CORE_message_queue_Control *the_message_queue + CORE_message_queue_Control *the_message_queue, + ISR_lock_Context *lock_context ) { - ISR_Level level; Chain_Node *inactive_head; Chain_Node *inactive_first; Chain_Node *message_queue_first; @@ -60,7 +59,7 @@ uint32_t _CORE_message_queue_Flush( * fixed execution time that only deals with pending messages. */ - _ISR_Disable( level ); + _CORE_message_queue_Acquire_critical( the_message_queue, lock_context ); count = the_message_queue->number_of_pending_messages; if ( count != 0 ) { @@ -79,6 +78,6 @@ uint32_t _CORE_message_queue_Flush( _Chain_Initialize_empty( &the_message_queue->Pending_messages ); } - _ISR_Enable( level ); + _CORE_message_queue_Release( the_message_queue, lock_context ); return count; } diff --git a/cpukit/score/src/coremsginsert.c b/cpukit/score/src/coremsginsert.c index 28407bae98..0a73af8c71 100644 --- a/cpukit/score/src/coremsginsert.c +++ b/cpukit/score/src/coremsginsert.c @@ -19,7 +19,6 @@ #endif #include <rtems/score/coremsgimpl.h> -#include <rtems/score/isrlevel.h> #if defined(RTEMS_SCORE_COREMSG_ENABLE_MESSAGE_PRIORITY) static bool _CORE_message_queue_Order( @@ -45,7 +44,6 @@ void _CORE_message_queue_Insert_message( ) { Chain_Control *pending_messages; - ISR_Level level; #if defined(RTEMS_SCORE_COREMSG_ENABLE_NOTIFICATION) bool notify; #endif @@ -53,8 +51,6 @@ void _CORE_message_queue_Insert_message( _CORE_message_queue_Set_message_priority( the_message, submit_type ); pending_messages = &the_message_queue->Pending_messages; - _ISR_Disable( level ); - #if defined(RTEMS_SCORE_COREMSG_ENABLE_NOTIFICATION) notify = ( the_message_queue->number_of_pending_messages == 0 ); #endif @@ -74,8 +70,6 @@ void _CORE_message_queue_Insert_message( _Chain_Prepend_unprotected( pending_messages, &the_message->Node ); } - _ISR_Enable( level ); - #if defined(RTEMS_SCORE_COREMSG_ENABLE_NOTIFICATION) /* * According to POSIX, does this happen before or after the message diff --git a/cpukit/score/src/coremsgseize.c b/cpukit/score/src/coremsgseize.c index ec6cf8c52e..0d1c36fe47 100644 --- a/cpukit/score/src/coremsgseize.c +++ b/cpukit/score/src/coremsgseize.c @@ -33,18 +33,17 @@ void _CORE_message_queue_Seize( void *buffer, size_t *size_p, bool wait, - Watchdog_Interval timeout + Watchdog_Interval timeout, + ISR_lock_Context *lock_context ) { - ISR_lock_Context lock_context; CORE_message_queue_Buffer_control *the_message; executing->Wait.return_code = CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL; - _Thread_queue_Acquire( &the_message_queue->Wait_queue, &lock_context ); + _CORE_message_queue_Acquire_critical( the_message_queue, lock_context ); the_message = _CORE_message_queue_Get_pending_message( the_message_queue ); if ( the_message != NULL ) { the_message_queue->number_of_pending_messages -= 1; - _Thread_queue_Release( &the_message_queue->Wait_queue, &lock_context ); *size_p = the_message->Contents.size; executing->Wait.count = @@ -61,6 +60,7 @@ void _CORE_message_queue_Seize( * So return immediately. */ _CORE_message_queue_Free_message_buffer(the_message_queue, the_message); + _CORE_message_queue_Release( the_message_queue, lock_context ); return; #else { @@ -73,12 +73,15 @@ void _CORE_message_queue_Seize( * NOTE: If we note that the queue was not full before this receive, * then we can avoid this dequeue. */ - the_thread = _Thread_queue_Dequeue( &the_message_queue->Wait_queue ); - if ( !the_thread ) { + the_thread = _Thread_queue_First_locked( + &the_message_queue->Wait_queue + ); + if ( the_thread == NULL ) { _CORE_message_queue_Free_message_buffer( the_message_queue, the_message ); + _CORE_message_queue_Release( the_message_queue, lock_context ); return; } @@ -103,13 +106,21 @@ void _CORE_message_queue_Seize( the_message, _CORE_message_queue_Get_message_priority( the_message ) ); + _Thread_queue_Extract_critical( + &the_message_queue->Wait_queue, + the_thread, + lock_context + ); + #if defined(RTEMS_MULTIPROCESSING) + _Thread_Dispatch_enable( _Per_CPU_Get() ); + #endif return; } #endif } if ( !wait ) { - _Thread_queue_Release( &the_message_queue->Wait_queue, &lock_context ); + _CORE_message_queue_Release( the_message_queue, lock_context ); executing->Wait.return_code = CORE_MESSAGE_QUEUE_STATUS_UNSATISFIED_NOWAIT; return; } @@ -125,6 +136,9 @@ void _CORE_message_queue_Seize( STATES_WAITING_FOR_MESSAGE, timeout, CORE_MESSAGE_QUEUE_STATUS_TIMEOUT, - &lock_context + lock_context ); + #if defined(RTEMS_MULTIPROCESSING) + _Thread_Dispatch_enable( _Per_CPU_Get() ); + #endif } diff --git a/cpukit/score/src/coremsgsubmit.c b/cpukit/score/src/coremsgsubmit.c index 0d0965ffbf..37f857917c 100644 --- a/cpukit/score/src/coremsgsubmit.c +++ b/cpukit/score/src/coremsgsubmit.c @@ -38,36 +38,39 @@ CORE_message_queue_Status _CORE_message_queue_Submit( #endif CORE_message_queue_Submit_types submit_type, bool wait, - Watchdog_Interval timeout + Watchdog_Interval timeout, + ISR_lock_Context *lock_context ) { - CORE_message_queue_Buffer_control *the_message; - Thread_Control *the_thread; + CORE_message_queue_Buffer_control *the_message; + Thread_Control *the_thread; if ( size > the_message_queue->maximum_message_size ) { + _ISR_lock_ISR_enable( lock_context ); return CORE_MESSAGE_QUEUE_STATUS_INVALID_SIZE; } + _CORE_message_queue_Acquire_critical( the_message_queue, lock_context ); + /* * Is there a thread currently waiting on this message queue? */ - if ( the_message_queue->number_of_pending_messages == 0 ) { - the_thread = _Thread_queue_Dequeue( &the_message_queue->Wait_queue ); - if ( the_thread ) { - _CORE_message_queue_Copy_buffer( - buffer, - the_thread->Wait.return_argument_second.mutable_object, - size - ); - *(size_t *) the_thread->Wait.return_argument = size; - the_thread->Wait.count = (uint32_t) submit_type; - - #if defined(RTEMS_MULTIPROCESSING) - if ( !_Objects_Is_local_id( the_thread->Object.id ) ) - (*api_message_queue_mp_support) ( the_thread, id ); - #endif - return CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL; - } + + the_thread = _CORE_message_queue_Dequeue_receiver( + the_message_queue, + buffer, + size, + submit_type, + lock_context + ); + if ( the_thread != NULL ) { + #if defined(RTEMS_MULTIPROCESSING) + if ( !_Objects_Is_local_id( the_thread->Object.id ) ) + (*api_message_queue_mp_support) ( the_thread, id ); + + _Thread_Dispatch_enable( _Per_CPU_Get() ); + #endif + return CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL; } /* @@ -77,23 +80,25 @@ CORE_message_queue_Status _CORE_message_queue_Submit( the_message = _CORE_message_queue_Allocate_message_buffer( the_message_queue ); if ( the_message ) { + the_message->Contents.size = size; + _CORE_message_queue_Set_message_priority( the_message, submit_type ); _CORE_message_queue_Copy_buffer( buffer, the_message->Contents.buffer, size ); - the_message->Contents.size = size; - _CORE_message_queue_Set_message_priority( the_message, submit_type ); _CORE_message_queue_Insert_message( the_message_queue, the_message, submit_type ); + _CORE_message_queue_Release( the_message_queue, lock_context ); return CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL; } #if !defined(RTEMS_SCORE_COREMSG_ENABLE_BLOCKING_SEND) + _CORE_message_queue_Release( the_message_queue, lock_context ); return CORE_MESSAGE_QUEUE_STATUS_TOO_MANY; #else /* @@ -102,6 +107,7 @@ CORE_message_queue_Status _CORE_message_queue_Submit( * on the queue. */ if ( !wait ) { + _CORE_message_queue_Release( the_message_queue, lock_context ); return CORE_MESSAGE_QUEUE_STATUS_TOO_MANY; } @@ -110,6 +116,7 @@ CORE_message_queue_Status _CORE_message_queue_Submit( * deadly to block in an ISR. */ if ( _ISR_Is_in_progress() ) { + _CORE_message_queue_Release( the_message_queue, lock_context ); return CORE_MESSAGE_QUEUE_STATUS_UNSATISFIED; } @@ -119,20 +126,22 @@ CORE_message_queue_Status _CORE_message_queue_Submit( * it as a variable. Doing this emphasizes how dangerous it * would be to use this variable prior to here. */ - { - executing->Wait.id = id; - executing->Wait.return_argument_second.immutable_object = buffer; - executing->Wait.option = (uint32_t) size; - executing->Wait.count = submit_type; - - _Thread_queue_Enqueue( - &the_message_queue->Wait_queue, - executing, - STATES_WAITING_FOR_MESSAGE, - timeout, - CORE_MESSAGE_QUEUE_STATUS_TIMEOUT - ); - } + executing->Wait.id = id; + executing->Wait.return_argument_second.immutable_object = buffer; + executing->Wait.option = (uint32_t) size; + executing->Wait.count = submit_type; + + _Thread_queue_Enqueue_critical( + &the_message_queue->Wait_queue, + executing, + STATES_WAITING_FOR_MESSAGE, + timeout, + CORE_MESSAGE_QUEUE_STATUS_TIMEOUT, + lock_context + ); + #if defined(RTEMS_MULTIPROCESSING) + _Thread_Dispatch_enable( _Per_CPU_Get() ); + #endif return CORE_MESSAGE_QUEUE_STATUS_UNSATISFIED_WAIT; #endif |