diff options
Diffstat (limited to '')
-rw-r--r-- | cpukit/score/include/rtems/score/coremsg.h | 44 | ||||
-rw-r--r-- | cpukit/score/inline/rtems/score/coremsg.inl | 24 | ||||
-rw-r--r-- | cpukit/score/macros/rtems/score/coremsg.inl | 10 | ||||
-rw-r--r-- | cpukit/score/src/coremsgbroadcast.c | 19 | ||||
-rw-r--r-- | cpukit/score/src/coremsgsubmit.c | 146 |
5 files changed, 154 insertions, 89 deletions
diff --git a/cpukit/score/include/rtems/score/coremsg.h b/cpukit/score/include/rtems/score/coremsg.h index 6ffedee0af..818b1e8c18 100644 --- a/cpukit/score/include/rtems/score/coremsg.h +++ b/cpukit/score/include/rtems/score/coremsg.h @@ -166,13 +166,12 @@ void _CORE_message_queue_Close( ); /* - * * _CORE_message_queue_Flush * * DESCRIPTION: * - * This function flushes the message_queue's task wait queue. The number - * messages flushed from the queue is returned. + * This function flushes the message_queue's pending message queue. The + * number of messages flushed from the queue is returned. * */ @@ -194,7 +193,20 @@ unsigned32 _CORE_message_queue_Flush_support( ); /* + * _CORE_message_queue_Flush_waiting_threads + * + * DESCRIPTION: * + * This function flushes the threads which are blocked on this + * message_queue's pending message queue. They are unblocked whether + * blocked sending or receiving. + */ + +void _CORE_message_queue_Flush_waiting_threads( + CORE_message_queue_Control *the_message_queue +); + +/* * _CORE_message_queue_Broadcast * * DESCRIPTION: @@ -214,7 +226,6 @@ CORE_message_queue_Status _CORE_message_queue_Broadcast( ); /* - * * _CORE_message_queue_Submit * * DESCRIPTION: @@ -228,17 +239,18 @@ CORE_message_queue_Status _CORE_message_queue_Broadcast( * */ -CORE_message_queue_Status _CORE_message_queue_Submit( +void _CORE_message_queue_Submit( CORE_message_queue_Control *the_message_queue, void *buffer, unsigned32 size, Objects_Id id, CORE_message_queue_API_mp_support_callout api_message_queue_mp_support, - CORE_message_queue_Submit_types submit_type + CORE_message_queue_Submit_types submit_type, + boolean wait, + Watchdog_Interval timeout ); /* - * * _CORE_message_queue_Seize * * DESCRIPTION: @@ -248,6 +260,7 @@ CORE_message_queue_Status _CORE_message_queue_Submit( * inactive message pool. The thread will be blocked if wait is TRUE, * otherwise an error will be given to the thread if no messages are available. * + * NOTE: Returns message priority via return are in TCB. */ void _CORE_message_queue_Seize( @@ -256,10 +269,25 @@ void _CORE_message_queue_Seize( void *buffer, unsigned32 *size, boolean wait, - CORE_message_queue_Submit_types *priority, Watchdog_Interval timeout ); +/* + * _CORE_message_queue_Insert_message + * + * DESCRIPTION: + * + * This kernel routine inserts the specified message into the + * message queue. It is assumed that the message has been filled + * in before this routine is called. + */ + +void _CORE_message_queue_Insert_message( + CORE_message_queue_Control *the_message_queue, + CORE_message_queue_Buffer_control *the_message, + CORE_message_queue_Submit_types submit_type +); + #ifndef __RTEMS_APPLICATION__ #include <rtems/score/coremsg.inl> #endif diff --git a/cpukit/score/inline/rtems/score/coremsg.inl b/cpukit/score/inline/rtems/score/coremsg.inl index af16fbd4aa..7356ce9537 100644 --- a/cpukit/score/inline/rtems/score/coremsg.inl +++ b/cpukit/score/inline/rtems/score/coremsg.inl @@ -27,15 +27,17 @@ * This routine sends a message to the end of the specified message queue. */ -RTEMS_INLINE_ROUTINE CORE_message_queue_Status _CORE_message_queue_Send( +RTEMS_INLINE_ROUTINE void _CORE_message_queue_Send( CORE_message_queue_Control *the_message_queue, void *buffer, unsigned32 size, Objects_Id id, - CORE_message_queue_API_mp_support_callout api_message_queue_mp_support + CORE_message_queue_API_mp_support_callout api_message_queue_mp_support, + boolean wait, + Watchdog_Interval timeout ) { - return _CORE_message_queue_Submit( + _CORE_message_queue_Submit( the_message_queue, buffer, size, @@ -45,7 +47,9 @@ RTEMS_INLINE_ROUTINE CORE_message_queue_Status _CORE_message_queue_Send( #else NULL, #endif - CORE_MESSAGE_QUEUE_SEND_REQUEST + CORE_MESSAGE_QUEUE_SEND_REQUEST, + wait, /* sender may block */ + timeout /* timeout interval */ ); } @@ -58,15 +62,17 @@ RTEMS_INLINE_ROUTINE CORE_message_queue_Status _CORE_message_queue_Send( * This routine sends a message to the front of the specified message queue. */ -RTEMS_INLINE_ROUTINE CORE_message_queue_Status _CORE_message_queue_Urgent( +RTEMS_INLINE_ROUTINE void _CORE_message_queue_Urgent( CORE_message_queue_Control *the_message_queue, void *buffer, unsigned32 size, Objects_Id id, - CORE_message_queue_API_mp_support_callout api_message_queue_mp_support + CORE_message_queue_API_mp_support_callout api_message_queue_mp_support, + boolean wait, + Watchdog_Interval timeout ) { - return _CORE_message_queue_Submit( + _CORE_message_queue_Submit( the_message_queue, buffer, size, @@ -76,7 +82,9 @@ RTEMS_INLINE_ROUTINE CORE_message_queue_Status _CORE_message_queue_Urgent( #else NULL, #endif - CORE_MESSAGE_QUEUE_URGENT_REQUEST + CORE_MESSAGE_QUEUE_URGENT_REQUEST, + wait, /* sender may block */ + timeout /* timeout interval */ ); } diff --git a/cpukit/score/macros/rtems/score/coremsg.inl b/cpukit/score/macros/rtems/score/coremsg.inl index bc45a51ac3..0717b3eea3 100644 --- a/cpukit/score/macros/rtems/score/coremsg.inl +++ b/cpukit/score/macros/rtems/score/coremsg.inl @@ -23,9 +23,10 @@ */ #define _CORE_message_queue_Send( _the_message_queue, _buffer, _size, \ -_id, _api_message_queue_mp_support ) \ + _id, _api_message_queue_mp_support, _wait, _timeout ) \ _CORE_message_queue_Submit( (_the_message_queue), (_buffer), (_size), \ - (_id), (_api_message_queue_mp_support), CORE_MESSAGE_QUEUE_SEND_REQUEST ) + (_id), (_api_message_queue_mp_support), \ + CORE_MESSAGE_QUEUE_SEND_REQUEST, (_wait), (_timeout) /*PAGE * @@ -34,9 +35,10 @@ _id, _api_message_queue_mp_support ) \ */ #define _CORE_message_queue_Urgent( _the_message_queue, _buffer, _size, \ -_id, _api_message_queue_mp_support ) \ + _id, _api_message_queue_mp_support, _wait, _timeout ) \ _CORE_message_queue_Submit( (_the_message_queue), (_buffer), (_size), \ - (_id), (_api_message_queue_mp_support), CORE_MESSAGE_QUEUE_URGENT_REQUEST ) + (_id), (_api_message_queue_mp_support), \ + CORE_MESSAGE_QUEUE_URGENT_REQUEST, (_wait), (_timeout) /*PAGE * diff --git a/cpukit/score/src/coremsgbroadcast.c b/cpukit/score/src/coremsgbroadcast.c index 2e6f649545..18e148ab1c 100644 --- a/cpukit/score/src/coremsgbroadcast.c +++ b/cpukit/score/src/coremsgbroadcast.c @@ -64,6 +64,25 @@ CORE_message_queue_Status _CORE_message_queue_Broadcast( Thread_Wait_information *waitp; unsigned32 constrained_size; + /* + * If there are pending messages, then there can't be threads + * waiting for us to send them a message. + * + * NOTE: This check is critical because threads can block on + * send and receive and this ensures that we are broadcasting + * the message to threads waiting to receive -- not to send. + */ + + if ( the_message_queue->number_of_pending_messages != 0 ) { + *count = 0; + return CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL; + } + + /* + * There must be no pending messages if there is a thread waiting to + * receive a message. + */ + number_broadcasted = 0; while ((the_thread = _Thread_queue_Dequeue(&the_message_queue->Wait_queue))) { waitp = &the_thread->Wait; diff --git a/cpukit/score/src/coremsgsubmit.c b/cpukit/score/src/coremsgsubmit.c index 4e15ab5bc0..6829351c18 100644 --- a/cpukit/score/src/coremsgsubmit.c +++ b/cpukit/score/src/coremsgsubmit.c @@ -53,102 +53,110 @@ * error code - if unsuccessful */ -CORE_message_queue_Status _CORE_message_queue_Submit( +void _CORE_message_queue_Submit( CORE_message_queue_Control *the_message_queue, void *buffer, unsigned32 size, Objects_Id id, CORE_message_queue_API_mp_support_callout api_message_queue_mp_support, - CORE_message_queue_Submit_types submit_type + CORE_message_queue_Submit_types submit_type, + boolean wait, + Watchdog_Interval timeout ) { + ISR_Level level; CORE_message_queue_Buffer_control *the_message; Thread_Control *the_thread; + Thread_Control *executing; - if ( size > the_message_queue->maximum_message_size ) - return CORE_MESSAGE_QUEUE_STATUS_INVALID_SIZE; + _Thread_Executing->Wait.return_code = CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL; + + if ( size > the_message_queue->maximum_message_size ) { + _Thread_Executing->Wait.return_code = + CORE_MESSAGE_QUEUE_STATUS_INVALID_SIZE; + return; + } /* - * Is there a thread currently waiting on this message queue? + * Is there a thread currently waiting on this message queue? */ - the_thread = _Thread_queue_Dequeue( &the_message_queue->Wait_queue ); - if ( the_thread ) { - _CORE_message_queue_Copy_buffer( - buffer, - the_thread->Wait.return_argument, - size - ); - *(unsigned32 *)the_thread->Wait.return_argument_1 = size; - the_thread->Wait.count = submit_type; + if ( the_message_queue->number_of_pending_messages == 0 ) { + the_thread = _Thread_queue_Dequeue( &the_message_queue->Wait_queue ); + if ( the_thread ) { + _CORE_message_queue_Copy_buffer( + buffer, + the_thread->Wait.return_argument, + size + ); + *(unsigned32 *)the_thread->Wait.return_argument_1 = size; + the_thread->Wait.count = submit_type; #if defined(RTEMS_MULTIPROCESSING) - if ( !_Objects_Is_local_id( the_thread->Object.id ) ) - (*api_message_queue_mp_support) ( the_thread, id ); + if ( !_Objects_Is_local_id( the_thread->Object.id ) ) + (*api_message_queue_mp_support) ( the_thread, id ); #endif - - return CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL; + return; + } } /* - * No one waiting on this one currently. - * Allocate a message buffer and store it away + * No one waiting on the message queue at this time, so attempt to + * queue the message up for a future receive. */ - if ( the_message_queue->number_of_pending_messages == + if ( the_message_queue->number_of_pending_messages < the_message_queue->maximum_pending_messages ) { - return CORE_MESSAGE_QUEUE_STATUS_TOO_MANY; - } - the_message = _CORE_message_queue_Allocate_message_buffer(the_message_queue); - if ( the_message == 0 ) - return CORE_MESSAGE_QUEUE_STATUS_UNSATISFIED; - - _CORE_message_queue_Copy_buffer( buffer, the_message->Contents.buffer, size ); - the_message->Contents.size = size; - the_message->priority = submit_type; - - the_message_queue->number_of_pending_messages += 1; - - switch ( submit_type ) { - case CORE_MESSAGE_QUEUE_SEND_REQUEST: - _CORE_message_queue_Append( the_message_queue, the_message ); - break; - case CORE_MESSAGE_QUEUE_URGENT_REQUEST: - _CORE_message_queue_Prepend( the_message_queue, the_message ); - break; - default: - /* XXX interrupt critical section needs to be addressed */ - { - CORE_message_queue_Buffer_control *this_message; - Chain_Node *the_node; - - the_message->priority = submit_type; - for ( the_node = the_message_queue->Pending_messages.first ; - !_Chain_Is_tail( &the_message_queue->Pending_messages, the_node ) ; - the_node = the_node->next ) { - - this_message = (CORE_message_queue_Buffer_control *) the_node; - - if ( this_message->priority >= the_message->priority ) - continue; - - _Chain_Insert( the_node, &the_message->Node ); - break; - } - } - break; + the_message = + _CORE_message_queue_Allocate_message_buffer( the_message_queue ); + + /* + * NOTE: If the system is consistent, this error should never occur. + */ + if ( !the_message ) { + _Thread_Executing->Wait.return_code = + CORE_MESSAGE_QUEUE_STATUS_UNSATISFIED; + return; + } + + _CORE_message_queue_Copy_buffer( + buffer, + the_message->Contents.buffer, + size + ); + the_message->Contents.size = size; + the_message->priority = submit_type; + + _CORE_message_queue_Insert_message( + the_message_queue, + the_message, + submit_type + ); + return; } /* - * According to POSIX, does this happen before or after the message - * is actually enqueued. It is logical to think afterwards, because - * the message is actually in the queue at this point. + * No message buffers were available so we may need to return an + * overflow error or block the sender until the message is placed + * on the queue. */ - if ( the_message_queue->number_of_pending_messages == 1 && - the_message_queue->notify_handler ) - (*the_message_queue->notify_handler)( the_message_queue->notify_argument ); - - return CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL; + if ( !wait ) { + _Thread_Executing->Wait.return_code = CORE_MESSAGE_QUEUE_STATUS_TOO_MANY; + return; + } + + executing = _Thread_Executing; + + _ISR_Disable( level ); + _Thread_queue_Enter_critical_section( &the_message_queue->Wait_queue ); + executing->Wait.queue = &the_message_queue->Wait_queue; + executing->Wait.id = id; + executing->Wait.return_argument = (void *)buffer; + executing->Wait.return_argument_1 = (void *)size; + executing->Wait.count = submit_type; + _ISR_Enable( level ); + + _Thread_queue_Enqueue( &the_message_queue->Wait_queue, timeout ); } |