diff options
Diffstat (limited to '')
-rw-r--r-- | cpukit/score/src/coremsgbroadcast.c | 19 | ||||
-rw-r--r-- | cpukit/score/src/coremsgsubmit.c | 146 |
2 files changed, 96 insertions, 69 deletions
diff --git a/cpukit/score/src/coremsgbroadcast.c b/cpukit/score/src/coremsgbroadcast.c index 2e6f649545..18e148ab1c 100644 --- a/cpukit/score/src/coremsgbroadcast.c +++ b/cpukit/score/src/coremsgbroadcast.c @@ -64,6 +64,25 @@ CORE_message_queue_Status _CORE_message_queue_Broadcast( Thread_Wait_information *waitp; unsigned32 constrained_size; + /* + * If there are pending messages, then there can't be threads + * waiting for us to send them a message. + * + * NOTE: This check is critical because threads can block on + * send and receive and this ensures that we are broadcasting + * the message to threads waiting to receive -- not to send. + */ + + if ( the_message_queue->number_of_pending_messages != 0 ) { + *count = 0; + return CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL; + } + + /* + * There must be no pending messages if there is a thread waiting to + * receive a message. + */ + number_broadcasted = 0; while ((the_thread = _Thread_queue_Dequeue(&the_message_queue->Wait_queue))) { waitp = &the_thread->Wait; diff --git a/cpukit/score/src/coremsgsubmit.c b/cpukit/score/src/coremsgsubmit.c index 4e15ab5bc0..6829351c18 100644 --- a/cpukit/score/src/coremsgsubmit.c +++ b/cpukit/score/src/coremsgsubmit.c @@ -53,102 +53,110 @@ * error code - if unsuccessful */ -CORE_message_queue_Status _CORE_message_queue_Submit( +void _CORE_message_queue_Submit( CORE_message_queue_Control *the_message_queue, void *buffer, unsigned32 size, Objects_Id id, CORE_message_queue_API_mp_support_callout api_message_queue_mp_support, - CORE_message_queue_Submit_types submit_type + CORE_message_queue_Submit_types submit_type, + boolean wait, + Watchdog_Interval timeout ) { + ISR_Level level; CORE_message_queue_Buffer_control *the_message; Thread_Control *the_thread; + Thread_Control *executing; - if ( size > the_message_queue->maximum_message_size ) - return CORE_MESSAGE_QUEUE_STATUS_INVALID_SIZE; + _Thread_Executing->Wait.return_code = CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL; + + if ( size > the_message_queue->maximum_message_size ) { + _Thread_Executing->Wait.return_code = + CORE_MESSAGE_QUEUE_STATUS_INVALID_SIZE; + return; + } /* - * Is there a thread currently waiting on this message queue? + * Is there a thread currently waiting on this message queue? */ - the_thread = _Thread_queue_Dequeue( &the_message_queue->Wait_queue ); - if ( the_thread ) { - _CORE_message_queue_Copy_buffer( - buffer, - the_thread->Wait.return_argument, - size - ); - *(unsigned32 *)the_thread->Wait.return_argument_1 = size; - the_thread->Wait.count = submit_type; + if ( the_message_queue->number_of_pending_messages == 0 ) { + the_thread = _Thread_queue_Dequeue( &the_message_queue->Wait_queue ); + if ( the_thread ) { + _CORE_message_queue_Copy_buffer( + buffer, + the_thread->Wait.return_argument, + size + ); + *(unsigned32 *)the_thread->Wait.return_argument_1 = size; + the_thread->Wait.count = submit_type; #if defined(RTEMS_MULTIPROCESSING) - if ( !_Objects_Is_local_id( the_thread->Object.id ) ) - (*api_message_queue_mp_support) ( the_thread, id ); + if ( !_Objects_Is_local_id( the_thread->Object.id ) ) + (*api_message_queue_mp_support) ( the_thread, id ); #endif - - return CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL; + return; + } } /* - * No one waiting on this one currently. - * Allocate a message buffer and store it away + * No one waiting on the message queue at this time, so attempt to + * queue the message up for a future receive. */ - if ( the_message_queue->number_of_pending_messages == + if ( the_message_queue->number_of_pending_messages < the_message_queue->maximum_pending_messages ) { - return CORE_MESSAGE_QUEUE_STATUS_TOO_MANY; - } - the_message = _CORE_message_queue_Allocate_message_buffer(the_message_queue); - if ( the_message == 0 ) - return CORE_MESSAGE_QUEUE_STATUS_UNSATISFIED; - - _CORE_message_queue_Copy_buffer( buffer, the_message->Contents.buffer, size ); - the_message->Contents.size = size; - the_message->priority = submit_type; - - the_message_queue->number_of_pending_messages += 1; - - switch ( submit_type ) { - case CORE_MESSAGE_QUEUE_SEND_REQUEST: - _CORE_message_queue_Append( the_message_queue, the_message ); - break; - case CORE_MESSAGE_QUEUE_URGENT_REQUEST: - _CORE_message_queue_Prepend( the_message_queue, the_message ); - break; - default: - /* XXX interrupt critical section needs to be addressed */ - { - CORE_message_queue_Buffer_control *this_message; - Chain_Node *the_node; - - the_message->priority = submit_type; - for ( the_node = the_message_queue->Pending_messages.first ; - !_Chain_Is_tail( &the_message_queue->Pending_messages, the_node ) ; - the_node = the_node->next ) { - - this_message = (CORE_message_queue_Buffer_control *) the_node; - - if ( this_message->priority >= the_message->priority ) - continue; - - _Chain_Insert( the_node, &the_message->Node ); - break; - } - } - break; + the_message = + _CORE_message_queue_Allocate_message_buffer( the_message_queue ); + + /* + * NOTE: If the system is consistent, this error should never occur. + */ + if ( !the_message ) { + _Thread_Executing->Wait.return_code = + CORE_MESSAGE_QUEUE_STATUS_UNSATISFIED; + return; + } + + _CORE_message_queue_Copy_buffer( + buffer, + the_message->Contents.buffer, + size + ); + the_message->Contents.size = size; + the_message->priority = submit_type; + + _CORE_message_queue_Insert_message( + the_message_queue, + the_message, + submit_type + ); + return; } /* - * According to POSIX, does this happen before or after the message - * is actually enqueued. It is logical to think afterwards, because - * the message is actually in the queue at this point. + * No message buffers were available so we may need to return an + * overflow error or block the sender until the message is placed + * on the queue. */ - if ( the_message_queue->number_of_pending_messages == 1 && - the_message_queue->notify_handler ) - (*the_message_queue->notify_handler)( the_message_queue->notify_argument ); - - return CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL; + if ( !wait ) { + _Thread_Executing->Wait.return_code = CORE_MESSAGE_QUEUE_STATUS_TOO_MANY; + return; + } + + executing = _Thread_Executing; + + _ISR_Disable( level ); + _Thread_queue_Enter_critical_section( &the_message_queue->Wait_queue ); + executing->Wait.queue = &the_message_queue->Wait_queue; + executing->Wait.id = id; + executing->Wait.return_argument = (void *)buffer; + executing->Wait.return_argument_1 = (void *)size; + executing->Wait.count = submit_type; + _ISR_Enable( level ); + + _Thread_queue_Enqueue( &the_message_queue->Wait_queue, timeout ); } |